Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

72 manager evaluator add tracking of failure rates #101

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/go/manager/activities/process_node_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (aCtx *Ctx) AnalyzeResult(ctx context.Context, params types.AnalyzeResultPa
//------------------------------------------------------------------
taskData, err := retrieveTaskData(params.TaskID, aCtx.App.Mongodb, l)
if err != nil {
err = temporal.NewNonRetryableApplicationError("unable to get task data", "retrieveTaskData", fmt.Errorf("Task %s not found", params.TaskID.String()))
return nil, err
}
// Extract data
Expand Down Expand Up @@ -192,7 +193,7 @@ func retrieveTaskData(taskID primitive.ObjectID,
cursor := tasksCollection.FindOne(ctxM, task_request_filter, opts)
var taskReq types.TaskRequestRecord
if err := cursor.Decode(&taskReq); err != nil {
l.Error().Str("taskID", taskID.String()).Msg("Could not decode task request data from MongoDB.")
l.Info().Str("taskID", taskID.String()).Msg("Could not decode task request data from MongoDB.")
return taskReq, err
}

Expand Down
37 changes: 33 additions & 4 deletions apps/go/manager/records/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,36 @@ func (record *BaseResultRecord) GetResultHeight() int64 {
return record.ResultHeight
}

type RelayResponseCodesEnum struct {
Ok int
Relay int
Node int
OutOfSession int
BadParams int
PromptNotFound int
DatabaseRead int
PocketRpc int
SignerNotFound int
SignerError int
AATSignature int
Evaluation int
}

var RelayResponseCodes = RelayResponseCodesEnum{
Ok: 0,
Relay: 1,
Node: 2,
OutOfSession: 3,
BadParams: 4,
PromptNotFound: 5,
DatabaseRead: 6,
PocketRpc: 7,
SignerNotFound: 8,
SignerError: 9,
AATSignature: 10,
Evaluation: 11,
}

// ------------------------------------------------------------------------------
// ResultInterface all results structs will respond to this, for ease of processing
// ------------------------------------------------------------------------------
Expand All @@ -55,9 +85,8 @@ type ResultInterface interface {
// Record written by the evaluator.
// The NumericalResultRecord field indicates how many samples were actually calculated
type NumericalResultRecord struct {
ResultData BaseResultRecord `bson:"result_data"`
ScoresSamples []ScoresSample `bson:"scores"`
ComputeTimesSamples []float32 `bson:"times"`
ResultData BaseResultRecord `bson:"result_data"`
ScoresSamples []ScoresSample `bson:"scores"`
}

func (record *NumericalResultRecord) GetResultTime() time.Time {
Expand Down Expand Up @@ -114,7 +143,7 @@ func (record *NumericalResultRecord) FindAndLoadResults(taskID primitive.ObjectI
//------------------------------------------------------------------------------

// Record written by the evaluator.
// The NumericalResultRecord field indicates how many samples were actually calculated
// The SignatureResultRecord field indicates how many samples were actually calculated
type SignatureResultRecord struct {
ResultData BaseResultRecord `bson:"result_data"`
ScoresSamples []SignatureSample `bson:"signatures"`
Expand Down
95 changes: 76 additions & 19 deletions apps/go/manager/records/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ func CheckTaskDependency(nodeData *NodeRecord, framework string, task string, co
err := fmt.Errorf("cannot find default (or specific) value for task type")
return false, err
}
if len(taskDep) == 0 {
l.Error().Str("framework", framework).Str("task", task).Msg("malformed dependency array for task type")
err := fmt.Errorf("malformed dependency array for task type")
return false, err
}
}

// Check dependency
Expand Down Expand Up @@ -216,6 +221,10 @@ func CheckTaskDependency(nodeData *NodeRecord, framework string, task string, co
if thisTaskRecord.IsOK() {
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("OK: Dependecy OK")
continue
} else {
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("OK: Dependecy NOT OK")
depOK = false
break
}
} else {
l.Error().Str("framework", framework).Str("task", task).Msg("dependency configuration cannot be processed (status type unknown)")
Expand Down Expand Up @@ -363,16 +372,20 @@ type NumericalTaskRecord struct {
MeanProcessTime float32 `bson:"mean_times"`
MedianProcessTime float32 `bson:"median_times"`
StdProcessTime float32 `bson:"std_times"`
// Errors
ErrorRate float32 `bson:"error_rate"`
ErrorCodes map[int]int `bson:"error_codes"`
// buffer
ScoresSamples []ScoresSample `bson:"scores"`
// circular buffer control
CircBuffer types.CircularBuffer `bson:"circ_buffer_control"`
}

type ScoresSample struct {
Score float64 `bson:"score"`
ID int `bson:"id"`
RunTime float32 `bson:"run_time"`
Score float64 `bson:"score"`
ID int `bson:"id"`
RunTime float32 `bson:"run_time"`
StatusCode int `bson:"status_code"`
}

func (record *NumericalTaskRecord) NewTask(nodeID primitive.ObjectID, framework string, task string, date time.Time, l *zerolog.Logger) {
Expand All @@ -394,6 +407,8 @@ func (record *NumericalTaskRecord) NewTask(nodeID primitive.ObjectID, framework
record.MeanProcessTime = 0.0
record.MedianProcessTime = 0.0
record.StdProcessTime = 0.0
record.ErrorRate = 0.0
record.ErrorCodes = make(map[int]int, 0)
record.ScoresSamples = make([]ScoresSample, bufferLen)

record.CircBuffer = types.CircularBuffer{
Expand Down Expand Up @@ -532,13 +547,29 @@ func (record *NumericalTaskRecord) ProcessData(l *zerolog.Logger) (err error) {
// Slice the buffer and cast
var auxDataScores []float64
var auxDataTimes []float64
totalPunibleErrors := 0
punibleErrorsCodes := make(map[int]int)
for _, sampleId := range validIdx {
// Add sample to data array
auxDataScores = append(auxDataScores, float64(record.ScoresSamples[sampleId].Score))
auxDataTimes = append(auxDataTimes, float64(record.ScoresSamples[sampleId].RunTime))
sampleStatus := record.ScoresSamples[sampleId].StatusCode
if sampleStatus == 0 {
// Add sample to data array
auxDataScores = append(auxDataScores, float64(record.ScoresSamples[sampleId].Score))
auxDataTimes = append(auxDataTimes, float64(record.ScoresSamples[sampleId].RunTime))
} else if sampleStatus == RelayResponseCodes.Node || sampleStatus == RelayResponseCodes.Evaluation {
// This is a Node or Evaluation (response) error, we should punish the node
totalPunibleErrors += 1
punibleErrorsCodes[sampleStatus] += 1
}
}

// Total valid samples
length := len(auxDataScores)

// Set errors
record.ErrorCodes = punibleErrorsCodes
record.ErrorRate = float32(totalPunibleErrors) / float32(length+totalPunibleErrors)

// Calculate the scores and times
if length == 0 {
record.MeanScore = 0
record.StdScore = 0
Expand Down Expand Up @@ -598,11 +629,18 @@ func (record *NumericalTaskRecord) InsertSample(timeSample time.Time, data inter

// Increment the end
err = record.StepIndex(1, "end", true, l)
// Save sample
record.ScoresSamples[record.CircBuffer.Indexes.End].Score = dataOk.Score
record.ScoresSamples[record.CircBuffer.Indexes.End].ID = dataOk.ID
record.ScoresSamples[record.CircBuffer.Indexes.End].RunTime = dataOk.RunTime
record.CircBuffer.Times[record.CircBuffer.Indexes.End] = timeSample
// Save sample if it is OK or it is an error imputable to the node
// the rest are ignored on purpose to avoid polluting the buffer with information
// that is not important to the servicer node. To debug other errors, check the logs...
if dataOk.StatusCode == RelayResponseCodes.Ok ||
dataOk.StatusCode == RelayResponseCodes.Node ||
dataOk.StatusCode == RelayResponseCodes.Evaluation {
record.ScoresSamples[record.CircBuffer.Indexes.End].Score = dataOk.Score
record.ScoresSamples[record.CircBuffer.Indexes.End].ID = dataOk.ID
record.ScoresSamples[record.CircBuffer.Indexes.End].RunTime = dataOk.RunTime
record.ScoresSamples[record.CircBuffer.Indexes.End].StatusCode = dataOk.StatusCode
record.CircBuffer.Times[record.CircBuffer.Indexes.End] = timeSample
}

return nil
}
Expand Down Expand Up @@ -635,15 +673,18 @@ type SignatureTaskRecord struct {
TaskData BaseTaskRecord `bson:"task_data"`
// Specific fields
LastSignature string `bson:"last_signature"`
// Errors
ErrorCode int `bson:"error_code"`
// buffers
Signatures []SignatureSample `bson:"signatures"`
// circular buffer control
CircBuffer types.CircularBuffer `bson:"circ_buffer_control"`
}

type SignatureSample struct {
Signature string `bson:"signature"`
ID int `bson:"id"`
Signature string `bson:"signature"`
ID int `bson:"id"`
StatusCode int `bson:"status_code"`
}

func (record *SignatureTaskRecord) NewTask(nodeID primitive.ObjectID, framework string, task string, date time.Time, l *zerolog.Logger) {
Expand All @@ -660,6 +701,7 @@ func (record *SignatureTaskRecord) NewTask(nodeID primitive.ObjectID, framework
record.TaskData.LastSeen = date

record.LastSignature = ""
record.ErrorCode = 0
record.Signatures = make([]SignatureSample, bufferLen)
record.CircBuffer = types.CircularBuffer{
CircBufferLen: bufferLen,
Expand Down Expand Up @@ -796,17 +838,24 @@ func (record *SignatureTaskRecord) InsertSample(timeSample time.Time, data inter

// Increment the end
err = record.StepIndex(1, "end", true, l)
// Save sample
record.Signatures[record.CircBuffer.Indexes.End].Signature = dataOk.Signature
record.Signatures[record.CircBuffer.Indexes.End].ID = dataOk.ID
record.CircBuffer.Times[record.CircBuffer.Indexes.End] = timeSample
// Save sample if it is OK or it is an error imputable to the node
if dataOk.StatusCode == RelayResponseCodes.Ok ||
dataOk.StatusCode == RelayResponseCodes.Node ||
dataOk.StatusCode == RelayResponseCodes.Evaluation {

record.Signatures[record.CircBuffer.Indexes.End].Signature = dataOk.Signature
record.Signatures[record.CircBuffer.Indexes.End].ID = dataOk.ID
record.Signatures[record.CircBuffer.Indexes.End].StatusCode = dataOk.StatusCode
record.CircBuffer.Times[record.CircBuffer.Indexes.End] = timeSample

}

return nil
}

// Returns True if the task is ok, meaning that their values are updated and correct
func (record *SignatureTaskRecord) IsOK() bool {
if record.LastSignature != "" {
if record.LastSignature != "" && record.ErrorCode == 0 {
// there is a signature available, so it is OK
return true
} else {
Expand All @@ -817,7 +866,15 @@ func (record *SignatureTaskRecord) IsOK() bool {
// Process the buffer data to produce the signature metrics
func (record *SignatureTaskRecord) ProcessData(l *zerolog.Logger) (err error) {
// Just update the last signature
record.LastSignature = record.Signatures[record.CircBuffer.Indexes.End].Signature
lastSampleStatus := record.Signatures[record.CircBuffer.Indexes.End].StatusCode
if lastSampleStatus == 0 {
record.LastSignature = record.Signatures[record.CircBuffer.Indexes.End].Signature
record.ErrorCode = 0
} else {
record.LastSignature = ""
record.ErrorCode = lastSampleStatus
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions apps/python/api/app/basemodels.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"mean_times": "$mean_times",
"median_times": "$median_times",
"std_times": "$std_times",
"error_rate": "$error_rate",
}
},
]
Expand Down
33 changes: 30 additions & 3 deletions apps/python/api/app/leaderboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,27 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:

# Add Metrics
leaderboard_entry["metrics"] = dict()
leaderboard_entry["qos"] = dict()
running_mean_avg = 0
weight_avg = 0
std_err_avg = 0
incomplete = False
running_mean_time_avg = 0
std_err_time_avg = 0
total_requests = 0
total_errors = 0
for metric in LEADERBOARD_METRICS.keys():
metric_name = LEADERBOARD_METRICS[metric]
here_errors = 0
here_requests = 0

if metric == "mmlu":
running_mean_mmlu = 0
weight_mmlu = 0
std_err_mmlu = 0
running_mean_time_mmlu = 0
std_err_time_mmlu = 0

# This requiere more steps yay!
all_ok = True
partial = False
Expand All @@ -150,6 +156,12 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
all_ok = False
break
elif data_row["num"].values[0] > 0:
here_errors += (
data_row["num"].values[0]
* data_row["error_rate"].values[0]
)
here_requests += data_row["num"].values[0]

metric_mean = (
data_row["mean"].values[0] * data_row["num"].values[0]
)
Expand Down Expand Up @@ -217,6 +229,11 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
metric_time_std_err = np.nan

elif data_row["num"].values[0] > 0:
here_errors += (
data_row["num"].values[0] * data_row["error_rate"].values[0]
)
here_requests += data_row["num"].values[0]

metric_mean = data_row["mean"].values[0]
metric_std_err = data_row["std"].values[0] / np.sqrt(
data_row["num"].values[0]
Expand Down Expand Up @@ -256,6 +273,15 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
running_mean_time_avg += metric_time_mean * metric_weight
std_err_time_avg += metric_time_std_err**2

# Track QoS
total_errors += here_errors
total_requests += here_requests
err_rate_here = here_errors / here_requests if here_requests > 0 else 0
leaderboard_entry["qos"][metric_name] = {
"error_rate": err_rate_here,
"response_time": metric_time_mean,
}

if weight_avg == 0:
running_mean_avg = 0
running_mean_time_avg = 0
Expand All @@ -272,8 +298,9 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
}

# Add QoS
leaderboard_entry["qos"] = {
"error_rate": 0.0,
err_rate_here = total_errors / total_requests if total_requests > 0 else 0
leaderboard_entry["qos"] = { # TODO : Put all this into an "average" key
"error_rate": err_rate_here,
"response_time": running_mean_time_avg,
}

Expand All @@ -290,7 +317,7 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:

except Exception as e:
print(str(e))
logger.warn(
logger.warning(
f"Failed to retrieve leaderboard data for node : {entry} error: {str(e)}"
)

Expand Down
Loading
Loading