From 0ebc718df142f1aaf8fe1007ba20ee05c90f1f23 Mon Sep 17 00:00:00 2001 From: Ramiro Rodriguez Colmeiro Date: Wed, 12 Jun 2024 18:06:03 -0300 Subject: [PATCH 1/3] almost working... --- apps/go/manager/records/task.go | 2 +- .../evaluator/activities/get_task_data.py | 47 +++++++ .../evaluator/activities/lmeh/evaluate.py | 2 +- .../signatures/tokenizer_evaluate.py | 133 ++++++++++++++++++ apps/python/evaluator/worker/main.py | 8 +- apps/python/evaluator/workflows/evaluator.py | 49 ++++++- apps/python/sidecar/main.py | 5 +- packages/python/lmeh/utils/mongodb.py | 26 +++- packages/python/lmeh/utils/tokenizers.py | 9 +- packages/python/protocol/protocol.py | 37 ++++- 10 files changed, 298 insertions(+), 20 deletions(-) create mode 100644 apps/python/evaluator/activities/get_task_data.py create mode 100644 apps/python/evaluator/activities/signatures/tokenizer_evaluate.py diff --git a/apps/go/manager/records/task.go b/apps/go/manager/records/task.go index b317e6d..ef9cf81 100644 --- a/apps/go/manager/records/task.go +++ b/apps/go/manager/records/task.go @@ -203,7 +203,7 @@ type NumericalTaskRecord struct { } type ScoresSample struct { - Score float32 `bson:"scores"` + Score float32 `bson:"score"` ID int `bson:"id"` } diff --git a/apps/python/evaluator/activities/get_task_data.py b/apps/python/evaluator/activities/get_task_data.py new file mode 100644 index 0000000..8f47ed0 --- /dev/null +++ b/apps/python/evaluator/activities/get_task_data.py @@ -0,0 +1,47 @@ +import os +import sys + +from temporalio import activity +from temporalio.exceptions import ApplicationError + +# add file path to sys.path +sys.path.append(os.path.dirname(os.path.realpath(__file__))) +from packages.python.common.auto_heartbeater import auto_heartbeater + +from app.app import get_app_logger, get_app_config + +from packages.python.lmeh.utils.mongodb import MongoOperator + + +# Custom modules +from packages.python.protocol.protocol import PocketNetworkEvaluationTaskRequest +from bson import ObjectId + +@activity.defn +@auto_heartbeater +async def get_task_data(args: PocketNetworkEvaluationTaskRequest) -> tuple[str, str]: + + app_config = get_app_config() + eval_logger = get_app_logger("evaluation") + config = app_config['config'] + + mongo_client = config["mongo_client"] + mongo_operator = MongoOperator(client=mongo_client) + + eval_logger.debug(f"Searching for task {args.task_id}.") + + try: + args.task_id = ObjectId(args.task_id) + except Exception as e: + raise ApplicationError( + "Bad Task ID format", + str(e), args.task_id, + type="BadParams", + non_retryable=True, + ) + + task_mongo = await mongo_operator.get_task(args.task_id) + + eval_logger.debug(f"Found! Evaluating [{task_mongo.framework}][{task_mongo.tasks}].") + + return task_mongo.framework, task_mongo.tasks \ No newline at end of file diff --git a/apps/python/evaluator/activities/lmeh/evaluate.py b/apps/python/evaluator/activities/lmeh/evaluate.py index 5a5fa4b..bd3265f 100644 --- a/apps/python/evaluator/activities/lmeh/evaluate.py +++ b/apps/python/evaluator/activities/lmeh/evaluate.py @@ -15,7 +15,7 @@ @activity.defn @auto_heartbeater -async def evaluation(args: PocketNetworkEvaluationTaskRequest) -> bool: +async def lmeh_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: """ Returns a dict where each key is a task name with the evaluation result. :param args: diff --git a/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py new file mode 100644 index 0000000..d0f2fac --- /dev/null +++ b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py @@ -0,0 +1,133 @@ +from bson import ObjectId +from temporalio import activity +from temporalio.exceptions import ApplicationError + +from app.app import get_app_logger, get_app_config +from packages.python.protocol.protocol import PocketNetworkEvaluationTaskRequest, PocketNetworkMongoDBResultSignature, SignatureSample +from packages.python.lmeh.utils.mongodb import MongoOperator +from packages.python.common.auto_heartbeater import auto_heartbeater + +from packages.python.lmeh.utils.tokenizers import load_tokenizer, prepare_tokenizer + +import json +from hashlib import sha256 + + +@activity.defn +@auto_heartbeater +async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: + """ + Returns a dict where each key is a task name with the evaluation result. + :param args: + :return: + """ + + + app_config = get_app_config() + eval_logger = get_app_logger("evaluation") + config = app_config['config'] + + try: + args.task_id = ObjectId(args.task_id) + except Exception as e: + raise ApplicationError( + "Bad Task ID format", + str(e), args.task_id, + type="BadParams", + non_retryable=True, + ) + + mongo_client = config["mongo_client"] + mongo_operator = MongoOperator(client=mongo_client) + + # Retrieve Task request. + task_data = await mongo_operator.get_task(args.task_id) + + # Retrieve all responses + responses = await mongo_operator.retrieve_responses(args.task_id) + if len(responses)!=1: + eval_logger.error(f"Found {len(responses)} responses, only 1 is expected.") + raise ApplicationError( + f"Task ID {args.task_id}: Found {len(responses)} responses, only 1 is expected.", + args.task_id, + type="ResponseError", + non_retryable=False, + ) + + # Create the result, empty for now + result = PocketNetworkMongoDBResultSignature( + task_id = args.task_id, + num_samples = 0, + signatures=[] + ) + + # Get tokenizer jsons + tokenizer_jsons = json.loads(responses[0]['response']['response']) + eval_logger.debug("Tokenizer found.", tokenizer_keys=list(tokenizer_jsons.keys())) + tokenizer_ok = False + if 'model_max_length' in tokenizer_jsons['tokenizer_config']: + tokenizer_jsons['tokenizer_config']['model_max_length'] = int( + tokenizer_jsons['tokenizer_config']['model_max_length']) + try: + # Try to load, if this succeds, the tokenizer is OK + tokenizer = load_tokenizer( + tokenizer_objects=tokenizer_jsons, + wf_id='', + tokenizer_ephimeral_path='/tmp/lala' + ) + eval_logger.debug("Tokenizer loaded.") + # This creates the structure used in the database, containing the hash + tokenizer_mongo_new = prepare_tokenizer(tokenizer) + eval_logger.debug("Tokenizer processed.") + tokenizer_ok = True + except Exception as e: + # This is not an error is just a failure in retrieval of tokenizer + eval_logger.info(f"Cannot load tokenizer from response.") + eval_logger.error(f"Exeption:", Exeption=str(e)) + + asdasdasd + + tokenizer_ok = False + + + tokenizer_new = False + if tokenizer_ok: + # check if the tokenizer exists in db + tokenizer_db = mongo_operator.get_tokenizer_entry(tokenizer_mongo_new['hash']) + if tokenizer_db == None: + # the tokenizer is not tracked, we need to create an entry + tokenizer_new = True + try: + async with mongo_client.start_transaction() as session: + await mongo_client.db['tokenizers'].insert_many( + [tokenizer_mongo_new.model_dump(by_alias=True)], + ordered=False, + session=session, + ) + except Exception as e: + eval_logger.error("Failed to save Tokenizer to MongoDB.") + eval_logger.error(f"Exeption:", Exeption=str(e)) + raise ApplicationError("Failed to save tokenizer to MongoDB.", non_retryable=True) + + # Update the result with valid data + result.num_samples = 1, # Always one + result.signatures=[SignatureSample( + signature = tokenizer_mongo_new['hash'], + id = 0 # This task has a single sample id + )] + + + # Save to results db + try: + async with mongo_client.start_transaction() as session: + await mongo_client.db['responses'].insert_many( + [result.model_dump(by_alias=True)], + ordered=False, + session=session, + ) + except Exception as e: + eval_logger.error("Failed to save Result to MongoDB.") + eval_logger.error(f"Exeption:", Exeption=str(e)) + raise ApplicationError("Failed to save result to MongoDB.", non_retryable=True) + + return {'tokenizer_is_valid': tokenizer_ok, 'tokenizer_is_new' : tokenizer_new} diff --git a/apps/python/evaluator/worker/main.py b/apps/python/evaluator/worker/main.py index 7d576a5..fc6fd4d 100644 --- a/apps/python/evaluator/worker/main.py +++ b/apps/python/evaluator/worker/main.py @@ -12,7 +12,11 @@ from app.app import setup_app, get_app_logger from app.config import read_config -from activities.lmeh.evaluate import evaluation as lmeh_evaluate +from activities.lmeh.evaluate import lmeh_evaluate +from activities.get_task_data import get_task_data +from activities.signatures.tokenizer_evaluate import tokenizer_evaluate + + from workflows.evaluator import Evaluator import concurrent.futures @@ -78,7 +82,9 @@ async def main(): Evaluator, ], "activities": [ + get_task_data, lmeh_evaluate, + tokenizer_evaluate, ], "activity_executor": activity_executor, } diff --git a/apps/python/evaluator/workflows/evaluator.py b/apps/python/evaluator/workflows/evaluator.py index 27a1114..c02b4fb 100644 --- a/apps/python/evaluator/workflows/evaluator.py +++ b/apps/python/evaluator/workflows/evaluator.py @@ -7,7 +7,9 @@ # add this to ensure app config is available on the thread from app.app import get_app_logger, get_app_config # add any activity that needs to be used on this workflow - from activities.lmeh.evaluate import evaluation as lmeh_evaluation + from activities.lmeh.evaluate import lmeh_evaluate + from activities.get_task_data import get_task_data + from activities.signatures.tokenizer_evaluate import tokenizer_evaluate from pydantic import BaseModel from packages.python.protocol.converter import pydantic_data_converter @@ -16,12 +18,45 @@ class Evaluator: @workflow.run async def run(self, args: PocketNetworkEvaluationTaskRequest) -> bool: -# if args.framework == "lmeh": - _ = await workflow.execute_activity( - lmeh_evaluation, + + # Extract framework and task to evaluate + framework, task = await workflow.execute_activity( + get_task_data, args, - start_to_close_timeout=timedelta(seconds=300), + start_to_close_timeout=timedelta(seconds=10), retry_policy=RetryPolicy(maximum_attempts=2), ) - return True - # raise ApplicationError(f"{args.framework} framework not implemented yet") + + if framework == "lmeh": + _ = await workflow.execute_activity( + lmeh_evaluate, + args, + start_to_close_timeout=timedelta(seconds=300), + retry_policy=RetryPolicy(maximum_attempts=2), + ) + + elif framework == "signatures": + if task == "tokenizer": + _ = await workflow.execute_activity( + tokenizer_evaluate, + args, + start_to_close_timeout=timedelta(seconds=300), + retry_policy=RetryPolicy(maximum_attempts=2), + ) + else: + raise ApplicationError( + f"Task {task} of framework {framework} is not implemented yet.", + args, + type="BadParams", + non_retryable=True + ) + + else: + raise ApplicationError( + f"{framework} framework not implemented yet", + args, + type="BadParams", + non_retryable=True + ) + + return True \ No newline at end of file diff --git a/apps/python/sidecar/main.py b/apps/python/sidecar/main.py index 09eadf4..e075c4f 100644 --- a/apps/python/sidecar/main.py +++ b/apps/python/sidecar/main.py @@ -5,6 +5,8 @@ from app.config import read_config from fastapi import FastAPI from tokenizers_utils.load import prepare_tokenizer_data +from fastapi.responses import JSONResponse, Response, StreamingResponse + ################################################### # SET UP SIDECAR @@ -35,7 +37,7 @@ @app.get("/pokt/tokenizer") def get_tokenizer(): l.debug("returning tokenizer data") - return TOKENIZER_JSON + return JSONResponse(content=TOKENIZER_JSON) # ----------------------------------------------- @@ -45,3 +47,4 @@ def get_tokenizer(): def get_tokenizer(): l.debug("returning tokenizer hash") return TOKENIZER_HASH + diff --git a/packages/python/lmeh/utils/mongodb.py b/packages/python/lmeh/utils/mongodb.py index 8d0aeff..e20942b 100644 --- a/packages/python/lmeh/utils/mongodb.py +++ b/packages/python/lmeh/utils/mongodb.py @@ -42,8 +42,9 @@ def instance_to_dict(instance: Instance, task_id: ObjectId) -> dict: instance_mongo['_id'] = ObjectId() instance_mongo['done'] = False return instance_mongo + + async def get_tokenizer_hash(self, address: str, service: str) -> str: - async def get_tokenizer_objects(self, address: str, service: str) -> dict: node = await self.client.db[self.nodes_collection].find_one({'address': address, 'service': service}) if node is None: @@ -62,11 +63,21 @@ async def get_tokenizer_objects(self, address: str, service: str) -> dict: if (task['task_data']['framework'] == 'signatures') and (task['task_data']['task'] == 'tokenizer'): tokenizer_hash = task['last_signature'] + return tokenizer_hash + + async def get_tokenizer_entry(self, tokenizer_hash: str): + return await self.client.db[self.tokenizers_collection].find_one({'hash': tokenizer_hash}) + + + async def get_tokenizer_objects(self, address: str, service: str) -> dict: + + tokenizer_hash = await self.get_tokenizer_hash(address, service) + if tokenizer_hash == '': eval_logger.error("Node address does not have a valid tokenizer_hash.", adress=address) raise ApplicationError(f"Node address {address} does not have a valid tokenizer_hash.") - tokenizer_object = await self.client.db[self.tokenizers_collection].find_one({'hash': tokenizer_hash}) + tokenizer_object = await self.get_tokenizer_entry(tokenizer_hash) # Validate that the tokenizer is not empty if tokenizer_object is None: @@ -148,8 +159,8 @@ async def get_task(self, task_id: ObjectId): task.id = task_id return task - - async def reconstruct_instances(self, task_id: ObjectId, ) -> List[Instance]: + + async def retrieve_responses(self, task_id: ObjectId, ) -> List[str]: cursor = self.client.db[self.tasks_collection].aggregate(aggregate_response_tree(task_id)) result = await cursor.to_list(length=None) @@ -161,6 +172,13 @@ async def reconstruct_instances(self, task_id: ObjectId, ) -> List[Instance]: type="TaskNotFound", non_retryable=False, ) + + return result + + + async def reconstruct_instances(self, task_id: ObjectId, ) -> List[Instance]: + + result = await self.retrieve_responses(task_id) valid_fields = {field.name for field in Instance.__dataclass_fields__.values()} instances = [] diff --git a/packages/python/lmeh/utils/tokenizers.py b/packages/python/lmeh/utils/tokenizers.py index 9c094c6..2f1eb97 100644 --- a/packages/python/lmeh/utils/tokenizers.py +++ b/packages/python/lmeh/utils/tokenizers.py @@ -61,9 +61,12 @@ def load_tokenizer(tokenizer_objects: dict, wf_id:str, tokenizer_ephimeral_path: tokenizer_ephimeral_path.mkdir(parents=True, exist_ok=True) for key, value in tokenizer_objects.items(): - with open( - os.path.join(tokenizer_ephimeral_path, key + ".json"), "w" - ) as f: + filename = os.path.join(tokenizer_ephimeral_path, key + ".json") + with open(filename, "w") as f: + print(filename) + eval_logger.debug( + f"Writing '{filename}'" + ) json.dump(value, f) f.close() diff --git a/packages/python/protocol/protocol.py b/packages/python/protocol/protocol.py index 2d77638..95edb46 100644 --- a/packages/python/protocol/protocol.py +++ b/packages/python/protocol/protocol.py @@ -161,7 +161,11 @@ class Config: ########### # EVALUATOR ########### -class PocketNetworkEvaluationTaskRequest(PocketNetworkTaskRequest): + +# TODO: Sepparate this class into an agnostic input to the evaluation workflow. +# This class is inhering multiple optional parameters that dont play any role in +# non-LMEH or non-LLM tasks. +class PocketNetworkEvaluationTaskRequest(PocketNetworkTaskRequest): framework: Optional[str] = None task_id: Union[str, PyObjectId] tasks: Optional[str] = None @@ -212,4 +216,33 @@ class CompletionResponse(OpenAIBaseModel): created: int = Field(default_factory=lambda: int(time.time())) model: str choices: List[CompletionResponseChoice] - usage: UsageInfo \ No newline at end of file + usage: UsageInfo + + + +########### +# RESPONSES +########### + +class PocketNetworkMongoDBResultBase(BaseModel): + id: PyObjectId = Field(default_factory=PyObjectId, alias="_id") + task_id: ObjectId + num_samples: int + + class Config: + arbitrary_types_allowed = True + +class SignatureSample(BaseModel): + signature: str + id: int + +class PocketNetworkMongoDBResultSignature(PocketNetworkMongoDBResultBase): + signatures: List[SignatureSample] + + +class NumericSample(BaseModel): + score: float + id: int + +class PocketNetworkMongoDBResultNumerical(PocketNetworkMongoDBResultBase): + scores: List[NumericSample] From e42fb972e4203245dc98e0b110edbe3c6fe929ed Mon Sep 17 00:00:00 2001 From: Ramiro Rodriguez Colmeiro Date: Thu, 13 Jun 2024 12:11:38 -0300 Subject: [PATCH 2/3] working evaluator on signatures --- .../signatures/tokenizer_evaluate.py | 103 ++++++++++-------- apps/python/sidecar/Dockerfile | 1 - apps/python/sidecar/main.py | 17 +-- apps/python/sidecar/tokenizers_utils/load.py | 53 --------- packages/python/lmeh/utils/tokenizers.py | 50 +++++---- packages/python/protocol/protocol.py | 13 +++ 6 files changed, 112 insertions(+), 125 deletions(-) delete mode 100644 apps/python/sidecar/tokenizers_utils/load.py diff --git a/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py index d0f2fac..6cc1cbb 100644 --- a/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py +++ b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py @@ -3,7 +3,7 @@ from temporalio.exceptions import ApplicationError from app.app import get_app_logger, get_app_config -from packages.python.protocol.protocol import PocketNetworkEvaluationTaskRequest, PocketNetworkMongoDBResultSignature, SignatureSample +from packages.python.protocol.protocol import PocketNetworkEvaluationTaskRequest, PocketNetworkMongoDBResultSignature, SignatureSample, PocketNetworkMongoDBTokenizer from packages.python.lmeh.utils.mongodb import MongoOperator from packages.python.common.auto_heartbeater import auto_heartbeater @@ -28,6 +28,7 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: config = app_config['config'] try: + task_id_str = args.task_id args.task_id = ObjectId(args.task_id) except Exception as e: raise ApplicationError( @@ -57,77 +58,93 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: # Create the result, empty for now result = PocketNetworkMongoDBResultSignature( task_id = args.task_id, - num_samples = 0, + num_samples = 0, signatures=[] ) # Get tokenizer jsons - tokenizer_jsons = json.loads(responses[0]['response']['response']) - eval_logger.debug("Tokenizer found.", tokenizer_keys=list(tokenizer_jsons.keys())) - tokenizer_ok = False - if 'model_max_length' in tokenizer_jsons['tokenizer_config']: - tokenizer_jsons['tokenizer_config']['model_max_length'] = int( - tokenizer_jsons['tokenizer_config']['model_max_length']) + tokenizer_decoded = False try: - # Try to load, if this succeds, the tokenizer is OK - tokenizer = load_tokenizer( - tokenizer_objects=tokenizer_jsons, - wf_id='', - tokenizer_ephimeral_path='/tmp/lala' - ) - eval_logger.debug("Tokenizer loaded.") - # This creates the structure used in the database, containing the hash - tokenizer_mongo_new = prepare_tokenizer(tokenizer) - eval_logger.debug("Tokenizer processed.") - tokenizer_ok = True + tokenizer_jsons = json.loads(responses[0]['response']['response']) + tokenizer_decoded = True except Exception as e: - # This is not an error is just a failure in retrieval of tokenizer - eval_logger.info(f"Cannot load tokenizer from response.") - eval_logger.error(f"Exeption:", Exeption=str(e)) - - asdasdasd - - tokenizer_ok = False + eval_logger.debug(f"Exeption:", Exeption=str(e)) + + tokenizer_ok = False + if tokenizer_decoded: + eval_logger.debug("Tokenizer found.", tokenizer_keys=list(tokenizer_jsons.keys())) + + if 'model_max_length' in tokenizer_jsons['tokenizer_config']: + tokenizer_jsons['tokenizer_config']['model_max_length'] = int( + tokenizer_jsons['tokenizer_config']['model_max_length']) + try: + # Try to load, if this succeds, the tokenizer is OK + temp_path = '/tmp/'+task_id_str + tokenizer = load_tokenizer( + tokenizer_objects=tokenizer_jsons, + wf_id='', + tokenizer_ephimeral_path=temp_path + ) + eval_logger.debug("Tokenizer loaded.") + # This creates the structure used in the database, containing the hash + tokenizer_jsons_loaded, tokenizer_hash_loaded = prepare_tokenizer(tokenizer, TOKENIZER_EPHIMERAL_PATH=temp_path) + tokenizer_mongo_new = PocketNetworkMongoDBTokenizer( + tokenizer=tokenizer_jsons_loaded, + hash=tokenizer_hash_loaded + ) + eval_logger.debug("Tokenizer processed.") + tokenizer_ok = True + except Exception as e: + # This is not an error is just a failure in retrieval of tokenizer + eval_logger.info(f"Cannot load tokenizer from response.") + eval_logger.debug(f"Exeption:", Exeption=str(e)) + tokenizer_ok = False + tokenizer_new = False if tokenizer_ok: # check if the tokenizer exists in db - tokenizer_db = mongo_operator.get_tokenizer_entry(tokenizer_mongo_new['hash']) + tokenizer_db = await mongo_operator.get_tokenizer_entry(tokenizer_mongo_new.hash) if tokenizer_db == None: + eval_logger.debug("Tokenizer does not exists.") # the tokenizer is not tracked, we need to create an entry tokenizer_new = True try: async with mongo_client.start_transaction() as session: await mongo_client.db['tokenizers'].insert_many( - [tokenizer_mongo_new.model_dump(by_alias=True)], + [tokenizer_mongo_new], ordered=False, session=session, ) + eval_logger.debug("Saved new tokenizer to DB.") except Exception as e: eval_logger.error("Failed to save Tokenizer to MongoDB.") eval_logger.error(f"Exeption:", Exeption=str(e)) raise ApplicationError("Failed to save tokenizer to MongoDB.", non_retryable=True) # Update the result with valid data - result.num_samples = 1, # Always one + result.num_samples = 1 # Always one result.signatures=[SignatureSample( - signature = tokenizer_mongo_new['hash'], + signature = str(tokenizer_mongo_new.hash), id = 0 # This task has a single sample id )] - # Save to results db - try: - async with mongo_client.start_transaction() as session: - await mongo_client.db['responses'].insert_many( - [result.model_dump(by_alias=True)], - ordered=False, - session=session, - ) - except Exception as e: - eval_logger.error("Failed to save Result to MongoDB.") - eval_logger.error(f"Exeption:", Exeption=str(e)) - raise ApplicationError("Failed to save result to MongoDB.", non_retryable=True) + # Save to results db (a failure is also an answer) + try: + async with mongo_client.start_transaction() as session: + await mongo_client.db['results'].insert_many( + [result.model_dump(by_alias=True)], + ordered=False, + session=session, + ) + eval_logger.debug("Saved result to DB.") + except Exception as e: + eval_logger.error("Failed to save Result to MongoDB.") + eval_logger.error(f"Exeption:", Exeption=str(e)) + raise ApplicationError("Failed to save result to MongoDB.", non_retryable=True) + + eval_logger.info(f"Status:", tokenizer_decoded=tokenizer_decoded, tokenizer_is_valid=tokenizer_ok, tokenizer_is_new=tokenizer_new) - return {'tokenizer_is_valid': tokenizer_ok, 'tokenizer_is_new' : tokenizer_new} + return True diff --git a/apps/python/sidecar/Dockerfile b/apps/python/sidecar/Dockerfile index fc0c37c..6caf056 100644 --- a/apps/python/sidecar/Dockerfile +++ b/apps/python/sidecar/Dockerfile @@ -17,7 +17,6 @@ RUN pip install --no-cache-dir --upgrade -r /home/app/code/requirements.txt COPY apps/python/sidecar/app /home/app/code/app -COPY apps/python/sidecar/tokenizers_utils /home/app/code/tokenizers_utils COPY apps/python/sidecar/main.py /home/app/code/main.py COPY packages /home/app/code/packages diff --git a/apps/python/sidecar/main.py b/apps/python/sidecar/main.py index e075c4f..d3ff935 100644 --- a/apps/python/sidecar/main.py +++ b/apps/python/sidecar/main.py @@ -1,11 +1,10 @@ -import os -import time - from app.app import get_app_logger, setup_app from app.config import read_config from fastapi import FastAPI -from tokenizers_utils.load import prepare_tokenizer_data -from fastapi.responses import JSONResponse, Response, StreamingResponse +from fastapi.responses import JSONResponse + +from packages.python.lmeh.utils.tokenizers import prepare_tokenizer +from transformers import AutoTokenizer ################################################### @@ -21,7 +20,11 @@ l.info("starting sidecar") # Read tokenizer data -TOKENIZER_JSON, TOKENIZER_HASH = prepare_tokenizer_data(config["tokenizer_path"]) +tokenizer = AutoTokenizer.from_pretrained( + config["tokenizer_path"] + ) +# Process it using the MLTB library (functions are reused by the MLTB) +TOKENIZER_JSON, TOKENIZER_HASH = prepare_tokenizer(tokenizer, TOKENIZER_EPHIMERAL_PATH='/tmp/tokenizer_aux') # Create serving app @@ -46,5 +49,5 @@ def get_tokenizer(): @app.get("/pokt/tokenizer-hash") def get_tokenizer(): l.debug("returning tokenizer hash") - return TOKENIZER_HASH + return JSONResponse({'hash' : TOKENIZER_HASH}) diff --git a/apps/python/sidecar/tokenizers_utils/load.py b/apps/python/sidecar/tokenizers_utils/load.py deleted file mode 100644 index 645fb57..0000000 --- a/apps/python/sidecar/tokenizers_utils/load.py +++ /dev/null @@ -1,53 +0,0 @@ -import json -import os -import shutil -from hashlib import sha256 -from pathlib import Path -from typing import Union - -from transformers import AutoTokenizer, PreTrainedTokenizer, PreTrainedTokenizerFast - -def _get_tokenizer_jsons(tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast]) -> dict: - """Get tokenizer jsons been used""" - CURRENT_DIR = os.path.dirname(__file__) - EPHIMERAL_FOLDER_NAME = "tmp_tokenizer" - TOKENIZER_EPHIMERAL_PATH = Path(os.path.join(CURRENT_DIR, EPHIMERAL_FOLDER_NAME)) - - # save tokenizer files in ephimeral folder - tokenizer.save_pretrained(TOKENIZER_EPHIMERAL_PATH.absolute()) - tmp_list = [i for i in TOKENIZER_EPHIMERAL_PATH.glob("*.json")] - - # populate tokenizer json - tokenizer_jsons = {} - for json_path in tmp_list: - with open(json_path) as json_file: - filename = json_path.stem - tokenizer_jsons[filename] = json.load(json_file) - try: - shutil.rmtree(TOKENIZER_EPHIMERAL_PATH) - except OSError as e: - raise RuntimeError(f"Error removing '{TOKENIZER_EPHIMERAL_PATH.name}' dir: {e}") from e - - return tokenizer_jsons - - -def prepare_tokenizer(tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast]) -> tuple[dict, str]: - - tokenizer_jsons = _get_tokenizer_jsons(tokenizer) - - if "model_max_length" in tokenizer_jsons["tokenizer_config"]: - tokenizer_jsons["tokenizer_config"]["model_max_length"] = str( - tokenizer_jsons["tokenizer_config"]["model_max_length"] - ) - - hash = json.dumps(tokenizer_jsons, sort_keys=True).encode("utf-8") - tokenizer_hash = sha256(hash).hexdigest() - return tokenizer_jsons, tokenizer_hash - - -def prepare_tokenizer_data(tokenizer_path: str) -> tuple[dict, str]: - - # Read tokenizer - tokenizer = AutoTokenizer.from_pretrained(tokenizer_path) - # Convert to json and create hash - return prepare_tokenizer(tokenizer) diff --git a/packages/python/lmeh/utils/tokenizers.py b/packages/python/lmeh/utils/tokenizers.py index 2f1eb97..b91acd9 100644 --- a/packages/python/lmeh/utils/tokenizers.py +++ b/packages/python/lmeh/utils/tokenizers.py @@ -3,21 +3,28 @@ import shutil home = os.environ['HOME'] -from bson.objectid import ObjectId from hashlib import sha256 from pathlib import Path from transformers import AutoTokenizer, PreTrainedTokenizer, PreTrainedTokenizerFast -from typing import List, Optional, Union +from typing import Union -from app.app import get_app_logger -eval_logger = get_app_logger("sample") +try: + from app.app import get_app_logger + eval_logger = get_app_logger("sample") +except: + print('No logger available') + eval_logger=None -def _get_tokenizer_jsons(tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast])-> dict: +def _get_tokenizer_jsons(tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast], TOKENIZER_EPHIMERAL_PATH = None)-> dict: """Get tokenizer jsons been used""" CURRENT_DIR = os.path.dirname(__file__) - EPHIMERAL_FOLDER_NAME = "tmp_tokenizer" - TOKENIZER_EPHIMERAL_PATH = Path( - os.path.join(CURRENT_DIR, EPHIMERAL_FOLDER_NAME)) + + if TOKENIZER_EPHIMERAL_PATH == None: + TOKENIZER_EPHIMERAL_PATH = Path( + os.path.join(CURRENT_DIR, "tmp_tokenizer")) + else: + TOKENIZER_EPHIMERAL_PATH = Path(TOKENIZER_EPHIMERAL_PATH) + TOKENIZER_EPHIMERAL_PATH.mkdir(parents=True, exist_ok=True) # save tokenizer files in ephimeral folder tokenizer.save_pretrained(TOKENIZER_EPHIMERAL_PATH.absolute()) @@ -38,9 +45,9 @@ def _get_tokenizer_jsons(tokenizer: Union[PreTrainedTokenizer, PreTrainedTokeniz return tokenizer_jsons -def prepare_tokenizer(tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast])-> dict: +def prepare_tokenizer(tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast], TOKENIZER_EPHIMERAL_PATH = None)-> dict: - tokenizer_jsons = _get_tokenizer_jsons(tokenizer) + tokenizer_jsons = _get_tokenizer_jsons(tokenizer, TOKENIZER_EPHIMERAL_PATH = TOKENIZER_EPHIMERAL_PATH) if 'model_max_length' in tokenizer_jsons['tokenizer_config']: @@ -49,8 +56,7 @@ def prepare_tokenizer(tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerF hash = json.dumps(tokenizer_jsons, sort_keys=True).encode('utf-8') tokenizer_hash = sha256(hash).hexdigest() - tokenizer_mongo = {'tokenizer': tokenizer_jsons, 'hash': tokenizer_hash, '_id': ObjectId()} - return tokenizer_mongo + return tokenizer_jsons, tokenizer_hash def load_tokenizer(tokenizer_objects: dict, wf_id:str, tokenizer_ephimeral_path: str=None)-> Union[PreTrainedTokenizer, PreTrainedTokenizerFast]: @@ -64,9 +70,10 @@ def load_tokenizer(tokenizer_objects: dict, wf_id:str, tokenizer_ephimeral_path: filename = os.path.join(tokenizer_ephimeral_path, key + ".json") with open(filename, "w") as f: print(filename) - eval_logger.debug( - f"Writing '{filename}'" - ) + if eval_logger != None: + eval_logger.debug( + f"Writing '{filename}'" + ) json.dump(value, f) f.close() @@ -75,12 +82,13 @@ def load_tokenizer(tokenizer_objects: dict, wf_id:str, tokenizer_ephimeral_path: ) try: shutil.rmtree(tokenizer_ephimeral_path) - eval_logger.debug( - f"Ephimeral '{tokenizer_ephimeral_path.name}' directory removed successfully." - ) - eval_logger.debug( - f"Tokenizer objects availables: {str(tokenizer_objects.keys())}" - ) + if eval_logger != None: + eval_logger.debug( + f"Ephimeral '{tokenizer_ephimeral_path.name}' directory removed successfully." + ) + eval_logger.debug( + f"Tokenizer objects availables: {str(tokenizer_objects.keys())}" + ) except OSError as e: raise RuntimeError( f"Error removing '{tokenizer_ephimeral_path.name}' directory: {e}" diff --git a/packages/python/protocol/protocol.py b/packages/python/protocol/protocol.py index 95edb46..3f9e3b9 100644 --- a/packages/python/protocol/protocol.py +++ b/packages/python/protocol/protocol.py @@ -246,3 +246,16 @@ class NumericSample(BaseModel): class PocketNetworkMongoDBResultNumerical(PocketNetworkMongoDBResultBase): scores: List[NumericSample] + + +########### +# Tokenizer +########### + +class PocketNetworkMongoDBTokenizer(BaseModel): + id: PyObjectId = Field(default_factory=PyObjectId, alias="_id") + tokenizer : dict + hash : str + + class Config: + arbitrary_types_allowed = True \ No newline at end of file From f5be6c2c8b06d185ee48eddc92fc084f35f76d09 Mon Sep 17 00:00:00 2001 From: Ramiro Rodriguez Colmeiro Date: Thu, 13 Jun 2024 12:19:39 -0300 Subject: [PATCH 3/3] evaluator working --- .../signatures/tokenizer_evaluate.py | 116 +++++++++--------- apps/python/sidecar/main.py | 12 +- 2 files changed, 60 insertions(+), 68 deletions(-) diff --git a/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py index 6cc1cbb..0d7c634 100644 --- a/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py +++ b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py @@ -1,16 +1,20 @@ +import json +from hashlib import sha256 + +from app.app import get_app_config, get_app_logger from bson import ObjectId from temporalio import activity from temporalio.exceptions import ApplicationError -from app.app import get_app_logger, get_app_config -from packages.python.protocol.protocol import PocketNetworkEvaluationTaskRequest, PocketNetworkMongoDBResultSignature, SignatureSample, PocketNetworkMongoDBTokenizer -from packages.python.lmeh.utils.mongodb import MongoOperator from packages.python.common.auto_heartbeater import auto_heartbeater - +from packages.python.lmeh.utils.mongodb import MongoOperator from packages.python.lmeh.utils.tokenizers import load_tokenizer, prepare_tokenizer - -import json -from hashlib import sha256 +from packages.python.protocol.protocol import ( + PocketNetworkEvaluationTaskRequest, + PocketNetworkMongoDBResultSignature, + PocketNetworkMongoDBTokenizer, + SignatureSample, +) @activity.defn @@ -22,10 +26,9 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: :return: """ - app_config = get_app_config() eval_logger = get_app_logger("evaluation") - config = app_config['config'] + config = app_config["config"] try: task_id_str = args.task_id @@ -33,7 +36,8 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: except Exception as e: raise ApplicationError( "Bad Task ID format", - str(e), args.task_id, + str(e), + args.task_id, type="BadParams", non_retryable=True, ) @@ -46,51 +50,45 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: # Retrieve all responses responses = await mongo_operator.retrieve_responses(args.task_id) - if len(responses)!=1: + if len(responses) != 1: eval_logger.error(f"Found {len(responses)} responses, only 1 is expected.") raise ApplicationError( - f"Task ID {args.task_id}: Found {len(responses)} responses, only 1 is expected.", - args.task_id, - type="ResponseError", - non_retryable=False, - ) - + f"Task ID {args.task_id}: Found {len(responses)} responses, only 1 is expected.", + args.task_id, + type="ResponseError", + non_retryable=False, + ) + # Create the result, empty for now - result = PocketNetworkMongoDBResultSignature( - task_id = args.task_id, - num_samples = 0, - signatures=[] - ) - + result = PocketNetworkMongoDBResultSignature(task_id=args.task_id, num_samples=0, signatures=[]) + # Get tokenizer jsons tokenizer_decoded = False try: - tokenizer_jsons = json.loads(responses[0]['response']['response']) + tokenizer_jsons = json.loads(responses[0]["response"]["response"]) tokenizer_decoded = True except Exception as e: eval_logger.debug(f"Exeption:", Exeption=str(e)) - + tokenizer_ok = False if tokenizer_decoded: eval_logger.debug("Tokenizer found.", tokenizer_keys=list(tokenizer_jsons.keys())) - - if 'model_max_length' in tokenizer_jsons['tokenizer_config']: - tokenizer_jsons['tokenizer_config']['model_max_length'] = int( - tokenizer_jsons['tokenizer_config']['model_max_length']) + + if "model_max_length" in tokenizer_jsons["tokenizer_config"]: + tokenizer_jsons["tokenizer_config"]["model_max_length"] = int( + tokenizer_jsons["tokenizer_config"]["model_max_length"] + ) try: # Try to load, if this succeds, the tokenizer is OK - temp_path = '/tmp/'+task_id_str - tokenizer = load_tokenizer( - tokenizer_objects=tokenizer_jsons, - wf_id='', - tokenizer_ephimeral_path=temp_path - ) + temp_path = "/tmp/" + task_id_str + tokenizer = load_tokenizer(tokenizer_objects=tokenizer_jsons, wf_id="", tokenizer_ephimeral_path=temp_path) eval_logger.debug("Tokenizer loaded.") # This creates the structure used in the database, containing the hash - tokenizer_jsons_loaded, tokenizer_hash_loaded = prepare_tokenizer(tokenizer, TOKENIZER_EPHIMERAL_PATH=temp_path) + tokenizer_jsons_loaded, tokenizer_hash_loaded = prepare_tokenizer( + tokenizer, TOKENIZER_EPHIMERAL_PATH=temp_path + ) tokenizer_mongo_new = PocketNetworkMongoDBTokenizer( - tokenizer=tokenizer_jsons_loaded, - hash=tokenizer_hash_loaded + tokenizer=tokenizer_jsons_loaded, hash=tokenizer_hash_loaded ) eval_logger.debug("Tokenizer processed.") tokenizer_ok = True @@ -99,8 +97,6 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: eval_logger.info(f"Cannot load tokenizer from response.") eval_logger.debug(f"Exeption:", Exeption=str(e)) tokenizer_ok = False - - tokenizer_new = False if tokenizer_ok: @@ -112,39 +108,39 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: tokenizer_new = True try: async with mongo_client.start_transaction() as session: - await mongo_client.db['tokenizers'].insert_many( - [tokenizer_mongo_new], - ordered=False, - session=session, - ) + await mongo_client.db["tokenizers"].insert_many( + [tokenizer_mongo_new], + ordered=False, + session=session, + ) eval_logger.debug("Saved new tokenizer to DB.") except Exception as e: eval_logger.error("Failed to save Tokenizer to MongoDB.") eval_logger.error(f"Exeption:", Exeption=str(e)) raise ApplicationError("Failed to save tokenizer to MongoDB.", non_retryable=True) - + # Update the result with valid data - result.num_samples = 1 # Always one - result.signatures=[SignatureSample( - signature = str(tokenizer_mongo_new.hash), - id = 0 # This task has a single sample id - )] - + result.num_samples = 1 # Always one + result.signatures = [ + SignatureSample(signature=str(tokenizer_mongo_new.hash), id=0) # This task has a single sample id + ] # Save to results db (a failure is also an answer) try: async with mongo_client.start_transaction() as session: - await mongo_client.db['results'].insert_many( - [result.model_dump(by_alias=True)], - ordered=False, - session=session, - ) + await mongo_client.db["results"].insert_many( + [result.model_dump(by_alias=True)], + ordered=False, + session=session, + ) eval_logger.debug("Saved result to DB.") except Exception as e: eval_logger.error("Failed to save Result to MongoDB.") eval_logger.error(f"Exeption:", Exeption=str(e)) raise ApplicationError("Failed to save result to MongoDB.", non_retryable=True) - - eval_logger.info(f"Status:", tokenizer_decoded=tokenizer_decoded, tokenizer_is_valid=tokenizer_ok, tokenizer_is_new=tokenizer_new) - + + eval_logger.info( + f"Status:", tokenizer_decoded=tokenizer_decoded, tokenizer_is_valid=tokenizer_ok, tokenizer_is_new=tokenizer_new + ) + return True diff --git a/apps/python/sidecar/main.py b/apps/python/sidecar/main.py index d3ff935..a5c881d 100644 --- a/apps/python/sidecar/main.py +++ b/apps/python/sidecar/main.py @@ -2,10 +2,9 @@ from app.config import read_config from fastapi import FastAPI from fastapi.responses import JSONResponse - -from packages.python.lmeh.utils.tokenizers import prepare_tokenizer from transformers import AutoTokenizer +from packages.python.lmeh.utils.tokenizers import prepare_tokenizer ################################################### # SET UP SIDECAR @@ -20,11 +19,9 @@ l.info("starting sidecar") # Read tokenizer data -tokenizer = AutoTokenizer.from_pretrained( - config["tokenizer_path"] - ) +tokenizer = AutoTokenizer.from_pretrained(config["tokenizer_path"]) # Process it using the MLTB library (functions are reused by the MLTB) -TOKENIZER_JSON, TOKENIZER_HASH = prepare_tokenizer(tokenizer, TOKENIZER_EPHIMERAL_PATH='/tmp/tokenizer_aux') +TOKENIZER_JSON, TOKENIZER_HASH = prepare_tokenizer(tokenizer, TOKENIZER_EPHIMERAL_PATH="/tmp/tokenizer_aux") # Create serving app @@ -49,5 +46,4 @@ def get_tokenizer(): @app.get("/pokt/tokenizer-hash") def get_tokenizer(): l.debug("returning tokenizer hash") - return JSONResponse({'hash' : TOKENIZER_HASH}) - + return JSONResponse({"hash": TOKENIZER_HASH})