Skip to content

Commit

Permalink
72 manager evaluator add tracking of failure rates (#101)
Browse files Browse the repository at this point in the history
* Added tracking of error codes and rates

* api and web showing real error rate data
  • Loading branch information
RawthiL authored Sep 26, 2024
1 parent e385c86 commit e01dd0f
Show file tree
Hide file tree
Showing 11 changed files with 347 additions and 59 deletions.
3 changes: 2 additions & 1 deletion apps/go/manager/activities/process_node_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (aCtx *Ctx) AnalyzeResult(ctx context.Context, params types.AnalyzeResultPa
//------------------------------------------------------------------
taskData, err := retrieveTaskData(params.TaskID, aCtx.App.Mongodb, l)
if err != nil {
err = temporal.NewNonRetryableApplicationError("unable to get task data", "retrieveTaskData", fmt.Errorf("Task %s not found", params.TaskID.String()))
return nil, err
}
// Extract data
Expand Down Expand Up @@ -192,7 +193,7 @@ func retrieveTaskData(taskID primitive.ObjectID,
cursor := tasksCollection.FindOne(ctxM, task_request_filter, opts)
var taskReq types.TaskRequestRecord
if err := cursor.Decode(&taskReq); err != nil {
l.Error().Str("taskID", taskID.String()).Msg("Could not decode task request data from MongoDB.")
l.Info().Str("taskID", taskID.String()).Msg("Could not decode task request data from MongoDB.")
return taskReq, err
}

Expand Down
37 changes: 33 additions & 4 deletions apps/go/manager/records/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,36 @@ func (record *BaseResultRecord) GetResultHeight() int64 {
return record.ResultHeight
}

type RelayResponseCodesEnum struct {
Ok int
Relay int
Node int
OutOfSession int
BadParams int
PromptNotFound int
DatabaseRead int
PocketRpc int
SignerNotFound int
SignerError int
AATSignature int
Evaluation int
}

var RelayResponseCodes = RelayResponseCodesEnum{
Ok: 0,
Relay: 1,
Node: 2,
OutOfSession: 3,
BadParams: 4,
PromptNotFound: 5,
DatabaseRead: 6,
PocketRpc: 7,
SignerNotFound: 8,
SignerError: 9,
AATSignature: 10,
Evaluation: 11,
}

// ------------------------------------------------------------------------------
// ResultInterface all results structs will respond to this, for ease of processing
// ------------------------------------------------------------------------------
Expand All @@ -55,9 +85,8 @@ type ResultInterface interface {
// Record written by the evaluator.
// The NumericalResultRecord field indicates how many samples were actually calculated
type NumericalResultRecord struct {
ResultData BaseResultRecord `bson:"result_data"`
ScoresSamples []ScoresSample `bson:"scores"`
ComputeTimesSamples []float32 `bson:"times"`
ResultData BaseResultRecord `bson:"result_data"`
ScoresSamples []ScoresSample `bson:"scores"`
}

func (record *NumericalResultRecord) GetResultTime() time.Time {
Expand Down Expand Up @@ -114,7 +143,7 @@ func (record *NumericalResultRecord) FindAndLoadResults(taskID primitive.ObjectI
//------------------------------------------------------------------------------

// Record written by the evaluator.
// The NumericalResultRecord field indicates how many samples were actually calculated
// The SignatureResultRecord field indicates how many samples were actually calculated
type SignatureResultRecord struct {
ResultData BaseResultRecord `bson:"result_data"`
ScoresSamples []SignatureSample `bson:"signatures"`
Expand Down
95 changes: 76 additions & 19 deletions apps/go/manager/records/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ func CheckTaskDependency(nodeData *NodeRecord, framework string, task string, co
err := fmt.Errorf("cannot find default (or specific) value for task type")
return false, err
}
if len(taskDep) == 0 {
l.Error().Str("framework", framework).Str("task", task).Msg("malformed dependency array for task type")
err := fmt.Errorf("malformed dependency array for task type")
return false, err
}
}

// Check dependency
Expand Down Expand Up @@ -216,6 +221,10 @@ func CheckTaskDependency(nodeData *NodeRecord, framework string, task string, co
if thisTaskRecord.IsOK() {
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("OK: Dependecy OK")
continue
} else {
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("OK: Dependecy NOT OK")
depOK = false
break
}
} else {
l.Error().Str("framework", framework).Str("task", task).Msg("dependency configuration cannot be processed (status type unknown)")
Expand Down Expand Up @@ -363,16 +372,20 @@ type NumericalTaskRecord struct {
MeanProcessTime float32 `bson:"mean_times"`
MedianProcessTime float32 `bson:"median_times"`
StdProcessTime float32 `bson:"std_times"`
// Errors
ErrorRate float32 `bson:"error_rate"`
ErrorCodes map[int]int `bson:"error_codes"`
// buffer
ScoresSamples []ScoresSample `bson:"scores"`
// circular buffer control
CircBuffer types.CircularBuffer `bson:"circ_buffer_control"`
}

type ScoresSample struct {
Score float64 `bson:"score"`
ID int `bson:"id"`
RunTime float32 `bson:"run_time"`
Score float64 `bson:"score"`
ID int `bson:"id"`
RunTime float32 `bson:"run_time"`
StatusCode int `bson:"status_code"`
}

func (record *NumericalTaskRecord) NewTask(nodeID primitive.ObjectID, framework string, task string, date time.Time, l *zerolog.Logger) {
Expand All @@ -394,6 +407,8 @@ func (record *NumericalTaskRecord) NewTask(nodeID primitive.ObjectID, framework
record.MeanProcessTime = 0.0
record.MedianProcessTime = 0.0
record.StdProcessTime = 0.0
record.ErrorRate = 0.0
record.ErrorCodes = make(map[int]int, 0)
record.ScoresSamples = make([]ScoresSample, bufferLen)

record.CircBuffer = types.CircularBuffer{
Expand Down Expand Up @@ -532,13 +547,29 @@ func (record *NumericalTaskRecord) ProcessData(l *zerolog.Logger) (err error) {
// Slice the buffer and cast
var auxDataScores []float64
var auxDataTimes []float64
totalPunibleErrors := 0
punibleErrorsCodes := make(map[int]int)
for _, sampleId := range validIdx {
// Add sample to data array
auxDataScores = append(auxDataScores, float64(record.ScoresSamples[sampleId].Score))
auxDataTimes = append(auxDataTimes, float64(record.ScoresSamples[sampleId].RunTime))
sampleStatus := record.ScoresSamples[sampleId].StatusCode
if sampleStatus == 0 {
// Add sample to data array
auxDataScores = append(auxDataScores, float64(record.ScoresSamples[sampleId].Score))
auxDataTimes = append(auxDataTimes, float64(record.ScoresSamples[sampleId].RunTime))
} else if sampleStatus == RelayResponseCodes.Node || sampleStatus == RelayResponseCodes.Evaluation {
// This is a Node or Evaluation (response) error, we should punish the node
totalPunibleErrors += 1
punibleErrorsCodes[sampleStatus] += 1
}
}

// Total valid samples
length := len(auxDataScores)

// Set errors
record.ErrorCodes = punibleErrorsCodes
record.ErrorRate = float32(totalPunibleErrors) / float32(length+totalPunibleErrors)

// Calculate the scores and times
if length == 0 {
record.MeanScore = 0
record.StdScore = 0
Expand Down Expand Up @@ -598,11 +629,18 @@ func (record *NumericalTaskRecord) InsertSample(timeSample time.Time, data inter

// Increment the end
err = record.StepIndex(1, "end", true, l)
// Save sample
record.ScoresSamples[record.CircBuffer.Indexes.End].Score = dataOk.Score
record.ScoresSamples[record.CircBuffer.Indexes.End].ID = dataOk.ID
record.ScoresSamples[record.CircBuffer.Indexes.End].RunTime = dataOk.RunTime
record.CircBuffer.Times[record.CircBuffer.Indexes.End] = timeSample
// Save sample if it is OK or it is an error imputable to the node
// the rest are ignored on purpose to avoid polluting the buffer with information
// that is not important to the servicer node. To debug other errors, check the logs...
if dataOk.StatusCode == RelayResponseCodes.Ok ||
dataOk.StatusCode == RelayResponseCodes.Node ||
dataOk.StatusCode == RelayResponseCodes.Evaluation {
record.ScoresSamples[record.CircBuffer.Indexes.End].Score = dataOk.Score
record.ScoresSamples[record.CircBuffer.Indexes.End].ID = dataOk.ID
record.ScoresSamples[record.CircBuffer.Indexes.End].RunTime = dataOk.RunTime
record.ScoresSamples[record.CircBuffer.Indexes.End].StatusCode = dataOk.StatusCode
record.CircBuffer.Times[record.CircBuffer.Indexes.End] = timeSample
}

return nil
}
Expand Down Expand Up @@ -635,15 +673,18 @@ type SignatureTaskRecord struct {
TaskData BaseTaskRecord `bson:"task_data"`
// Specific fields
LastSignature string `bson:"last_signature"`
// Errors
ErrorCode int `bson:"error_code"`
// buffers
Signatures []SignatureSample `bson:"signatures"`
// circular buffer control
CircBuffer types.CircularBuffer `bson:"circ_buffer_control"`
}

type SignatureSample struct {
Signature string `bson:"signature"`
ID int `bson:"id"`
Signature string `bson:"signature"`
ID int `bson:"id"`
StatusCode int `bson:"status_code"`
}

func (record *SignatureTaskRecord) NewTask(nodeID primitive.ObjectID, framework string, task string, date time.Time, l *zerolog.Logger) {
Expand All @@ -660,6 +701,7 @@ func (record *SignatureTaskRecord) NewTask(nodeID primitive.ObjectID, framework
record.TaskData.LastSeen = date

record.LastSignature = ""
record.ErrorCode = 0
record.Signatures = make([]SignatureSample, bufferLen)
record.CircBuffer = types.CircularBuffer{
CircBufferLen: bufferLen,
Expand Down Expand Up @@ -796,17 +838,24 @@ func (record *SignatureTaskRecord) InsertSample(timeSample time.Time, data inter

// Increment the end
err = record.StepIndex(1, "end", true, l)
// Save sample
record.Signatures[record.CircBuffer.Indexes.End].Signature = dataOk.Signature
record.Signatures[record.CircBuffer.Indexes.End].ID = dataOk.ID
record.CircBuffer.Times[record.CircBuffer.Indexes.End] = timeSample
// Save sample if it is OK or it is an error imputable to the node
if dataOk.StatusCode == RelayResponseCodes.Ok ||
dataOk.StatusCode == RelayResponseCodes.Node ||
dataOk.StatusCode == RelayResponseCodes.Evaluation {

record.Signatures[record.CircBuffer.Indexes.End].Signature = dataOk.Signature
record.Signatures[record.CircBuffer.Indexes.End].ID = dataOk.ID
record.Signatures[record.CircBuffer.Indexes.End].StatusCode = dataOk.StatusCode
record.CircBuffer.Times[record.CircBuffer.Indexes.End] = timeSample

}

return nil
}

// Returns True if the task is ok, meaning that their values are updated and correct
func (record *SignatureTaskRecord) IsOK() bool {
if record.LastSignature != "" {
if record.LastSignature != "" && record.ErrorCode == 0 {
// there is a signature available, so it is OK
return true
} else {
Expand All @@ -817,7 +866,15 @@ func (record *SignatureTaskRecord) IsOK() bool {
// Process the buffer data to produce the signature metrics
func (record *SignatureTaskRecord) ProcessData(l *zerolog.Logger) (err error) {
// Just update the last signature
record.LastSignature = record.Signatures[record.CircBuffer.Indexes.End].Signature
lastSampleStatus := record.Signatures[record.CircBuffer.Indexes.End].StatusCode
if lastSampleStatus == 0 {
record.LastSignature = record.Signatures[record.CircBuffer.Indexes.End].Signature
record.ErrorCode = 0
} else {
record.LastSignature = ""
record.ErrorCode = lastSampleStatus
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions apps/python/api/app/basemodels.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"mean_times": "$mean_times",
"median_times": "$median_times",
"std_times": "$std_times",
"error_rate": "$error_rate",
}
},
]
Expand Down
33 changes: 30 additions & 3 deletions apps/python/api/app/leaderboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,27 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:

# Add Metrics
leaderboard_entry["metrics"] = dict()
leaderboard_entry["qos"] = dict()
running_mean_avg = 0
weight_avg = 0
std_err_avg = 0
incomplete = False
running_mean_time_avg = 0
std_err_time_avg = 0
total_requests = 0
total_errors = 0
for metric in LEADERBOARD_METRICS.keys():
metric_name = LEADERBOARD_METRICS[metric]
here_errors = 0
here_requests = 0

if metric == "mmlu":
running_mean_mmlu = 0
weight_mmlu = 0
std_err_mmlu = 0
running_mean_time_mmlu = 0
std_err_time_mmlu = 0

# This requiere more steps yay!
all_ok = True
partial = False
Expand All @@ -150,6 +156,12 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
all_ok = False
break
elif data_row["num"].values[0] > 0:
here_errors += (
data_row["num"].values[0]
* data_row["error_rate"].values[0]
)
here_requests += data_row["num"].values[0]

metric_mean = (
data_row["mean"].values[0] * data_row["num"].values[0]
)
Expand Down Expand Up @@ -217,6 +229,11 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
metric_time_std_err = np.nan

elif data_row["num"].values[0] > 0:
here_errors += (
data_row["num"].values[0] * data_row["error_rate"].values[0]
)
here_requests += data_row["num"].values[0]

metric_mean = data_row["mean"].values[0]
metric_std_err = data_row["std"].values[0] / np.sqrt(
data_row["num"].values[0]
Expand Down Expand Up @@ -256,6 +273,15 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
running_mean_time_avg += metric_time_mean * metric_weight
std_err_time_avg += metric_time_std_err**2

# Track QoS
total_errors += here_errors
total_requests += here_requests
err_rate_here = here_errors / here_requests if here_requests > 0 else 0
leaderboard_entry["qos"][metric_name] = {
"error_rate": err_rate_here,
"response_time": metric_time_mean,
}

if weight_avg == 0:
running_mean_avg = 0
running_mean_time_avg = 0
Expand All @@ -272,8 +298,9 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
}

# Add QoS
leaderboard_entry["qos"] = {
"error_rate": 0.0,
err_rate_here = total_errors / total_requests if total_requests > 0 else 0
leaderboard_entry["qos"] = { # TODO : Put all this into an "average" key
"error_rate": err_rate_here,
"response_time": running_mean_time_avg,
}

Expand All @@ -290,7 +317,7 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:

except Exception as e:
print(str(e))
logger.warn(
logger.warning(
f"Failed to retrieve leaderboard data for node : {entry} error: {str(e)}"
)

Expand Down
Loading

0 comments on commit e01dd0f

Please sign in to comment.