diff --git a/apps/go/manager/activities/process_node_result.go b/apps/go/manager/activities/process_node_result.go
index 156f0c4..78c1915 100644
--- a/apps/go/manager/activities/process_node_result.go
+++ b/apps/go/manager/activities/process_node_result.go
@@ -36,6 +36,7 @@ func (aCtx *Ctx) AnalyzeResult(ctx context.Context, params types.AnalyzeResultPa
 	//------------------------------------------------------------------
 	taskData, err := retrieveTaskData(params.TaskID, aCtx.App.Mongodb, l)
 	if err != nil {
+		err = temporal.NewNonRetryableApplicationError("unable to get task data", "retrieveTaskData", fmt.Errorf("Task %s not found", params.TaskID.String()))
 		return nil, err
 	}
 	// Extract data
@@ -192,7 +193,7 @@ func retrieveTaskData(taskID primitive.ObjectID,
 	cursor := tasksCollection.FindOne(ctxM, task_request_filter, opts)
 	var taskReq types.TaskRequestRecord
 	if err := cursor.Decode(&taskReq); err != nil {
-		l.Error().Str("taskID", taskID.String()).Msg("Could not decode task request data from MongoDB.")
+		l.Info().Str("taskID", taskID.String()).Msg("Could not decode task request data from MongoDB.")
		return taskReq, err
 	}
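For context, the Go change above wraps a missing-task lookup in a non-retryable Temporal error so the workflow fails fast instead of retrying a task that will never appear. Below is a minimal sketch of the same pattern with the Python Temporal SDK (`temporalio`), whose `ApplicationError(..., non_retryable=True)` the evaluator already uses elsewhere in this PR; `lookup_task` and `analyze_result` are hypothetical stand-ins, not part of the patch:

```python
from typing import Optional

from temporalio import activity
from temporalio.exceptions import ApplicationError


async def lookup_task(task_id: str) -> Optional[dict]:
    # Hypothetical stand-in for the MongoDB lookup done by retrieveTaskData.
    return None


@activity.defn
async def analyze_result(task_id: str) -> dict:
    task_data = await lookup_task(task_id)
    if task_data is None:
        # Non-retryable: retrying cannot make a missing task appear.
        raise ApplicationError(
            f"unable to get task data: task {task_id} not found",
            non_retryable=True,
        )
    return task_data
```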
diff --git a/apps/go/manager/records/result.go b/apps/go/manager/records/result.go
index 8ed40ce..84f4016 100644
--- a/apps/go/manager/records/result.go
+++ b/apps/go/manager/records/result.go
@@ -33,6 +33,36 @@ func (record *BaseResultRecord) GetResultHeight() int64 {
 	return record.ResultHeight
 }

+type RelayResponseCodesEnum struct {
+	Ok             int
+	Relay          int
+	Node           int
+	OutOfSession   int
+	BadParams      int
+	PromptNotFound int
+	DatabaseRead   int
+	PocketRpc      int
+	SignerNotFound int
+	SignerError    int
+	AATSignature   int
+	Evaluation     int
+}
+
+var RelayResponseCodes = RelayResponseCodesEnum{
+	Ok:             0,
+	Relay:          1,
+	Node:           2,
+	OutOfSession:   3,
+	BadParams:      4,
+	PromptNotFound: 5,
+	DatabaseRead:   6,
+	PocketRpc:      7,
+	SignerNotFound: 8,
+	SignerError:    9,
+	AATSignature:   10,
+	Evaluation:     11,
+}
+
 // ------------------------------------------------------------------------------
 // ResultInterface all results structs will respond to this, for ease of processing
 // ------------------------------------------------------------------------------
@@ -55,9 +85,8 @@ type ResultInterface interface {
 // Record written by the evaluator.
 // The NumericalResultRecord field indicates how many samples were actually calculated
 type NumericalResultRecord struct {
-	ResultData          BaseResultRecord `bson:"result_data"`
-	ScoresSamples       []ScoresSample   `bson:"scores"`
-	ComputeTimesSamples []float32        `bson:"times"`
+	ResultData    BaseResultRecord `bson:"result_data"`
+	ScoresSamples []ScoresSample   `bson:"scores"`
 }

 func (record *NumericalResultRecord) GetResultTime() time.Time {
@@ -114,7 +143,7 @@ func (record *NumericalResultRecord) FindAndLoadResults(taskID primitive.ObjectI
 //------------------------------------------------------------------------------
 // Record written by the evaluator.
-// The NumericalResultRecord field indicates how many samples were actually calculated
+// The SignatureResultRecord field indicates how many samples were actually calculated
 type SignatureResultRecord struct {
 	ResultData    BaseResultRecord  `bson:"result_data"`
 	ScoresSamples []SignatureSample `bson:"signatures"`
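The struct-literal above is Go's idiom for a closed set of status codes. For readers working on the Python side, here is an illustrative mirror of the same table as an `IntEnum`; the values are taken from the `RelayResponseCodes` literal, but this enum is not part of the PR (the Python evaluator currently hard-codes `0` and `11`):

```python
from enum import IntEnum


class RelayResponseCode(IntEnum):
    # Mirrors the Go RelayResponseCodes literal in records/result.go.
    OK = 0
    RELAY = 1
    NODE = 2
    OUT_OF_SESSION = 3
    BAD_PARAMS = 4
    PROMPT_NOT_FOUND = 5
    DATABASE_READ = 6
    POCKET_RPC = 7
    SIGNER_NOT_FOUND = 8
    SIGNER_ERROR = 9
    AAT_SIGNATURE = 10
    EVALUATION = 11


# Only Node and Evaluation errors are imputable to the servicer node.
PUNISHABLE = {RelayResponseCode.NODE, RelayResponseCode.EVALUATION}
```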
diff --git a/apps/go/manager/records/task.go b/apps/go/manager/records/task.go
index bf2bdc2..4ecb252 100644
--- a/apps/go/manager/records/task.go
+++ b/apps/go/manager/records/task.go
@@ -178,6 +178,11 @@ func CheckTaskDependency(nodeData *NodeRecord, framework string, task string, co
 			err := fmt.Errorf("cannot find default (or specific) value for task type")
 			return false, err
 		}
+		if len(taskDep) == 0 {
+			l.Error().Str("framework", framework).Str("task", task).Msg("malformed dependency array for task type")
+			err := fmt.Errorf("malformed dependency array for task type")
+			return false, err
+		}
 	}

 	// Check dependency
@@ -216,6 +221,10 @@ func CheckTaskDependency(nodeData *NodeRecord, framework string, task string, co
 			if thisTaskRecord.IsOK() {
 				l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("OK: Dependecy OK")
 				continue
+			} else {
+				l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("Dependency NOT OK")
+				depOK = false
+				break
 			}
 		} else {
 			l.Error().Str("framework", framework).Str("task", task).Msg("dependency configuration cannot be processed (status type unknown)")
@@ -363,6 +372,9 @@ type NumericalTaskRecord struct {
 	MeanProcessTime   float32 `bson:"mean_times"`
 	MedianProcessTime float32 `bson:"median_times"`
 	StdProcessTime    float32 `bson:"std_times"`
+	// Errors
+	ErrorRate  float32     `bson:"error_rate"`
+	ErrorCodes map[int]int `bson:"error_codes"`
 	// buffer
 	ScoresSamples []ScoresSample `bson:"scores"`
 	// circular buffer control
@@ -370,9 +382,10 @@ type NumericalTaskRecord struct {
 }

 type ScoresSample struct {
-	Score   float64 `bson:"score"`
-	ID      int     `bson:"id"`
-	RunTime float32 `bson:"run_time"`
+	Score      float64 `bson:"score"`
+	ID         int     `bson:"id"`
+	RunTime    float32 `bson:"run_time"`
+	StatusCode int     `bson:"status_code"`
 }

 func (record *NumericalTaskRecord) NewTask(nodeID primitive.ObjectID, framework string, task string, date time.Time, l *zerolog.Logger) {
@@ -394,6 +407,8 @@ func (record *NumericalTaskRecord) NewTask(nodeID primitive.ObjectID, framework
 	record.MeanProcessTime = 0.0
 	record.MedianProcessTime = 0.0
 	record.StdProcessTime = 0.0
+	record.ErrorRate = 0.0
+	record.ErrorCodes = make(map[int]int, 0)

 	record.ScoresSamples = make([]ScoresSample, bufferLen)
 	record.CircBuffer = types.CircularBuffer{
@@ -532,13 +547,29 @@ func (record *NumericalTaskRecord) ProcessData(l *zerolog.Logger) (err error) {
 	// Slice the buffer and cast
 	var auxDataScores []float64
 	var auxDataTimes []float64
+	totalPunibleErrors := 0
+	punibleErrorsCodes := make(map[int]int)
 	for _, sampleId := range validIdx {
-		// Add sample to data array
-		auxDataScores = append(auxDataScores, float64(record.ScoresSamples[sampleId].Score))
-		auxDataTimes = append(auxDataTimes, float64(record.ScoresSamples[sampleId].RunTime))
+		sampleStatus := record.ScoresSamples[sampleId].StatusCode
+		if sampleStatus == 0 {
+			// Add sample to data array
+			auxDataScores = append(auxDataScores, float64(record.ScoresSamples[sampleId].Score))
+			auxDataTimes = append(auxDataTimes, float64(record.ScoresSamples[sampleId].RunTime))
+		} else if sampleStatus == RelayResponseCodes.Node ||
+			sampleStatus == RelayResponseCodes.Evaluation {
+			// This is a Node or Evaluation (response) error, we should punish the node
+			totalPunibleErrors += 1
+			punibleErrorsCodes[sampleStatus] += 1
+		}
 	}

+	// Total valid samples
 	length := len(auxDataScores)
+
+	// Set errors
+	record.ErrorCodes = punibleErrorsCodes
+	record.ErrorRate = float32(totalPunibleErrors) / float32(length+totalPunibleErrors)
+
+	// Calculate the scores and times
 	if length == 0 {
 		record.MeanScore = 0
 		record.StdScore = 0
@@ -598,11 +629,18 @@ func (record *NumericalTaskRecord) InsertSample(timeSample time.Time, data inter
 	// Increment the end
 	err = record.StepIndex(1, "end", true, l)

-	// Save sample
-	record.ScoresSamples[record.CircBuffer.Indexes.End].Score = dataOk.Score
-	record.ScoresSamples[record.CircBuffer.Indexes.End].ID = dataOk.ID
-	record.ScoresSamples[record.CircBuffer.Indexes.End].RunTime = dataOk.RunTime
-	record.CircBuffer.Times[record.CircBuffer.Indexes.End] = timeSample
+	// Save sample if it is OK or it is an error imputable to the node;
+	// the rest are ignored on purpose to avoid polluting the buffer with information
+	// that is not important to the servicer node. To debug other errors, check the logs...
+	if dataOk.StatusCode == RelayResponseCodes.Ok ||
+		dataOk.StatusCode == RelayResponseCodes.Node ||
+		dataOk.StatusCode == RelayResponseCodes.Evaluation {
+		record.ScoresSamples[record.CircBuffer.Indexes.End].Score = dataOk.Score
+		record.ScoresSamples[record.CircBuffer.Indexes.End].ID = dataOk.ID
+		record.ScoresSamples[record.CircBuffer.Indexes.End].RunTime = dataOk.RunTime
+		record.ScoresSamples[record.CircBuffer.Indexes.End].StatusCode = dataOk.StatusCode
+		record.CircBuffer.Times[record.CircBuffer.Indexes.End] = timeSample
+	}

 	return nil
 }
@@ -635,6 +673,8 @@ type SignatureTaskRecord struct {
 	TaskData BaseTaskRecord `bson:"task_data"`
 	// Specific fields
 	LastSignature string `bson:"last_signature"`
+	// Errors
+	ErrorCode int `bson:"error_code"`
 	// buffers
 	Signatures []SignatureSample `bson:"signatures"`
 	// circular buffer control
@@ -642,8 +682,9 @@ type SignatureTaskRecord struct {
 }

 type SignatureSample struct {
-	Signature string `bson:"signature"`
-	ID        int    `bson:"id"`
+	Signature  string `bson:"signature"`
+	ID         int    `bson:"id"`
+	StatusCode int    `bson:"status_code"`
 }

 func (record *SignatureTaskRecord) NewTask(nodeID primitive.ObjectID, framework string, task string, date time.Time, l *zerolog.Logger) {
@@ -660,6 +701,7 @@ func (record *SignatureTaskRecord) NewTask(nodeID primitive.ObjectID, framework
 	record.TaskData.LastSeen = date

 	record.LastSignature = ""
+	record.ErrorCode = 0
 	record.Signatures = make([]SignatureSample, bufferLen)
 	record.CircBuffer = types.CircularBuffer{
 		CircBufferLen: bufferLen,
@@ -796,17 +838,24 @@ func (record *SignatureTaskRecord) InsertSample(timeSample time.Time, data inter
 	// Increment the end
 	err = record.StepIndex(1, "end", true, l)

-	// Save sample
-	record.Signatures[record.CircBuffer.Indexes.End].Signature = dataOk.Signature
-	record.Signatures[record.CircBuffer.Indexes.End].ID = dataOk.ID
-	record.CircBuffer.Times[record.CircBuffer.Indexes.End] = timeSample
+	// Save sample if it is OK or it is an error imputable to the node
+	if dataOk.StatusCode == RelayResponseCodes.Ok ||
+		dataOk.StatusCode == RelayResponseCodes.Node ||
+		dataOk.StatusCode == RelayResponseCodes.Evaluation {
+
+		record.Signatures[record.CircBuffer.Indexes.End].Signature = dataOk.Signature
+		record.Signatures[record.CircBuffer.Indexes.End].ID = dataOk.ID
+		record.Signatures[record.CircBuffer.Indexes.End].StatusCode = dataOk.StatusCode
+		record.CircBuffer.Times[record.CircBuffer.Indexes.End] = timeSample
+
+	}

 	return nil
 }

 // Returns True if the task is ok, meaning that their values are updated and correct
 func (record *SignatureTaskRecord) IsOK() bool {
-	if record.LastSignature != "" {
+	if record.LastSignature != "" && record.ErrorCode == 0 {
 		// there is a signature available, so it is OK
 		return true
 	} else {
@@ -817,7 +866,15 @@
 // Process the buffer data to produce the signature metrics
 func (record *SignatureTaskRecord) ProcessData(l *zerolog.Logger) (err error) {
 	// Just update the last signature
-	record.LastSignature = record.Signatures[record.CircBuffer.Indexes.End].Signature
+	lastSampleStatus := record.Signatures[record.CircBuffer.Indexes.End].StatusCode
+	if lastSampleStatus == 0 {
+		record.LastSignature = record.Signatures[record.CircBuffer.Indexes.End].Signature
+		record.ErrorCode = 0
+	} else {
+		record.LastSignature = ""
+		record.ErrorCode = lastSampleStatus
+	}
+
 	return nil
 }
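The key arithmetic in `ProcessData` above is `error_rate = punishable / (valid + punishable)`: non-punishable failures never enter the denominator. A small Python sketch of the same bookkeeping, assuming a flat list of `(score, status_code)` samples rather than the Go circular buffer; the function name is illustrative:

```python
from collections import Counter


def error_stats(samples: list) -> tuple:
    """Mirror of the error bookkeeping in NumericalTaskRecord.ProcessData."""
    punishable = Counter()
    valid = 0
    for _score, status in samples:
        if status == 0:
            valid += 1
        elif status in (2, 11):  # RelayResponseCodes.Node / .Evaluation
            punishable[status] += 1
        # other codes are excluded from the rate entirely
    total_punishable = sum(punishable.values())
    denom = valid + total_punishable
    error_rate = total_punishable / denom if denom else 0.0
    return error_rate, dict(punishable)


# e.g. 8 OK samples plus 2 evaluation errors -> error_rate = 2 / 10 = 0.2
print(error_stats([(1.0, 0)] * 8 + [(0.0, 11)] * 2))
```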
diff --git a/apps/python/api/app/basemodels.py b/apps/python/api/app/basemodels.py
index a2b2793..4a80cd8 100644
--- a/apps/python/api/app/basemodels.py
+++ b/apps/python/api/app/basemodels.py
@@ -37,6 +37,7 @@
             "mean_times": "$mean_times",
             "median_times": "$median_times",
             "std_times": "$std_times",
+            "error_rate": "$error_rate",
         }
     },
 ]
diff --git a/apps/python/api/app/leaderboard.py b/apps/python/api/app/leaderboard.py
index 871b2b8..56ff285 100644
--- a/apps/python/api/app/leaderboard.py
+++ b/apps/python/api/app/leaderboard.py
@@ -120,14 +120,19 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:

         # Add Metrics
         leaderboard_entry["metrics"] = dict()
+        leaderboard_entry["qos"] = dict()
         running_mean_avg = 0
         weight_avg = 0
         std_err_avg = 0
         incomplete = False
         running_mean_time_avg = 0
         std_err_time_avg = 0
+        total_requests = 0
+        total_errors = 0
         for metric in LEADERBOARD_METRICS.keys():
             metric_name = LEADERBOARD_METRICS[metric]
+            here_errors = 0
+            here_requests = 0

             if metric == "mmlu":
                 running_mean_mmlu = 0
@@ -135,6 +140,7 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
                 std_err_mmlu = 0
                 running_mean_time_mmlu = 0
                 std_err_time_mmlu = 0
+                # This requires more steps, yay!

                 all_ok = True
                 partial = False
@@ -150,6 +156,12 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
                         all_ok = False
                         break
                     elif data_row["num"].values[0] > 0:
+                        here_errors += (
+                            data_row["num"].values[0]
+                            * data_row["error_rate"].values[0]
+                        )
+                        here_requests += data_row["num"].values[0]
+
                         metric_mean = (
                             data_row["mean"].values[0] * data_row["num"].values[0]
                         )
@@ -217,6 +229,11 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
                         metric_time_std_err = np.nan

                     elif data_row["num"].values[0] > 0:
+                        here_errors += (
+                            data_row["num"].values[0] * data_row["error_rate"].values[0]
+                        )
+                        here_requests += data_row["num"].values[0]
+
                         metric_mean = data_row["mean"].values[0]
                         metric_std_err = data_row["std"].values[0] / np.sqrt(
                             data_row["num"].values[0]
@@ -256,6 +273,15 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
                 running_mean_time_avg += metric_time_mean * metric_weight
                 std_err_time_avg += metric_time_std_err**2

+            # Track QoS
+            total_errors += here_errors
+            total_requests += here_requests
+            err_rate_here = here_errors / here_requests if here_requests > 0 else 0
+            leaderboard_entry["qos"][metric_name] = {
+                "error_rate": err_rate_here,
+                "response_time": metric_time_mean,
+            }
+
         if weight_avg == 0:
             running_mean_avg = 0
             running_mean_time_avg = 0
@@ -272,8 +298,9 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
         }

         # Add QoS
-        leaderboard_entry["qos"] = {
-            "error_rate": 0.0,
+        err_rate_here = total_errors / total_requests if total_requests > 0 else 0
+        leaderboard_entry["qos"] = {  # TODO : Put all this into an "average" key
+            "error_rate": err_rate_here,
             "response_time": running_mean_time_avg,
         }
@@ -290,7 +317,7 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:

     except Exception as e:
         print(str(e))
-        logger.warn(
+        logger.warning(
             f"Failed to retrieve leaderboard data for node : {entry} error: {str(e)}"
         )
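The aggregation above is a request-weighted average: each bucket contributes `num * error_rate` failed requests, so the combined rate is `sum(errors) / sum(requests)`, not a mean of per-bucket rates. A self-contained sketch of that arithmetic (function and field names illustrative, mirroring the `num`/`error_rate` columns used above):

```python
def combined_error_rate(buckets: list) -> float:
    # Each bucket reports its sample count and its own error rate; weight by count.
    total_errors = sum(b["num"] * b["error_rate"] for b in buckets)
    total_requests = sum(b["num"] for b in buckets)
    return total_errors / total_requests if total_requests > 0 else 0.0


# 100 requests at 10% plus 10 requests at 0% -> 10/110 ~= 0.091, not 0.05
print(combined_error_rate([
    {"num": 100, "error_rate": 0.10},
    {"num": 10, "error_rate": 0.0},
]))
```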
diff --git a/apps/python/evaluator/activities/signatures/model_config_evaluate.py b/apps/python/evaluator/activities/signatures/model_config_evaluate.py
index 6699d25..b5b66b4 100644
--- a/apps/python/evaluator/activities/signatures/model_config_evaluate.py
+++ b/apps/python/evaluator/activities/signatures/model_config_evaluate.py
@@ -78,6 +78,18 @@ async def model_config_evaluate(args: PocketNetworkEvaluationTaskRequest) -> boo
         model_config_decoded = True
     except Exception as e:
         eval_logger.debug("Exeption:", Exeption=str(e))
+        # Update the result with errored data
+        result.result_data.num_samples = 1  # Always one
+        result.result_data.status = (
+            0  # OK, proceed to add to signatures buffer (by manager)
+        )
+        result.signatures = [
+            SignatureSample(
+                signature="Cannot decode configuration",
+                id=0,
+                status_code=11,  # Error at evaluation
+            )  # This task has a single sample id
+        ]
         model_config_ok = False

     if model_config_decoded:
@@ -113,6 +125,18 @@ async def model_config_evaluate(args: PocketNetworkEvaluationTaskRequest) -> boo
             eval_logger.info("Cannot load the model config from response.")
             eval_logger.debug("Exeption:", Exeption=str(e))
             model_config_ok = False
+            # Update the result with errored data
+            result.result_data.num_samples = 1  # Always one
+            result.result_data.status = (
+                0  # OK, proceed to add to signatures buffer (by manager)
+            )
+            result.signatures = [
+                SignatureSample(
+                    signature="Cannot load model configuration",
+                    id=0,
+                    status_code=11,  # Error at evaluation
+                )  # This task has a single sample id
+            ]
         model_config_new = False

     if model_config_ok:
@@ -144,7 +168,7 @@ async def model_config_evaluate(args: PocketNetworkEvaluationTaskRequest) -> boo
             result.result_data.status = 0  # OK
             result.signatures = [
                 SignatureSample(
-                    signature=str(model_config_mongo_new.hash), id=0
+                    signature=str(model_config_mongo_new.hash), id=0, status_code=0
                 )  # This task has a single sample id
             ]
@@ -177,8 +201,38 @@ async def model_config_evaluate(args: PocketNetworkEvaluationTaskRequest) -> boo
             model_config_is_new=model_config_new,
         )
     except Exception as e:
-        # TODO: enhance drop task logic
-        await mongo_operator.mark_task_to_drop(args.task_id)
+        # Create a failed result
+        result = PocketNetworkMongoDBResultSignature(
+            result_data=PocketNetworkMongoDBResultBase(
+                task_id=args.task_id,
+                status=11,  # We failed to process
+                num_samples=0,
+                result_height=-1,
+                result_time=datetime.today().isoformat(),
+            ),
+            signatures=[],
+        )
+        # Save to results db (a failure is also an answer)
+        try:
+            async with mongo_client.start_transaction() as session:
+                await mongo_client.db["results"].find_one_and_update(
+                    {"result_data.task_id": args.task_id},
+                    {"$set": result.model_dump(by_alias=True)},
+                    upsert=True,
+                    session=session,
+                )
+                await mongo_client.db["tasks"].update_one(
+                    {"_id": args.task_id},
+                    {"$set": {"evaluated": True}},
+                    session=session,
+                )
+            eval_logger.debug("Saved result to DB.")
+        except Exception as e:
+            eval_logger.error("Failed to save Result to MongoDB.")
+            eval_logger.error("Exception:", Exception=str(e))
+            raise ApplicationError(
+                "Failed to save result to MongoDB.", non_retryable=True
+            )
         raise e

     return True
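Both `except` branches above follow the same pattern: the result-level `status` stays `0` so the manager still inserts the sample into its signatures buffer, while the per-sample `status_code=11` carries the evaluation failure. A sketch of a shared helper that captures it; the helper itself is not part of the PR, and the import path is assumed from the repo layout:

```python
from packages.python.protocol.protocol import SignatureSample


def mark_signature_failure(result, reason: str) -> None:
    # Result-level status 0 = "buffer this sample"; the failure travels in the
    # per-sample status_code (11 = Error at evaluation).
    result.result_data.num_samples = 1  # this task always has one sample
    result.result_data.status = 0
    result.signatures = [
        SignatureSample(signature=reason, id=0, status_code=11)
    ]
```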
diff --git a/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py
index c95d849..9095711 100644
--- a/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py
+++ b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py
@@ -78,6 +78,18 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool:
         tokenizer_decoded = True
     except Exception as e:
         eval_logger.debug("Exeption:", Exeption=str(e))
+        # Update the result with errored data
+        result.result_data.num_samples = 1  # Always one
+        result.result_data.status = (
+            0  # OK, proceed to add to signatures buffer (by manager)
+        )
+        result.signatures = [
+            SignatureSample(
+                signature="Cannot decode tokenizer",
+                id=0,
+                status_code=11,  # Error at evaluation
+            )  # This task has a single sample id
+        ]
         tokenizer_ok = False

     if tokenizer_decoded:
@@ -113,6 +125,18 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool:
             eval_logger.info("Cannot load tokenizer from response.")
             eval_logger.debug("Exeption:", Exeption=str(e))
             tokenizer_ok = False
+            # Update the result with errored data
+            result.result_data.num_samples = 1  # Always one
+            result.result_data.status = (
+                0  # OK, proceed to add to signatures buffer (by manager)
+            )
+            result.signatures = [
+                SignatureSample(
+                    signature="Cannot load tokenizer from decoded data",
+                    id=0,
+                    status_code=11,  # Error at evaluation
+                )  # This task has a single sample id
+            ]
         tokenizer_new = False

     if tokenizer_ok:
@@ -144,7 +168,7 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool:
             result.result_data.status = 0  # OK
             result.signatures = [
                 SignatureSample(
-                    signature=str(tokenizer_mongo_new.hash), id=0
+                    signature=str(tokenizer_mongo_new.hash), id=0, status_code=0
                 )  # This task has a single sample id
             ]
@@ -177,8 +201,38 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool:
             tokenizer_is_new=tokenizer_new,
         )
     except Exception as e:
-        # TODO: enhance drop task logic
-        await mongo_operator.mark_task_to_drop(args.task_id)
+        # Create a failed result
+        result = PocketNetworkMongoDBResultSignature(
+            result_data=PocketNetworkMongoDBResultBase(
+                task_id=args.task_id,
+                status=11,  # We failed to process
+                num_samples=0,
+                result_height=-1,
+                result_time=datetime.today().isoformat(),
+            ),
+            signatures=[],
+        )
+        # Save to results db (a failure is also an answer)
+        try:
+            async with mongo_client.start_transaction() as session:
+                await mongo_client.db["results"].find_one_and_update(
+                    {"result_data.task_id": args.task_id},
+                    {"$set": result.model_dump(by_alias=True)},
+                    upsert=True,
+                    session=session,
+                )
+                await mongo_client.db["tasks"].update_one(
+                    {"_id": args.task_id},
+                    {"$set": {"evaluated": True}},
+                    session=session,
+                )
+            eval_logger.debug("Saved result to DB.")
+        except Exception as e:
+            eval_logger.error("Failed to save Result to MongoDB.")
+            eval_logger.error("Exception:", Exception=str(e))
+            raise ApplicationError(
+                "Failed to save result to MongoDB.", non_retryable=True
+            )
         raise e

     return True
diff --git a/packages/python/lmeh/pocket_lm_eval/api/task.py b/packages/python/lmeh/pocket_lm_eval/api/task.py
index fc9678d..e93940b 100644
--- a/packages/python/lmeh/pocket_lm_eval/api/task.py
+++ b/packages/python/lmeh/pocket_lm_eval/api/task.py
@@ -985,9 +985,14 @@ async def build_all_requests(
         tokenizer_name: str = "",
     ) -> None:
         """Build a set of Instances for a task, and store them in task.instances"""
-        self._instances, kept_doc_ids, self.result_height = await MongoOperator(
-            client=mongo_client
-        ).reconstruct_instances(task_id=task_id, eval_logger=self.eval_logger)
+        (
+            self._instances,
+            kept_doc_ids,
+            self.result_height,
+            self.failed_instances,
+        ) = await MongoOperator(client=mongo_client).reconstruct_instances(
+            task_id=task_id, eval_logger=self.eval_logger
+        )
         # Kept only those docs_ids filled by all its instances/responses
         if kept_doc_ids:
             b_dict = {}
diff --git a/packages/python/lmeh/utils/generator.py b/packages/python/lmeh/utils/generator.py
index fd9d728..6847fcf 100644
--- a/packages/python/lmeh/utils/generator.py
+++ b/packages/python/lmeh/utils/generator.py
@@ -481,21 +481,49 @@ async def save_results(
         raise e

     if len(task.instances) == 0:
-        insert_mongo_results = []
-        eval_logger.debug(
-            "No instances/doc_id generated for task.", task_id=str(task_id)
-        )
-        base_result = PocketNetworkMongoDBResultBase(
-            task_id=task_id,
-            status=1,
-            num_samples=0,
-            result_height=task.result_height,
-            result_time=datetime.today().isoformat(),
-        )
-        num_result = PocketNetworkMongoDBResultNumerical(
-            result_data=base_result, scores=[]
-        )
+        if len(task.failed_instances) == 0:
+            # Nothing to do, not sure this state is reachable
+            insert_mongo_results = []
+            eval_logger.debug(
+                "No instances/doc_id generated for task.", task_id=str(task_id)
+            )
+            base_result = PocketNetworkMongoDBResultBase(
+                task_id=task_id,
+                status=1,
+                num_samples=0,
+                result_height=task.result_height,
+                result_time=datetime.today().isoformat(),
+            )
+            num_result = PocketNetworkMongoDBResultNumerical(
+                result_data=base_result, scores=[]
+            )
+        else:
+            # Just add all failed instances
+            scores = []
+            for instance in task.failed_instances:
+                numericSample = NumericSample(
+                    score=0.0,
+                    run_time=0.0,
+                    id=instance["id"],
+                    status_code=instance["code"],
+                )
+                scores.append(numericSample)
+
+            base_result = PocketNetworkMongoDBResultBase(
+                task_id=task_id,
+                status=0,
+                num_samples=len(task.failed_instances),
+                result_height=task.result_height,
+                result_time=datetime.today().isoformat(),
+            )
+            num_result = PocketNetworkMongoDBResultNumerical(
+                result_data=base_result, scores=scores
+            )
+
+        # Save to DB and return
         insert_mongo_results.append(num_result.model_dump(by_alias=True))
+        eval_logger.debug("Mongo Result:", mongo_result=insert_mongo_results)
+
         await save_results(
             mongo_client=mongo_client,
             insert_mongo_results=insert_mongo_results,
@@ -544,7 +572,6 @@ async def save_results(
         instances.sort(key=lambda x: x.idx)
         # iterate over different filters used
         scores = []
-        times = []
         result_num_samples = set()
         for filter_key in task.instances[0].filtered_resps.keys():
             if filter_key not in selected_filters:
@@ -590,14 +617,26 @@ async def save_results(
                     task_output.sample_metrics[(metric, filter_key)].append(value)
                 if metric in selected_metrics:
                     numericSample = NumericSample(
-                        score=example[metric], run_time=ms, id=doc_id
+                        score=example[metric],
+                        run_time=ms,
+                        id=doc_id,
+                        status_code=0,
                     )
                     scores.append(numericSample)
+        # If there are failed samples, add them here to the scores list
+        for instance in task.failed_instances:
+            numericSample = NumericSample(
+                score=0.0,
+                run_time=0.0,
+                id=instance["id"],
+                status_code=instance["code"],
+            )
+            scores.append(numericSample)

         base_result = PocketNetworkMongoDBResultBase(
             task_id=task_id,
             status=0,
-            num_samples=len(result_num_samples),
+            num_samples=len(result_num_samples) + len(task.failed_instances),
             result_height=task.result_height,
             result_time=datetime.today().isoformat(),
         )
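To make the downstream effect concrete, here is what `save_results` now emits for a task with one scored and one failed instance, assuming the protocol models from this PR; the values and import path are illustrative:

```python
from packages.python.protocol.protocol import NumericSample

# Failed docs become zero-score samples that keep their relay error code, so
# the manager's error-rate denominator sees both kinds of sample.
scores = [
    NumericSample(score=0.83, run_time=412.0, id=7, status_code=0),  # scored doc
    NumericSample(score=0.0, run_time=0.0, id=9, status_code=2),     # node error
]
num_samples = len(scores)  # len(result_num_samples) + len(task.failed_instances) = 2
```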
= "Bad JSON CompletionResponse format" eval_logger.error( - "Bad JSON CompletionResponse format", + error_str, response=r, error=str(e), ) + failed_instances.append( + {"id": i["doc_id"], "code": 11, "error": error_str} + ) continue instances.append(instance) @@ -408,7 +428,7 @@ async def reconstruct_instances( instances = sorted(instances, key=lambda x: (x.doc_id, x.idx)) - return instances, sorted(list(kept_doc_ids)), result_height + return instances, sorted(list(kept_doc_ids)), result_height, failed_instances async def mark_task_to_drop(self, task_id: ObjectId): empty_result = PocketNetworkMongoDBResultNumerical( @@ -420,7 +440,6 @@ async def mark_task_to_drop(self, task_id: ObjectId): result_time=datetime.today().isoformat(), ), scores=[], - times=[], ).model_dump(by_alias=True) async with self.client.start_transaction() as session: diff --git a/packages/python/protocol/protocol.py b/packages/python/protocol/protocol.py index 4f08f29..6b11a07 100644 --- a/packages/python/protocol/protocol.py +++ b/packages/python/protocol/protocol.py @@ -265,6 +265,7 @@ class Config: class SignatureSample(BaseModel): signature: str id: int + status_code: int class PocketNetworkMongoDBResultSignature(BaseModel): @@ -280,6 +281,7 @@ class NumericSample(BaseModel): score: float id: int run_time: float + status_code: int class PocketNetworkMongoDBResultNumerical(BaseModel):