From f07edf6c6acd8d9353e6e66bc627ec7d2b922503 Mon Sep 17 00:00:00 2001 From: Nicolas Aguirre Date: Thu, 13 Jun 2024 03:08:44 -0300 Subject: [PATCH 1/4] open llm configs + Mongo results --- .../evaluator/activities/lmeh/evaluate.py | 24 +-- apps/python/sampler/activities/lmeh/sample.py | 3 + packages/python/lmeh/utils/generator.py | 190 ++++++------------ 3 files changed, 64 insertions(+), 153 deletions(-) diff --git a/apps/python/evaluator/activities/lmeh/evaluate.py b/apps/python/evaluator/activities/lmeh/evaluate.py index bd3265f..c264b70 100644 --- a/apps/python/evaluator/activities/lmeh/evaluate.py +++ b/apps/python/evaluator/activities/lmeh/evaluate.py @@ -6,6 +6,7 @@ from packages.python.lmeh.pocket_lm_eval.models.pocket_network import EvaluatorLM from packages.python.lmeh.utils.common import get_task_manager from packages.python.lmeh.utils import generator as lmeh_generator +from packages.python.lmeh.utils import task_config as open_llm_config from packages.python.protocol.protocol import PocketNetworkEvaluationTaskRequest from packages.python.lmeh.utils.mongodb import MongoOperator from packages.python.common.auto_heartbeater import auto_heartbeater @@ -117,6 +118,8 @@ async def lmeh_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: # generate configurable tasks try: + open_llm_cfg = open_llm_config.get_task_config(task_names[0]) + open_llm_metrics = open_llm_cfg["metric"] task_dict = lmeh_generator.get_configurable_task( tasks=[task_name], num_fewshot=args.num_fewshot, @@ -168,31 +171,14 @@ async def lmeh_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: eval_logger.debug("Generating LM") lm = EvaluatorLM(**args.llm_args) eval_logger.debug("LM generated successfully.") - results = await lmeh_generator.evaluate( + task_output = await lmeh_generator.evaluate( lm=lm, task_dict=task_dict, task_id=args.task_id, mongo_client=mongo_client, + selected_metrics=open_llm_metrics, eval_logger=eval_logger, - bootstrap_iters=args.bootstrap_iters, ) eval_logger.info("Evaluation completed successfully.") - if lm.rank == 0: - # add info about the model and few shot config - results["config"] = { - "model": args.requester_args.address, - "model_args": args.llm_args, - "bootstrap_iters": args.bootstrap_iters, - "gen_kwargs": args.gen_kwargs, - } - - # todo: resolve code below - # results["git_hash"] = get_git_commit_hash() - # results["date"] = start_date - # add_env_info(results) # additional environment info to results - - # assign evaluation's result to a general result under task name, because we iterate over all the tasks - r[task_name] = results - return True diff --git a/apps/python/sampler/activities/lmeh/sample.py b/apps/python/sampler/activities/lmeh/sample.py index 607f344..f47055d 100644 --- a/apps/python/sampler/activities/lmeh/sample.py +++ b/apps/python/sampler/activities/lmeh/sample.py @@ -5,6 +5,7 @@ from app.app import get_app_logger, get_app_config from packages.python.protocol.protocol import PocketNetworkTaskRequest from packages.python.lmeh.utils import generator as lmeh_generator +from packages.python.lmeh.utils import task_config as open_llm_config from packages.python.lmeh.pocket_lm_eval.models.pocket_network import PocketNetworkLM from activities.utils import auto_heartbeater from packages.python.lmeh.utils import sql as lmeh_sql @@ -57,6 +58,8 @@ async def lmeh_sample(args: PocketNetworkTaskRequest) -> bool: # generate configurable tasks try: + open_llm_cfg = open_llm_config.get_task_config(task_names[0]) + args.num_fewshot = open_llm_cfg["num_fewshot"] 
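
Note on the lookup added above: get_task_config itself is not shown in this series (only the TASKS entries touched by a later patch appear). Below is a minimal sketch of how packages/python/lmeh/utils/task_config.py could be laid out, assuming the helper is a plain dictionary lookup; the hellaswag and winogrande entries and the error handling are illustrative assumptions, while the truthfulqa_mc2 and mmlu values match the later hunks in this series.

# Sketch (assumed layout) of packages/python/lmeh/utils/task_config.py
from typing import Dict, TypedDict


class OpenLLMTaskConfig(TypedDict):
    metric: str       # consumed by the evaluator as `selected_metrics`
    num_fewshot: int  # injected into the sampler request as `args.num_fewshot`


TASKS: Dict[str, OpenLLMTaskConfig] = {
    "hellaswag": {"metric": "acc_norm", "num_fewshot": 10},  # assumed entry
    "truthfulqa_mc2": {"metric": "acc", "num_fewshot": 0},
    "mmlu": {"metric": "acc", "num_fewshot": 5},
    "winogrande": {"metric": "acc", "num_fewshot": 5},       # assumed entry
}


def get_task_config(task_name: str) -> OpenLLMTaskConfig:
    # Fail loudly if a task outside the configured Open LLM Leaderboard set is requested.
    try:
        return TASKS[task_name]
    except KeyError as e:
        raise ValueError(f"No Open LLM Leaderboard config for task '{task_name}'") from e
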
task_dict = lmeh_generator.get_configurable_task( tasks=[task_name], num_fewshot=args.num_fewshot, diff --git a/packages/python/lmeh/utils/generator.py b/packages/python/lmeh/utils/generator.py index 94dd39c..f754b5d 100644 --- a/packages/python/lmeh/utils/generator.py +++ b/packages/python/lmeh/utils/generator.py @@ -22,7 +22,7 @@ from packages.python.lmeh.utils.mongodb import MongoOperator from packages.python.lmeh.pocket_lm_eval.tasks import PocketNetworkTaskManager from packages.python.protocol.protocol import PocketNetworkTaskRequest, PocketNetworkMongoDBTask, \ - PocketNetworkMongoDBPrompt + PocketNetworkMongoDBPrompt, NumericSample, PocketNetworkMongoDBResultNumerical from motor.motor_asyncio import AsyncIOMotorClient from packages.python.common.mongodb import MongoClient from bson import ObjectId @@ -67,7 +67,8 @@ def get_configurable_task( tasks = [] if len(tasks) == 0: raise ApplicationError( - "No tasks specified, or no tasks found. Please verify the task names.", non_retryable=True + "No tasks specified, or no tasks found. Please verify the task names.", + non_retryable=True, ) if gen_kwargs is not None: @@ -197,16 +198,23 @@ async def generate_requests( for _, rs in requests.items(): for r in rs: task_name, instance_id = r.metadata[0], r.doc_id - if instance_id not in task_dict[task_name].config.metadata['pocket_args'].doc_ids: + if ( + instance_id + not in task_dict[task_name].config.metadata["pocket_args"].doc_ids + ): # noinspection PyArgumentList eval_logger.error( - f"Instance id not found in task.config.metadata[\"pocket_args\"].doc_ids", - instance_id=instance_id, task=task_name, - task_ids=task_dict[task_name].config.metadata['pocket_args'].doc_ids + 'Instance id not found in task.config.metadata["pocket_args"].doc_ids', + instance_id=instance_id, + task=task_name, + task_ids=task_dict[task_name] + .config.metadata["pocket_args"] + .doc_ids, ) raise ApplicationError( f"Request id {instance_id} not found in task.config.metadata", - instance_id, task_name, + instance_id, + task_name, type="InstanceNotFound", non_retryable=True, ) @@ -245,24 +253,23 @@ async def generate_requests( task_mongodb = PocketNetworkMongoDBTask( **{ **args.model_dump(), - **{ - "total_instances": len(instances), - "request_type": task.OUTPUT_TYPE - }, + **{"total_instances": len(instances), "request_type": task.OUTPUT_TYPE}, }, ) insert_mongo_tasks.append(task_mongodb.model_dump(by_alias=True)) # Instances for instance in instances: - instance_mongo = MongoOperator.instance_to_dict(instance=instance, task_id=task_mongodb.id) + instance_mongo = MongoOperator.instance_to_dict( + instance=instance, task_id=task_mongodb.id + ) insert_mongo_instances.append(instance_mongo) # noinspection PyArgumentList # Prompts for pocket_req in instance.resps: - instance_id = instance_mongo['_id'] + instance_id = instance_mongo["_id"] data = pocket_req.model_dump_json( exclude_defaults=True, - exclude={"ctxlen", "context_enc", "continuation_enc"} + exclude={"ctxlen", "context_enc", "continuation_enc"}, ) prompt_mongo = PocketNetworkMongoDBPrompt( data=data, @@ -270,7 +277,8 @@ async def generate_requests( instance_id=instance_id, ctxlen=pocket_req.ctxlen, context_enc=pocket_req.context_enc, - continuation_enc=pocket_req.continuation_enc) + continuation_enc=pocket_req.continuation_enc, + ) insert_mongo_prompts.append(prompt_mongo.model_dump(by_alias=True)) try: async with mongo_client.start_transaction() as session: @@ -307,17 +315,16 @@ async def generate_requests( async def evaluate( - lm: "LM", - task_dict, - 
task_id: ObjectId, - mongo_client: MongoClient, - collection: str = 'tasks', - limit: Optional[int] = None, - bootstrap_iters: int = 100000, - cache_requests: bool = False, - rewrite_requests_cache: bool = False, - log_samples: bool = True, - eval_logger: Optional[logging.Logger] = None, + lm: "LM", + task_dict, + task_id: ObjectId, + mongo_client: MongoClient, + selected_metrics: str, + limit: Optional[int] = None, + cache_requests: bool = False, + rewrite_requests_cache: bool = False, + log_samples: bool = True, + eval_logger: Optional[logging.Logger] = None, ): """ :param lm: LM @@ -341,8 +348,8 @@ async def evaluate( task_hierarchy, eval_tasks = get_task_list(task_dict) if not log_samples: if not all( - "bypass" not in getattr(task_output.task, "_metric_fn_list", {}).keys() - for task_output in eval_tasks + "bypass" not in getattr(task_output.task, "_metric_fn_list", {}).keys() + for task_output in eval_tasks ): raise ValueError("log_samples must be True for 'bypass' metric-only tasks") @@ -352,7 +359,6 @@ async def evaluate( await task.build_all_requests( task_id=task_id, mongo_client=mongo_client, - collection=collection, limit=limit, rank=lm.rank, world_size=lm.world_size, @@ -379,21 +385,21 @@ async def evaluate( # run requests through model resps = getattr(lm, reqtype)(cloned_reqs) - eval_logger.debug(f"Response:", resps=resps) + eval_logger.debug("Response:", resps=resps) # put responses from model into a list of length K for each request. for x, req in zip(resps, cloned_reqs): req.resps.append(x) - eval_logger.debug(f"Request:", req=req, resps=req.resps, x=x) + eval_logger.debug("Request:", req=req, resps=req.resps, x=x) RANK = lm.rank WORLD_SIZE = lm.world_size + mongo_result = [] ### Postprocess outputs ### # TODO: del model here, maybe (idea: allow user to specify device of e.g. 
reward model separately) for task_output in eval_tasks: task = task_output.task task.apply_filters() - eval_logger.debug(f"Filtered Instance:", task=task.instances[0]) ### Collect values of metrics on all datapoints ### # # unpack results and sort back in order and return control to Task # TODO: make it possible to use a different metric per filter @@ -403,23 +409,24 @@ async def evaluate( instances_by_doc_id[instance.doc_id].append(instance) list_doc_id = list(instances_by_doc_id.keys()) # Sort instances within each group - eval_logger.debug(f"Instance by doc id:", instances_by_doc_id=instances_by_doc_id) for instances in instances_by_doc_id.values(): instances.sort(key=lambda x: x.idx) # iterate over different filters used + scores = [] + result_task_id = set() + result_num_samples = set() for filter_key in task.instances[0].filtered_resps.keys(): doc_iterator = task.doc_iterator( rank=RANK, limit=limit, world_size=WORLD_SIZE ) for i, doc in doc_iterator: doc_id = list_doc_id[i] - eval_logger.debug(f"Processing doc_id: {doc_id}") - eval_logger.debug(f"Doc:", doc=doc) + result_num_samples.add(doc_id) requests = instances_by_doc_id[doc_id] - eval_logger.debug(f"Requests (doc_iterator):", requests=requests) metrics = task.process_results( doc, [req.filtered_resps[filter_key] for req in requests] ) + result_task_id.add(requests[0].prompt.task_id) if log_samples: target = task.doc_to_target(doc) example = { @@ -436,101 +443,16 @@ async def evaluate( task_output.logged_samples.append(example) for metric, value in metrics.items(): task_output.sample_metrics[(metric, filter_key)].append(value) - - if RANK == 0: - ### Aggregate results over all datapoints ### - # aggregate results ; run bootstrap CIs - for task_output in eval_tasks: - task_output.calculate_aggregate_metric(bootstrap_iters=bootstrap_iters) - results, samples, configs, versions, num_fewshot = consolidate_results( - eval_tasks - ) - - ### Calculate group metrics ### - if bool(results): - for group, task_list in reversed(task_hierarchy.items()): - if len(task_list) == 0: - # task_hierarchy entries are either - # `group_name: [subtask1, subtask2, ...]` - # or `task_name: []`. - # we only want to operate on groups here. - continue - metric_list = list( - { - key - for task in task_list - for key in results[task].keys() - if "_stderr" not in key and key not in ["alias", "samples"] - } - ) - for metric in metric_list: - stderr = "_stderr,".join(metric.split(",")) - - # gather metrics, sizes, and stderrs from subtasks - metrics = [ - results[task][metric] - for task in task_list - if metric in results[task] - ] # TODO: copy? 
- stderrs = [ - results[task][stderr] - for task in task_list - if stderr in results[task] - ] - sizes = [ - results[task]["samples"] - for task in task_list - if metric in results[task] - ] - - # compute group's pooled metric and stderr - results[group][ - metric - ] = aggregate_subtask_metrics(metrics, sizes) - # TODO: calculate grouped metric using aggregation fn - if "N/A" in stderrs: - results[group][stderr] = "N/A" - else: - results[group][ - stderr - ] = pooled_sample_stderr(stderrs, sizes) - # TODO: allow GroupConfigs to choose which variance formula is used, for back-compatibility - # To use the old (likely incorrect) variance formula, comment out the above and uncomment this line: - # results[group][stderr] = lm_eval.api.metrics.combined_sample_stderr(stderrs, sizes, metrics=metrics) - - results[group]["samples"] = sum(sizes) - - results_agg = defaultdict(dict) - groups_agg = defaultdict(dict) - all_tasks_list = list(task_hierarchy.keys()) - while True: - add_tasks_list = list(k for k in results_agg.keys()) - left_tasks_list = sorted(list(set(all_tasks_list) - set(add_tasks_list))) - if len(left_tasks_list) == 0: - break - - _task_hierarchy = { - k: v for k, v in task_hierarchy.items() if k in left_tasks_list - } - _results_agg, _groups_agg = prepare_print_tasks(_task_hierarchy, results) - - results_agg = {**results_agg, **_results_agg} - groups_agg = {**groups_agg, **_groups_agg} - - for group_name, task_list in task_hierarchy.items(): - if task_list: - num_fewshot[group_name] = num_fewshot[ - task_list[0] - ] # TODO: validate this - - results_dict = { - "results": dict(results_agg.items()), - **({"groups": dict(groups_agg.items())} if bool(groups_agg) else {}), - "group_subtasks": dict(reversed(task_hierarchy.items())), - "configs": dict(sorted(configs.items())), - "versions": dict(sorted(versions.items())), - "n-shot": dict(sorted(num_fewshot.items())), - } - if log_samples: - results_dict["samples"] = dict(samples) - return results_dict + if selected_metrics in metrics: + numericSample = NumericSample(score=example[selected_metrics], id=doc_id) + scores.append(numericSample) + + assert len(result_task_id) == 1 + + mongo_result = PocketNetworkMongoDBResultNumerical( + task_id=list(result_task_id)[0], + num_samples=len(result_num_samples), + scores=scores) + mongo_result.append(mongo_result.model_dump(by_alias=True)) + eval_logger.debug("Mongo Result:", mongo_result=mongo_result) + return mongo_result From f941fdbb047dbf452577a09170ff147ef51af930 Mon Sep 17 00:00:00 2001 From: Nicolas Aguirre Date: Thu, 13 Jun 2024 12:38:51 -0300 Subject: [PATCH 2/4] complet rebase --- packages/python/protocol/protocol.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/python/protocol/protocol.py b/packages/python/protocol/protocol.py index 3f9e3b9..376d6fb 100644 --- a/packages/python/protocol/protocol.py +++ b/packages/python/protocol/protocol.py @@ -258,4 +258,5 @@ class PocketNetworkMongoDBTokenizer(BaseModel): hash : str class Config: - arbitrary_types_allowed = True \ No newline at end of file + arbitrary_types_allowed = True + scores: List[NumericSample] \ No newline at end of file From 607f507bb1b1407104e241fe12f9843dfbd96aaf Mon Sep 17 00:00:00 2001 From: Nicolas Aguirre Date: Fri, 14 Jun 2024 03:32:36 -0300 Subject: [PATCH 3/4] * Fix Aplication error with ObjectID. * Remove responses not correct or not answered at all. 
* Kept only those docs_ids filled by all its instances/responses * get_task_manager handles properly the logger into EvaluatePocketNetworkConfigurableTask. * Tokenizer index now include inique=true * Fix metrics in packages/python/lmeh/utils/task_config.py * Save result in mongoDB --- apps/go/manager/records/node.go | 2 +- .../evaluator/activities/lmeh/evaluate.py | 25 ++++--- .../signatures/tokenizer_evaluate.py | 10 +-- .../morse-poc/apps_configs/evaluator.json | 2 +- .../dependencies_configs/mongodb/init-db.js | 2 +- .../python/lmeh/pocket_lm_eval/api/task.py | 58 +++++++------- .../lmeh/pocket_lm_eval/tasks/__init__.py | 4 +- packages/python/lmeh/utils/common.py | 1 + packages/python/lmeh/utils/generator.py | 75 +++++++++++-------- packages/python/lmeh/utils/mongodb.py | 74 ++++++++++++------ packages/python/lmeh/utils/task_config.py | 4 +- packages/python/lmeh/utils/tokenizers.py | 2 +- packages/python/protocol/protocol.py | 5 +- 13 files changed, 155 insertions(+), 109 deletions(-) diff --git a/apps/go/manager/records/node.go b/apps/go/manager/records/node.go index e7ae3ed..3932e2d 100644 --- a/apps/go/manager/records/node.go +++ b/apps/go/manager/records/node.go @@ -180,7 +180,7 @@ func (record *NodeRecord) AppendTask(framework string, task string, date time.Ti newTask := SignatureTaskRecord{ TaskData: baseTaskData, - LastSignature: "83332a7f32e4188bb276a18ff78620acfd3c6edbd68002b746bda990ed30d56c", + LastSignature: "", Signatures: make([]SignatureSample, bufferLen), CircBuffer: types.CircularBuffer{ CircBufferLen: bufferLen, diff --git a/apps/python/evaluator/activities/lmeh/evaluate.py b/apps/python/evaluator/activities/lmeh/evaluate.py index c264b70..9ed1fe2 100644 --- a/apps/python/evaluator/activities/lmeh/evaluate.py +++ b/apps/python/evaluator/activities/lmeh/evaluate.py @@ -171,14 +171,17 @@ async def lmeh_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: eval_logger.debug("Generating LM") lm = EvaluatorLM(**args.llm_args) eval_logger.debug("LM generated successfully.") - task_output = await lmeh_generator.evaluate( - lm=lm, - task_dict=task_dict, - task_id=args.task_id, - mongo_client=mongo_client, - selected_metrics=open_llm_metrics, - eval_logger=eval_logger, - ) - eval_logger.info("Evaluation completed successfully.") - - return True + try: + result = await lmeh_generator.evaluate( + lm=lm, + task_dict=task_dict, + task_id=args.task_id, + mongo_client=mongo_client, + selected_metrics=open_llm_metrics, + eval_logger=eval_logger, + ) + eval_logger.info("Evaluation completed successfully.") + except ApplicationError as e: + raise e + + return result diff --git a/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py index 0d7c634..45c9f22 100644 --- a/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py +++ b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py @@ -94,29 +94,29 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: tokenizer_ok = True except Exception as e: # This is not an error is just a failure in retrieval of tokenizer - eval_logger.info(f"Cannot load tokenizer from response.") - eval_logger.debug(f"Exeption:", Exeption=str(e)) + eval_logger.info("Cannot load tokenizer from response.") + eval_logger.debug("Exeption:", Exeption=str(e)) tokenizer_ok = False tokenizer_new = False if tokenizer_ok: # check if the tokenizer exists in db tokenizer_db = await mongo_operator.get_tokenizer_entry(tokenizer_mongo_new.hash) - 
if tokenizer_db == None: + if tokenizer_db is None: eval_logger.debug("Tokenizer does not exists.") # the tokenizer is not tracked, we need to create an entry tokenizer_new = True try: async with mongo_client.start_transaction() as session: await mongo_client.db["tokenizers"].insert_many( - [tokenizer_mongo_new], + [tokenizer_mongo_new.model_dump(by_alias=True)], ordered=False, session=session, ) eval_logger.debug("Saved new tokenizer to DB.") except Exception as e: eval_logger.error("Failed to save Tokenizer to MongoDB.") - eval_logger.error(f"Exeption:", Exeption=str(e)) + eval_logger.error("Exeption:", Exeption=str(e)) raise ApplicationError("Failed to save tokenizer to MongoDB.", non_retryable=True) # Update the result with valid data diff --git a/docker-compose/morse-poc/apps_configs/evaluator.json b/docker-compose/morse-poc/apps_configs/evaluator.json index 31b6445..bf2d2d6 100644 --- a/docker-compose/morse-poc/apps_configs/evaluator.json +++ b/docker-compose/morse-poc/apps_configs/evaluator.json @@ -1,7 +1,7 @@ { "postgres_uri": "postgresql://admin:admin@postgresql:5432/pocket-ml-testbench", "mongodb_uri": "mongodb://mongodb:27017/pocket-ml-testbench", - "log_level": "DEBUG", + "log_level": "INFO", "temporal": { "host": "temporal", "port": 7233, diff --git a/docker-compose/morse-poc/dependencies_configs/mongodb/init-db.js b/docker-compose/morse-poc/dependencies_configs/mongodb/init-db.js index 6580cea..14c6771 100644 --- a/docker-compose/morse-poc/dependencies_configs/mongodb/init-db.js +++ b/docker-compose/morse-poc/dependencies_configs/mongodb/init-db.js @@ -1,7 +1,7 @@ db = db.getSiblingDB('pocket-ml-testbench'); db.createCollection('tokenizers'); -db.tokenizers.createIndex({hash: 1}); +db.tokenizers.createIndex({hash: 1}, {unique: true}); db.createCollection('tasks'); db.tasks.createIndex({ diff --git a/packages/python/lmeh/pocket_lm_eval/api/task.py b/packages/python/lmeh/pocket_lm_eval/api/task.py index b3cde61..4220534 100644 --- a/packages/python/lmeh/pocket_lm_eval/api/task.py +++ b/packages/python/lmeh/pocket_lm_eval/api/task.py @@ -378,7 +378,7 @@ async def load_from_sql(self, dataset_kwargs: Optional[Dict[str, Any]] = None) - # validate that the split exists in the _split_ranges self.check_split_exist(_split, _split_ranges) else: - self.eval_logger.error(f"Config without splits:", config=self.config) + self.eval_logger.error("Config without splits:", config=self.config) raise ApplicationError( f"Neither {self.config.test_split} nor {self.config.validation_split} in splits were found in " f"'_split_ranges'. 
Available splits are {_split_ranges.keys()}", @@ -393,7 +393,7 @@ async def load_from_sql(self, dataset_kwargs: Optional[Dict[str, Any]] = None) - if doc_ids: if _split != self.get_split_from_ids(_split_ranges, doc_ids): self.eval_logger.error( - f"Doc_ids not in split range used for evaluation:", + "Doc_ids not in split range used for evaluation:", doc_ids=doc_ids, _split=_split, range_min=_range['min'], @@ -420,13 +420,14 @@ async def load_from_sql(self, dataset_kwargs: Optional[Dict[str, Any]] = None) - # assign dataset as dataset dictionary ds_dict = DatasetDict() for split in ds.unique("__split"): - self.eval_logger.debug(f"Adding split to DatasetDict:", split=split) + self.eval_logger.debug("Adding split to DatasetDict:", split=split) ds_dict[split] = ds.filter(lambda x: x["__split"] == split) - self.dataset = ds_dict.remove_columns(["__id", "__split"]) + self.dataset = ds_dict.remove_columns(["__split"]) # save in config the indexes used to download the dataset self._config.metadata['pocket_args'].doc_ids = indexes # Update qty to the number of documents downloaded self._config.metadata['pocket_args'].qty = len(indexes) + self.eval_split = _split ########################################################### # call the code that was after the download on the __init__ ########################################################### @@ -616,7 +617,7 @@ def check_split_exist(self, split: str, _split_ranges: dict): This function checks if a self.config.split exists in the keys of _split_ranges """ if split not in _split_ranges.keys(): - self.eval_logger.error(f"Split not found in _split_ranges:", split=split, _split_ranges=_split_ranges) + self.eval_logger.error("Split not found in _split_ranges:", split=split, _split_ranges=_split_ranges) raise ApplicationError( f"'{split}' split not found in _split_ranges: {_split_ranges.keys()}", non_retryable=True @@ -636,7 +637,7 @@ def add_string_ids_range(self, split: str, id_list_str: str, _split_ranges: dict """ min_range = _split_ranges[split]['min'] max_range = _split_ranges[split]['max'] + 1 - self.eval_logger.debug(f"Adding ids from split range:", split=split, min_range=min_range, max_range=max_range) + self.eval_logger.debug("Adding ids from split range:", split=split, min_range=min_range, max_range=max_range) id_list_str += ', '.join(str(id) for id in range(min_range, max_range)) return id_list_str @@ -647,7 +648,7 @@ def generate_random_doc_ids(self, table_name: str, _split: str, qty: int, min: i """ # check that the quantity of numbers to generate is less than the range if qty > (max - min + 1): - self.eval_logger.error(f"quantity overflow:", table_name=table_name, _split=_split, qty=qty, range_min=min, + self.eval_logger.error("quantity overflow:", table_name=table_name, _split=_split, qty=qty, range_min=min, range_max=max) raise ApplicationError( "Quantity of numbers to generate is greater than the range", @@ -661,7 +662,7 @@ def generate_random_doc_ids(self, table_name: str, _split: str, qty: int, min: i ints = ints - set(blacklist) # Check that the blacklist numbers were removed if len(ints) == original_len: - self.eval_logger.error(f"Blacklist out of range:", table_name=table_name, _split=_split, range_min=min, + self.eval_logger.error("Blacklist out of range:", table_name=table_name, _split=_split, range_min=min, range_max=max, blacklist=blacklist) raise ApplicationError( "Blacklist corresponding to '{}' table & '{}' split were not founded in the range: [{}-{}]".format( @@ -670,7 +671,7 @@ def generate_random_doc_ids(self, table_name: 
str, _split: str, qty: int, min: i ) # sorted random numbers choices = sorted(np.random.choice(list(ints), qty, replace=False).tolist()) - self.eval_logger.debug(f"Random numbers generated:", choices=choices) + self.eval_logger.debug("Random numbers generated:", choices=choices) return choices def get_all_doc_ids(self, _split: str, _split_ranges: dict) -> List[int]: @@ -679,7 +680,7 @@ def get_all_doc_ids(self, _split: str, _split_ranges: dict) -> List[int]: """ min_range = _split_ranges[_split]['min'] max_range = _split_ranges[_split]['max'] + 1 - self.eval_logger.debug(f"Getting all ids from split range:", split=_split, min_range=min_range, + self.eval_logger.debug("Getting all ids from split range:", split=_split, min_range=min_range, max_range=max_range) return list(range(min_range, max_range)) @@ -693,7 +694,7 @@ def get_SQL_where_clause(self, indexes, _split: str, _split_ranges: dict): if self.config.test_split: self.check_split_exist(self.config.test_split, _split_ranges) if _split != self.config.test_split: - self.eval_logger.error(f"mismatch test_split:", _split=_split, test_split=self.config.test_split) + self.eval_logger.error("mismatch test_split:", _split=_split, test_split=self.config.test_split) raise ApplicationError( f"_split '{_split}' not equal to test_split '{self.config.test_split}'", non_retryable=True @@ -716,7 +717,7 @@ def get_SQL_where_clause(self, indexes, _split: str, _split_ranges: dict): elif self.config.validation_split: self.check_split_exist(self.config.validation_split, _split_ranges) if _split != self.config.validation_split: - self.eval_logger.error(f"mismatch validation_split:", _split=_split, + self.eval_logger.error("mismatch validation_split:", _split=_split, validation_split=self.config.validation_split) raise ApplicationError( f"_split '{_split}' not equal to validation_split '{self.config.validation_split}'", @@ -731,7 +732,7 @@ def get_SQL_where_clause(self, indexes, _split: str, _split_ranges: dict): self.check_split_exist(self.config.fewshot_split, _split_ranges) id_list_str = self.add_string_ids_range(self.config.fewshot_split, id_list_str, _split_ranges) else: - self.eval_logger.error(f"Config without splits:", config=self.config) + self.eval_logger.error("Config without splits:", config=self.config) raise ApplicationError( "Neither test_split nor validation_split in config, cannot proceed, please check get_SQL_where_clause", non_retryable=True @@ -776,7 +777,7 @@ async def get_max_min_ids(self, postgres_conn: asyncpg.Connection, table_name: s GROUP BY "__split"; """.format(table_name) - self.eval_logger.debug(f"SQL query:", sql_query=sql_query) + self.eval_logger.debug("SQL query:", sql_query=sql_query) # Fetch all rows from the result rows = await postgres_conn.fetch(sql_query) @@ -789,7 +790,7 @@ async def get_max_min_ids(self, postgres_conn: asyncpg.Connection, table_name: s for row in rows: _split_ranges[row[0]] = {'min': row[1], 'max': row[2]} except Exception as error: - self.eval_logger.error(f"Error while connecting to PostgreSQL:", error=error) + self.eval_logger.error("Error while connecting to PostgreSQL:", error=error) raise ApplicationError("Error while connecting to PostgreSQL", non_retryable=True) return _split_ranges @@ -828,22 +829,18 @@ def get_split_from_ids(self, _split_ranges: dict, __ids: List[int]): break # all ids should belong to a split range if len(split_range) != len(__ids): - self.eval_logger.error(f"Ids not in split range:", split_range=split_range, __ids=__ids) + self.eval_logger.error("Ids not in split 
range:", split_range=split_range, __ids=__ids) raise ApplicationError("Some ids do not belong to any split range", non_retryable=True) # all ids should belong to a unique split range if len(set(split_range)) != 1: - self.eval_logger.error(f"Ids in more than one split:", __ids=__ids, split_range=split_range) + self.eval_logger.error("Ids in more than one split:", __ids=__ids, split_range=split_range) raise ApplicationError("Some ids belong to more than one split.", non_retryable=True) return list(set(split_range))[0] class EvaluatePocketNetworkConfigurableTask(PocketNetworkConfigurableTask): - # todo: override __init__ and receive also mongo_client to set on self.mongo_client avoiding pass it on build_all_requests - # like we are already doing on PocketNetworkConfigurableTask - # after do that remember call super().__init__(*args, **kwargs) - # noinspection PyMethodOverriding async def build_all_requests( self, *, @@ -857,12 +854,13 @@ async def build_all_requests( rewrite_requests_cache=False, ) -> None: """Build a set of Instances for a task, and store them in task.instances""" - self._instances = await MongoOperator(client=mongo_client).reconstruct_instances(task_id=task_id) - - if len(self._instances) == 0: - raise ApplicationError( - "task.build_all_requests() did not find any docs!", - task_id, - type="DocumentsNotFound", - non_retryable=False, - ) + self._instances, kept_doc_ids = await MongoOperator(client=mongo_client).reconstruct_instances(task_id=task_id, eval_logger=self.eval_logger) + # Kept only those docs_ids filled by all its instances/responses + if kept_doc_ids: + b_dict = {} + for i, b in enumerate(self.config.metadata['pocket_args'].doc_ids): + b_dict[b] = i + a_indices = [b_dict[a] for a in kept_doc_ids] + self.dataset[self.eval_split] = Dataset.from_dict(self.dataset[self.eval_split][a_indices]) + for doc_id in self.eval_docs: + self.eval_logger.info("Doc_id:", doc_id=doc_id) diff --git a/packages/python/lmeh/pocket_lm_eval/tasks/__init__.py b/packages/python/lmeh/pocket_lm_eval/tasks/__init__.py index 6860caa..8bea829 100644 --- a/packages/python/lmeh/pocket_lm_eval/tasks/__init__.py +++ b/packages/python/lmeh/pocket_lm_eval/tasks/__init__.py @@ -71,9 +71,9 @@ def load_task(config, task, group=None, yaml_path=None): else: config = self._process_alias(config, group=group) if self.stage == TASK_MANAGER_REGISTER_STAGE or self.stage == TASK_MANAGER_SAMPLE_STAGE: - task_object = PocketNetworkConfigurableTask(config=config, postgres_conn=self.postgres_conn) + task_object = PocketNetworkConfigurableTask(config=config, postgres_conn=self.postgres_conn, eval_logger=self.logger) elif self.stage == TASK_MANAGER_EVALUATE_STAGE: - task_object = EvaluatePocketNetworkConfigurableTask(config=config, postgres_conn=self.postgres_conn) + task_object = EvaluatePocketNetworkConfigurableTask(config=config, postgres_conn=self.postgres_conn, eval_logger=self.logger) else: ApplicationError(f"Stage {self.stage} not supported", non_retryable=True) if group is not None: diff --git a/packages/python/lmeh/utils/common.py b/packages/python/lmeh/utils/common.py index 66eb9ca..8ed5c06 100644 --- a/packages/python/lmeh/utils/common.py +++ b/packages/python/lmeh/utils/common.py @@ -33,6 +33,7 @@ def get_task_manager( include_path=include_path, pocket_args=pocket_args, stage=stage, + logger=logger, ) if tasks is None: diff --git a/packages/python/lmeh/utils/generator.py b/packages/python/lmeh/utils/generator.py index f754b5d..7e18c78 100644 --- a/packages/python/lmeh/utils/generator.py +++ 
b/packages/python/lmeh/utils/generator.py @@ -356,23 +356,25 @@ async def evaluate( for task_output in eval_tasks: task: Task = task_output.task limit = get_sample_size(task, limit) - await task.build_all_requests( - task_id=task_id, - mongo_client=mongo_client, - limit=limit, - rank=lm.rank, - world_size=lm.world_size, - cache_requests=cache_requests, - rewrite_requests_cache=rewrite_requests_cache, - ) - eval_logger.debug( - f"Task: {task_output.task_name}; number of requests on this rank: {len(task.instances)}" - ) - # aggregate Instances by LM method requested to get output. - for instance in task.instances: - reqtype = instance.request_type - requests[reqtype].append(instance) - + try: + await task.build_all_requests( + task_id=task_id, + mongo_client=mongo_client, + limit=limit, + rank=lm.rank, + world_size=lm.world_size, + cache_requests=cache_requests, + rewrite_requests_cache=rewrite_requests_cache, + ) + eval_logger.debug( + f"Task: {task_output.task_name}; number of requests on this rank: {len(task.instances)}" + ) + # aggregate Instances by LM method requested to get output. + for instance in task.instances: + reqtype = instance.request_type + requests[reqtype].append(instance) + except Exception as e: + raise e eval_logger.debug("Instances generated successfully:") ### Run LM on inputs, get all outputs ### # execute each type of request @@ -394,7 +396,7 @@ async def evaluate( RANK = lm.rank WORLD_SIZE = lm.world_size - mongo_result = [] + insert_mongo_results = [] ### Postprocess outputs ### # TODO: del model here, maybe (idea: allow user to specify device of e.g. reward model separately) for task_output in eval_tasks: @@ -413,7 +415,6 @@ async def evaluate( instances.sort(key=lambda x: x.idx) # iterate over different filters used scores = [] - result_task_id = set() result_num_samples = set() for filter_key in task.instances[0].filtered_resps.keys(): doc_iterator = task.doc_iterator( @@ -426,7 +427,6 @@ async def evaluate( metrics = task.process_results( doc, [req.filtered_resps[filter_key] for req in requests] ) - result_task_id.add(requests[0].prompt.task_id) if log_samples: target = task.doc_to_target(doc) example = { @@ -443,16 +443,31 @@ async def evaluate( task_output.logged_samples.append(example) for metric, value in metrics.items(): task_output.sample_metrics[(metric, filter_key)].append(value) - if selected_metrics in metrics: - numericSample = NumericSample(score=example[selected_metrics], id=doc_id) - scores.append(numericSample) - - assert len(result_task_id) == 1 + if selected_metrics in metrics: + numericSample = NumericSample(score=example[selected_metrics], id=doc_id) + scores.append(numericSample) - mongo_result = PocketNetworkMongoDBResultNumerical( - task_id=list(result_task_id)[0], + num_result = PocketNetworkMongoDBResultNumerical( + task_id=task_id, num_samples=len(result_num_samples), scores=scores) - mongo_result.append(mongo_result.model_dump(by_alias=True)) - eval_logger.debug("Mongo Result:", mongo_result=mongo_result) - return mongo_result + insert_mongo_results.append(num_result.model_dump(by_alias=True)) + eval_logger.debug("Mongo Result:", mongo_result=insert_mongo_results) + try: + async with mongo_client.start_transaction() as session: + + await mongo_client.db['results'].insert_many( + insert_mongo_results, + ordered=False, + session=session, + ) + except Exception as e: + eval_logger.error("Failed to save documents (results) to MongoDB.", error=e) + raise ApplicationError( + "Failed to save documents (results) to MongoDB.", + str(e), + 
type="Mongodb", + non_retryable=True, + ) + + return True diff --git a/packages/python/lmeh/utils/mongodb.py b/packages/python/lmeh/utils/mongodb.py index e20942b..93df34c 100644 --- a/packages/python/lmeh/utils/mongodb.py +++ b/packages/python/lmeh/utils/mongodb.py @@ -1,4 +1,5 @@ import json +import logging from copy import deepcopy from typing import List, Optional from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorCollection @@ -81,7 +82,7 @@ async def get_tokenizer_objects(self, address: str, service: str) -> dict: # Validate that the tokenizer is not empty if tokenizer_object is None: - eval_logger.error(f"Tokenizer hash not found.", address=address, hash=tokenizer_hash) + eval_logger.error("Tokenizer hash not found.", address=address, hash=tokenizer_hash) raise ApplicationError(f"Tokenizer with hash {tokenizer_hash} does not exist in the database.") tokenizer = tokenizer_object['tokenizer'] @@ -132,7 +133,7 @@ async def get_doc_ids_by_task(self, task_id: ObjectId) -> List[int]: evaluation_logger.error("Task ID not found.", task_id=task_id) raise ApplicationError( f"Task ID {task_id} does not exist in the database.", - task_id, + str(task_id), type="TaskNotFound", non_retryable=False, ) @@ -148,7 +149,7 @@ async def get_task(self, task_id: ObjectId): evaluation_logger.error("Task ID not found.", task_id=task_id) raise ApplicationError( f"Task ID {task_id} does not exist in the database.", - task_id, + str(task_id), type="TaskNotFound", non_retryable=False, ) @@ -168,7 +169,7 @@ async def retrieve_responses(self, task_id: ObjectId, ) -> List[str]: evaluation_logger.error("Task ID not found.", task_id=task_id) raise ApplicationError( f"Task ID {task_id} does not exist in the database.", - task_id, + str(task_id), type="TaskNotFound", non_retryable=False, ) @@ -176,24 +177,31 @@ async def retrieve_responses(self, task_id: ObjectId, ) -> List[str]: return result - async def reconstruct_instances(self, task_id: ObjectId, ) -> List[Instance]: + async def reconstruct_instances(self, task_id: ObjectId, eval_logger:logging.Logger) -> List[Instance]: result = await self.retrieve_responses(task_id) valid_fields = {field.name for field in Instance.__dataclass_fields__.values()} instances = [] + remove_doc_ids = set() + kept_doc_ids = set() for doc in result: i, p = doc['instance'], doc['prompt'] - try: - # handle the exception to bring a light on production debugging if needed. - r = json.loads(doc['response']['response']) - except Exception as e: - raise ApplicationError( - "Bad JSON data format", - doc['response']['response'], str(e), - type="BadJSONFormat", - non_retryable=True, - ) + if not doc['response']['ok']: + remove_doc_ids.add(i['doc_id']) + continue + else: + try: + # handle the exception to bring a light on production debugging if needed. + r = json.loads(doc['response']['response']) + except Exception as e: + remove_doc_ids.add(i['doc_id']) + eval_logger.error( + "Bad JSON data format", + response = doc['response']['response'], + errpr = str(e), + ) + continue instance_dict = {key: value for key, value in i.items() if key in valid_fields} instance = Instance(**instance_dict) instance.repeats = 1 # to avoid double evaluation for each instance @@ -204,15 +212,37 @@ async def reconstruct_instances(self, task_id: ObjectId, ) -> List[Instance]: # handle the exception to bring a light on production debugging if needed. 
request_data = json.loads(instance.prompt.data) except Exception as e: - raise ApplicationError( - "Bad JSON data format", - instance.prompt.data, str(e), - type="BadJSONFormat", - non_retryable=True, + remove_doc_ids.add(i['doc_id']) + eval_logger.error( + "Bad JSON data format", + prompt_data=instance.prompt.data, + error=str(e), ) + continue instance.prompt.data = CompletionRequest(**request_data) - instance.resp = CompletionResponse(**r) + + try: + instance.resp = CompletionResponse(**r) + except Exception as e: + remove_doc_ids.add(i['doc_id']) + eval_logger.error( + "Bad JSON CompletionResponse format", + response=r, + error=str(e), + ) + continue instances.append(instance) + if len(instances) == 0 and len(remove_doc_ids) > 0: + raise ApplicationError( + f"Instances do not complete a doc_id for the task ID {str(task_id)}", + non_retryable=False, + ) + # Remove uncompleted docs_ids + if len(remove_doc_ids) > 0: + eval_logger.warning("Some instances were not completed, removing all instances with the same doc_id.", doc_ids=remove_doc_ids) + instances = [i for i in instances if i.doc_id not in remove_doc_ids] + for i in instances: + kept_doc_ids.add(i.doc_id) instances = sorted(instances, key=lambda x: (x.doc_id, x.idx)) - return instances + return instances, sorted(list(kept_doc_ids)) diff --git a/packages/python/lmeh/utils/task_config.py b/packages/python/lmeh/utils/task_config.py index 1b5a94c..cdb98b2 100644 --- a/packages/python/lmeh/utils/task_config.py +++ b/packages/python/lmeh/utils/task_config.py @@ -8,11 +8,11 @@ "num_fewshot":10 }, "truthfulqa_mc2": { - "metric": "mc2", + "metric": "acc", "num_fewshot":0 }, "mmlu": { - "metric": "average", + "metric": "acc", "num_fewshot":5 }, "winogrande": { diff --git a/packages/python/lmeh/utils/tokenizers.py b/packages/python/lmeh/utils/tokenizers.py index b91acd9..ebb842f 100644 --- a/packages/python/lmeh/utils/tokenizers.py +++ b/packages/python/lmeh/utils/tokenizers.py @@ -19,7 +19,7 @@ def _get_tokenizer_jsons(tokenizer: Union[PreTrainedTokenizer, PreTrainedTokeniz """Get tokenizer jsons been used""" CURRENT_DIR = os.path.dirname(__file__) - if TOKENIZER_EPHIMERAL_PATH == None: + if TOKENIZER_EPHIMERAL_PATH is None: TOKENIZER_EPHIMERAL_PATH = Path( os.path.join(CURRENT_DIR, "tmp_tokenizer")) else: diff --git a/packages/python/protocol/protocol.py b/packages/python/protocol/protocol.py index 376d6fb..268e6fc 100644 --- a/packages/python/protocol/protocol.py +++ b/packages/python/protocol/protocol.py @@ -227,7 +227,7 @@ class CompletionResponse(OpenAIBaseModel): class PocketNetworkMongoDBResultBase(BaseModel): id: PyObjectId = Field(default_factory=PyObjectId, alias="_id") task_id: ObjectId - num_samples: int + num_samples: int class Config: arbitrary_types_allowed = True @@ -258,5 +258,4 @@ class PocketNetworkMongoDBTokenizer(BaseModel): hash : str class Config: - arbitrary_types_allowed = True - scores: List[NumericSample] \ No newline at end of file + arbitrary_types_allowed = True \ No newline at end of file From 73c9e36ece27161e653a3d4b2a7cf736cc5a20c2 Mon Sep 17 00:00:00 2001 From: Nicolas Aguirre Date: Fri, 14 Jun 2024 17:33:48 -0300 Subject: [PATCH 4/4] * Logs relaxed * Update Tokenizer_evaluate (by Rawthil) * reconstruct_instances now return result_height and do not raise error when no doc_id --- .../evaluator/activities/lmeh/evaluate.py | 6 +- .../signatures/tokenizer_evaluate.py | 24 ++++--- .../python/lmeh/pocket_lm_eval/api/task.py | 6 +- packages/python/lmeh/utils/generator.py | 69 ++++++++++++++----- 
packages/python/lmeh/utils/mongodb.py | 18 +++-- packages/python/protocol/protocol.py | 4 ++ 6 files changed, 90 insertions(+), 37 deletions(-) diff --git a/apps/python/evaluator/activities/lmeh/evaluate.py b/apps/python/evaluator/activities/lmeh/evaluate.py index 9ed1fe2..a68c155 100644 --- a/apps/python/evaluator/activities/lmeh/evaluate.py +++ b/apps/python/evaluator/activities/lmeh/evaluate.py @@ -42,6 +42,10 @@ async def lmeh_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: if args.llm_args is None: args.llm_args = {} + eval_logger.info( + "Starting activity lmeh_evaluate", + task_id = str(args.task_id), + ) mongo_client = config["mongo_client"] mongo_operator = MongoOperator(client=mongo_client) @@ -154,7 +158,7 @@ async def lmeh_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: try: # it is loading data from sql to a dataset await task_dict[task_name].load_from_sql() - eval_logger.info("Task loaded successfully:", task_dict=task_dict) + eval_logger.debug("Task loaded successfully:", task_dict=task_dict) except ApplicationError as e: raise e except Exception as error: diff --git a/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py index 45c9f22..5576c5d 100644 --- a/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py +++ b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py @@ -1,5 +1,6 @@ import json from hashlib import sha256 +from datetime import datetime from app.app import get_app_config, get_app_logger from bson import ObjectId @@ -45,22 +46,26 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: mongo_client = config["mongo_client"] mongo_operator = MongoOperator(client=mongo_client) - # Retrieve Task request. 
- task_data = await mongo_operator.get_task(args.task_id) - # Retrieve all responses responses = await mongo_operator.retrieve_responses(args.task_id) if len(responses) != 1: eval_logger.error(f"Found {len(responses)} responses, only 1 is expected.") raise ApplicationError( f"Task ID {args.task_id}: Found {len(responses)} responses, only 1 is expected.", - args.task_id, + str(args.task_id), type="ResponseError", non_retryable=False, ) - + # Create the result, empty for now - result = PocketNetworkMongoDBResultSignature(task_id=args.task_id, num_samples=0, signatures=[]) + result = PocketNetworkMongoDBResultSignature(task_id=args.task_id, + status=responses[0]["response"]["error_code"], + num_samples=0, + result_height=responses[0]["response"]["height"], + result_time=datetime.today().isoformat(), + signatures=[]) + + # Get tokenizer jsons tokenizer_decoded = False @@ -68,7 +73,7 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: tokenizer_jsons = json.loads(responses[0]["response"]["response"]) tokenizer_decoded = True except Exception as e: - eval_logger.debug(f"Exeption:", Exeption=str(e)) + eval_logger.debug("Exeption:", Exeption=str(e)) tokenizer_ok = False if tokenizer_decoded: @@ -121,6 +126,7 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: # Update the result with valid data result.num_samples = 1 # Always one + result.status = 0 # OK result.signatures = [ SignatureSample(signature=str(tokenizer_mongo_new.hash), id=0) # This task has a single sample id ] @@ -136,11 +142,11 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: eval_logger.debug("Saved result to DB.") except Exception as e: eval_logger.error("Failed to save Result to MongoDB.") - eval_logger.error(f"Exeption:", Exeption=str(e)) + eval_logger.error("Exeption:", Exeption=str(e)) raise ApplicationError("Failed to save result to MongoDB.", non_retryable=True) eval_logger.info( - f"Status:", tokenizer_decoded=tokenizer_decoded, tokenizer_is_valid=tokenizer_ok, tokenizer_is_new=tokenizer_new + "Status:", tokenizer_decoded=tokenizer_decoded, tokenizer_is_valid=tokenizer_ok, tokenizer_is_new=tokenizer_new ) return True diff --git a/packages/python/lmeh/pocket_lm_eval/api/task.py b/packages/python/lmeh/pocket_lm_eval/api/task.py index 4220534..975ba74 100644 --- a/packages/python/lmeh/pocket_lm_eval/api/task.py +++ b/packages/python/lmeh/pocket_lm_eval/api/task.py @@ -854,13 +854,11 @@ async def build_all_requests( rewrite_requests_cache=False, ) -> None: """Build a set of Instances for a task, and store them in task.instances""" - self._instances, kept_doc_ids = await MongoOperator(client=mongo_client).reconstruct_instances(task_id=task_id, eval_logger=self.eval_logger) + self._instances, kept_doc_ids, self.result_height = await MongoOperator(client=mongo_client).reconstruct_instances(task_id=task_id, eval_logger=self.eval_logger) # Kept only those docs_ids filled by all its instances/responses if kept_doc_ids: b_dict = {} for i, b in enumerate(self.config.metadata['pocket_args'].doc_ids): b_dict[b] = i a_indices = [b_dict[a] for a in kept_doc_ids] - self.dataset[self.eval_split] = Dataset.from_dict(self.dataset[self.eval_split][a_indices]) - for doc_id in self.eval_docs: - self.eval_logger.info("Doc_id:", doc_id=doc_id) + self.dataset[self.eval_split] = Dataset.from_dict(self.dataset[self.eval_split][a_indices]) \ No newline at end of file diff --git a/packages/python/lmeh/utils/generator.py 
b/packages/python/lmeh/utils/generator.py index 7e18c78..7c8dd69 100644 --- a/packages/python/lmeh/utils/generator.py +++ b/packages/python/lmeh/utils/generator.py @@ -1,3 +1,4 @@ +from datetime import datetime import logging from collections import defaultdict from typing import TYPE_CHECKING, List, Optional, Union @@ -341,6 +342,30 @@ async def evaluate( Dictionary of results """ + async def save_results( + mongo_client: MongoClient, + insert_mongo_results: List[dict], + eval_logger: Optional[logging.Logger] = None, + ): + + try: + async with mongo_client.start_transaction() as session: + + await mongo_client.db['results'].insert_many( + insert_mongo_results, + ordered=False, + session=session, + ) + except Exception as e: + eval_logger.error("Failed to save documents (results) to MongoDB.", error=e) + raise ApplicationError( + "Failed to save documents (results) to MongoDB.", + str(e), + type="Mongodb", + non_retryable=True, + ) + return + # tracks all Instances/requests a model must generate output on. requests = defaultdict(list) @@ -375,6 +400,25 @@ async def evaluate( requests[reqtype].append(instance) except Exception as e: raise e + + if len(task.instances) == 0: + insert_mongo_results = [] + eval_logger.debug("No instances/doc_id generated for task.", task_id=str(task_id)) + num_result = PocketNetworkMongoDBResultNumerical( + task_id=task_id, + num_samples=0, + status=1, + result_height=task.result_height, + result_time=datetime.today().isoformat(), + scores=[]) + insert_mongo_results.append(num_result.model_dump(by_alias=True)) + await save_results( + mongo_client=mongo_client, + insert_mongo_results=insert_mongo_results, + eval_logger=eval_logger, + ) + return True + eval_logger.debug("Instances generated successfully:") ### Run LM on inputs, get all outputs ### # execute each type of request @@ -450,24 +494,17 @@ async def evaluate( num_result = PocketNetworkMongoDBResultNumerical( task_id=task_id, num_samples=len(result_num_samples), + status=0, + result_height=task.result_height, + result_time=datetime.today().isoformat(), scores=scores) insert_mongo_results.append(num_result.model_dump(by_alias=True)) eval_logger.debug("Mongo Result:", mongo_result=insert_mongo_results) - try: - async with mongo_client.start_transaction() as session: - await mongo_client.db['results'].insert_many( - insert_mongo_results, - ordered=False, - session=session, - ) - except Exception as e: - eval_logger.error("Failed to save documents (results) to MongoDB.", error=e) - raise ApplicationError( - "Failed to save documents (results) to MongoDB.", - str(e), - type="Mongodb", - non_retryable=True, - ) + await save_results( + mongo_client=mongo_client, + insert_mongo_results=insert_mongo_results, + eval_logger=eval_logger, + ) - return True + return True \ No newline at end of file diff --git a/packages/python/lmeh/utils/mongodb.py b/packages/python/lmeh/utils/mongodb.py index 93df34c..6fe6cd5 100644 --- a/packages/python/lmeh/utils/mongodb.py +++ b/packages/python/lmeh/utils/mongodb.py @@ -185,8 +185,10 @@ async def reconstruct_instances(self, task_id: ObjectId, eval_logger:logging.Log instances = [] remove_doc_ids = set() kept_doc_ids = set() + list_result_height = [] for doc in result: i, p = doc['instance'], doc['prompt'] + list_result_height.append(doc['response']['session_height']) if not doc['response']['ok']: remove_doc_ids.add(i['doc_id']) continue @@ -206,7 +208,7 @@ async def reconstruct_instances(self, task_id: ObjectId, eval_logger:logging.Log instance = Instance(**instance_dict) 
instance.repeats = 1 # to avoid double evaluation for each instance p['id'] = deepcopy(p['_id']) - p.pop('_id') + p.pop('_id') instance.prompt = PocketNetworkMongoDBPrompt(**p) try: # handle the exception to bring a light on production debugging if needed. @@ -231,18 +233,20 @@ async def reconstruct_instances(self, task_id: ObjectId, eval_logger:logging.Log error=str(e), ) continue + instances.append(instance) + + result_height = max(list_result_height) + if len(instances) == 0 and len(remove_doc_ids) > 0: - raise ApplicationError( - f"Instances do not complete a doc_id for the task ID {str(task_id)}", - non_retryable=False, - ) + return [], [], result_height + # Remove uncompleted docs_ids if len(remove_doc_ids) > 0: - eval_logger.warning("Some instances were not completed, removing all instances with the same doc_id.", doc_ids=remove_doc_ids) instances = [i for i in instances if i.doc_id not in remove_doc_ids] for i in instances: kept_doc_ids.add(i.doc_id) instances = sorted(instances, key=lambda x: (x.doc_id, x.idx)) - return instances, sorted(list(kept_doc_ids)) + + return instances, sorted(list(kept_doc_ids)), result_height diff --git a/packages/python/protocol/protocol.py b/packages/python/protocol/protocol.py index 268e6fc..2081812 100644 --- a/packages/python/protocol/protocol.py +++ b/packages/python/protocol/protocol.py @@ -1,5 +1,6 @@ import time import uuid +from datetime import datetime from typing import List, Literal, Optional, Union, Dict from bson import ObjectId from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator @@ -228,6 +229,9 @@ class PocketNetworkMongoDBResultBase(BaseModel): id: PyObjectId = Field(default_factory=PyObjectId, alias="_id") task_id: ObjectId num_samples: int + status: int + result_height: int + result_time: datetime class Config: arbitrary_types_allowed = True
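
Taken together, the series replaces lm-eval-harness' in-process aggregation (bootstrap CIs, group pooling) with one result document per task written to the `results` collection. The following is a minimal, self-contained sketch of that numerical result shape, assuming simplified pydantic definitions for NumericSample and PocketNetworkMongoDBResultNumerical: the field names and the status / result_height / result_time semantics follow the hunks above, while the `_id`/PyObjectId alias handling is omitted.

# Sketch (simplified, assumed definitions) of the result documents written to `results`.
from datetime import datetime
from typing import List

from bson import ObjectId
from pydantic import BaseModel, Field


class NumericSample(BaseModel):
    score: float  # value of the selected metric for one document
    id: int       # doc_id of the evaluated sample


class PocketNetworkMongoDBResultNumerical(BaseModel):
    task_id: ObjectId
    num_samples: int
    status: int         # 0 = OK, 1 = no usable instances/doc_ids for this task
    result_height: int  # max session_height observed across the task's responses
    result_time: datetime
    scores: List[NumericSample] = Field(default_factory=list)

    class Config:
        arbitrary_types_allowed = True


# An "empty" result, as written when no doc_id has a complete set of valid responses.
empty = PocketNetworkMongoDBResultNumerical(
    task_id=ObjectId(),
    num_samples=0,
    status=1,
    result_height=42,  # hypothetical height
    result_time=datetime.today(),
    scores=[],
)
print(empty.model_dump(by_alias=True))

On success, generator.evaluate builds the same document with status=0, num_samples equal to the number of kept doc_ids, and one NumericSample per document for the task's selected metric, then inserts the list into mongo_client.db['results'] inside a transaction via save_results.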