Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

58 evaluator add signatures tokenizer evaluation #63

Merged
merged 3 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/go/manager/records/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ type NumericalTaskRecord struct {
}

// ScoresSample is a single numerical score sample as stored in MongoDB.
// NOTE: the diff artifact contained both the old (`bson:"scores"`) and new
// (`bson:"score"`) field lines; only the merged field is kept — a struct
// cannot declare Score twice.
type ScoresSample struct {
	Score float32 `bson:"score"` // sample score value
	ID    int     `bson:"id"`    // sample identifier within the task
}

Expand Down
47 changes: 47 additions & 0 deletions apps/python/evaluator/activities/get_task_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import os
import sys

from temporalio import activity
from temporalio.exceptions import ApplicationError

# add file path to sys.path
sys.path.append(os.path.dirname(os.path.realpath(__file__)))
from packages.python.common.auto_heartbeater import auto_heartbeater

from app.app import get_app_logger, get_app_config

from packages.python.lmeh.utils.mongodb import MongoOperator


# Custom modules
from packages.python.protocol.protocol import PocketNetworkEvaluationTaskRequest
from bson import ObjectId

@activity.defn
@auto_heartbeater
async def get_task_data(args: PocketNetworkEvaluationTaskRequest) -> tuple[str, str]:
    """Fetch an evaluation task from MongoDB and return its framework and tasks.

    :param args: evaluation task request; ``args.task_id`` must be a valid
        MongoDB ObjectId string (it is converted in place to an ``ObjectId``).
    :return: ``(framework, tasks)`` of the stored task document.
    :raises ApplicationError: non-retryable ``BadParams`` when the task id
        cannot be parsed as an ObjectId.
    """
    application = get_app_config()
    logger = get_app_logger("evaluation")
    settings = application["config"]

    operator = MongoOperator(client=settings["mongo_client"])

    logger.debug(f"Searching for task {args.task_id}.")

    # Validate the id eagerly so a malformed request fails fast and is not retried.
    try:
        args.task_id = ObjectId(args.task_id)
    except Exception as err:
        raise ApplicationError(
            "Bad Task ID format",
            str(err),
            args.task_id,
            type="BadParams",
            non_retryable=True,
        )

    record = await operator.get_task(args.task_id)

    logger.debug(f"Found! Evaluating [{record.framework}][{record.tasks}].")

    return record.framework, record.tasks
2 changes: 1 addition & 1 deletion apps/python/evaluator/activities/lmeh/evaluate.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

@activity.defn
@auto_heartbeater
async def evaluation(args: PocketNetworkEvaluationTaskRequest) -> bool:
async def lmeh_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool:
"""
Returns a dict where each key is a task name with the evaluation result.
:param args:
Expand Down
146 changes: 146 additions & 0 deletions apps/python/evaluator/activities/signatures/tokenizer_evaluate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import json
from hashlib import sha256

from app.app import get_app_config, get_app_logger
from bson import ObjectId
from temporalio import activity
from temporalio.exceptions import ApplicationError

from packages.python.common.auto_heartbeater import auto_heartbeater
from packages.python.lmeh.utils.mongodb import MongoOperator
from packages.python.lmeh.utils.tokenizers import load_tokenizer, prepare_tokenizer
from packages.python.protocol.protocol import (
PocketNetworkEvaluationTaskRequest,
PocketNetworkMongoDBResultSignature,
PocketNetworkMongoDBTokenizer,
SignatureSample,
)


@activity.defn
@auto_heartbeater
async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool:
    """
    Evaluate a "signatures/tokenizer" task: decode the node's tokenizer
    response, check that it loads as a valid tokenizer, track previously
    unseen tokenizers in MongoDB and always store a signature result for the
    task (the tokenizer hash on success, an empty signature list on failure —
    a failure is also an answer).

    :param args: evaluation task request; ``args.task_id`` must be a valid
        MongoDB ObjectId string (converted in place to an ``ObjectId``).
    :return: True when the result document was written.
    :raises ApplicationError: non-retryable on bad task id or MongoDB write
        failure; retryable when the response count is not exactly one.
    """

    app_config = get_app_config()
    eval_logger = get_app_logger("evaluation")
    config = app_config["config"]

    try:
        # Keep the original string form: it is reused below to build the
        # ephemeral tokenizer path.
        task_id_str = args.task_id
        args.task_id = ObjectId(args.task_id)
    except Exception as e:
        raise ApplicationError(
            "Bad Task ID format",
            str(e),
            args.task_id,
            type="BadParams",
            non_retryable=True,
        )

    mongo_client = config["mongo_client"]
    mongo_operator = MongoOperator(client=mongo_client)

    # Retrieve Task request.
    task_data = await mongo_operator.get_task(args.task_id)

    # Retrieve all responses; a tokenizer task must have exactly one.
    responses = await mongo_operator.retrieve_responses(args.task_id)
    if len(responses) != 1:
        eval_logger.error(f"Found {len(responses)} responses, only 1 is expected.")
        raise ApplicationError(
            f"Task ID {args.task_id}: Found {len(responses)} responses, only 1 is expected.",
            args.task_id,
            type="ResponseError",
            non_retryable=False,
        )

    # Create the result, empty for now
    result = PocketNetworkMongoDBResultSignature(task_id=args.task_id, num_samples=0, signatures=[])

    # Get tokenizer jsons
    tokenizer_decoded = False
    try:
        tokenizer_jsons = json.loads(responses[0]["response"]["response"])
        tokenizer_decoded = True
    except Exception as e:
        # Fixed typo ("Exeption") and dropped the pointless f-prefix.
        eval_logger.debug("Exception:", exception=str(e))

    tokenizer_ok = False
    if tokenizer_decoded:
        eval_logger.debug("Tokenizer found.", tokenizer_keys=list(tokenizer_jsons.keys()))

        # model_max_length may arrive as a non-int JSON value; the loader
        # expects an int.
        if "model_max_length" in tokenizer_jsons["tokenizer_config"]:
            tokenizer_jsons["tokenizer_config"]["model_max_length"] = int(
                tokenizer_jsons["tokenizer_config"]["model_max_length"]
            )
        try:
            # Try to load; if this succeeds, the tokenizer is OK.
            temp_path = "/tmp/" + task_id_str
            tokenizer = load_tokenizer(tokenizer_objects=tokenizer_jsons, wf_id="", tokenizer_ephimeral_path=temp_path)
            eval_logger.debug("Tokenizer loaded.")
            # This creates the structure used in the database, containing the hash
            tokenizer_jsons_loaded, tokenizer_hash_loaded = prepare_tokenizer(
                tokenizer, TOKENIZER_EPHIMERAL_PATH=temp_path
            )
            tokenizer_mongo_new = PocketNetworkMongoDBTokenizer(
                tokenizer=tokenizer_jsons_loaded, hash=tokenizer_hash_loaded
            )
            eval_logger.debug("Tokenizer processed.")
            tokenizer_ok = True
        except Exception as e:
            # This is not an error, just a failure in retrieval of the tokenizer.
            eval_logger.info("Cannot load tokenizer from response.")
            eval_logger.debug("Exception:", exception=str(e))
            tokenizer_ok = False

    tokenizer_new = False
    if tokenizer_ok:
        # check if the tokenizer exists in db
        tokenizer_db = await mongo_operator.get_tokenizer_entry(tokenizer_mongo_new.hash)
        if tokenizer_db is None:  # fixed: identity comparison with None
            eval_logger.debug("Tokenizer does not exists.")
            # the tokenizer is not tracked, we need to create an entry
            tokenizer_new = True
            try:
                async with mongo_client.start_transaction() as session:
                    # Fixed: insert a plain document via model_dump, not the
                    # pydantic model instance, matching the "results" insert below.
                    await mongo_client.db["tokenizers"].insert_many(
                        [tokenizer_mongo_new.model_dump(by_alias=True)],
                        ordered=False,
                        session=session,
                    )
                eval_logger.debug("Saved new tokenizer to DB.")
            except Exception as e:
                eval_logger.error("Failed to save Tokenizer to MongoDB.")
                eval_logger.error("Exception:", exception=str(e))
                raise ApplicationError("Failed to save tokenizer to MongoDB.", non_retryable=True)

        # Update the result with valid data
        result.num_samples = 1  # Always one
        result.signatures = [
            SignatureSample(signature=str(tokenizer_mongo_new.hash), id=0)  # This task has a single sample id
        ]

    # Save to results db (a failure is also an answer)
    try:
        async with mongo_client.start_transaction() as session:
            await mongo_client.db["results"].insert_many(
                [result.model_dump(by_alias=True)],
                ordered=False,
                session=session,
            )
        eval_logger.debug("Saved result to DB.")
    except Exception as e:
        eval_logger.error("Failed to save Result to MongoDB.")
        eval_logger.error("Exception:", exception=str(e))
        raise ApplicationError("Failed to save result to MongoDB.", non_retryable=True)

    eval_logger.info(
        "Status:", tokenizer_decoded=tokenizer_decoded, tokenizer_is_valid=tokenizer_ok, tokenizer_is_new=tokenizer_new
    )

    return True
8 changes: 7 additions & 1 deletion apps/python/evaluator/worker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
from app.app import setup_app, get_app_logger
from app.config import read_config

from activities.lmeh.evaluate import evaluation as lmeh_evaluate
from activities.lmeh.evaluate import lmeh_evaluate
from activities.get_task_data import get_task_data
from activities.signatures.tokenizer_evaluate import tokenizer_evaluate


from workflows.evaluator import Evaluator
import concurrent.futures

Expand Down Expand Up @@ -78,7 +82,9 @@ async def main():
Evaluator,
],
"activities": [
get_task_data,
lmeh_evaluate,
tokenizer_evaluate,
],
"activity_executor": activity_executor,
}
Expand Down
49 changes: 42 additions & 7 deletions apps/python/evaluator/workflows/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
# add this to ensure app config is available on the thread
from app.app import get_app_logger, get_app_config
# add any activity that needs to be used on this workflow
from activities.lmeh.evaluate import evaluation as lmeh_evaluation
from activities.lmeh.evaluate import lmeh_evaluate
from activities.get_task_data import get_task_data
from activities.signatures.tokenizer_evaluate import tokenizer_evaluate
from pydantic import BaseModel
from packages.python.protocol.converter import pydantic_data_converter

Expand All @@ -16,12 +18,45 @@
@workflow.defn
class Evaluator:
    """Workflow that routes an evaluation task to the matching activity."""

    @workflow.run
    async def run(self, args: PocketNetworkEvaluationTaskRequest) -> bool:
        """
        Dispatch the evaluation for ``args.task_id``.

        Defect fixed: the span was a diff artifact with removed lines
        (old lmeh-only call, stray ``return True``) interleaved with the new
        implementation; this is the merged, coherent version.

        :param args: evaluation task request.
        :return: True when the selected evaluation activity completed.
        :raises ApplicationError: ``BadParams`` when the framework/task pair
            is not implemented.
        """

        # Extract framework and task to evaluate
        framework, task = await workflow.execute_activity(
            get_task_data,
            args,
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=RetryPolicy(maximum_attempts=2),
        )

        if framework == "lmeh":
            _ = await workflow.execute_activity(
                lmeh_evaluate,
                args,
                start_to_close_timeout=timedelta(seconds=300),
                retry_policy=RetryPolicy(maximum_attempts=2),
            )

        elif framework == "signatures":
            if task == "tokenizer":
                _ = await workflow.execute_activity(
                    tokenizer_evaluate,
                    args,
                    start_to_close_timeout=timedelta(seconds=300),
                    retry_policy=RetryPolicy(maximum_attempts=2),
                )
            else:
                raise ApplicationError(
                    f"Task {task} of framework {framework} is not implemented yet.",
                    args,
                    type="BadParams",
                    non_retryable=True,
                )

        else:
            raise ApplicationError(
                f"{framework} framework not implemented yet",
                args,
                type="BadParams",
                non_retryable=True,
            )

        return True
1 change: 0 additions & 1 deletion apps/python/sidecar/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ RUN pip install --no-cache-dir --upgrade -r /home/app/code/requirements.txt


COPY apps/python/sidecar/app /home/app/code/app
COPY apps/python/sidecar/tokenizers_utils /home/app/code/tokenizers_utils
COPY apps/python/sidecar/main.py /home/app/code/main.py
COPY packages /home/app/code/packages

Expand Down
16 changes: 9 additions & 7 deletions apps/python/sidecar/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os
import time

from app.app import get_app_logger, setup_app
from app.config import read_config
from fastapi import FastAPI
from tokenizers_utils.load import prepare_tokenizer_data
from fastapi.responses import JSONResponse
from transformers import AutoTokenizer

from packages.python.lmeh.utils.tokenizers import prepare_tokenizer

###################################################
# SET UP SIDECAR
Expand All @@ -19,7 +19,9 @@
l.info("starting sidecar")

# Read tokenizer data
TOKENIZER_JSON, TOKENIZER_HASH = prepare_tokenizer_data(config["tokenizer_path"])
tokenizer = AutoTokenizer.from_pretrained(config["tokenizer_path"])
# Process it using the MLTB library (functions are reused by the MLTB)
TOKENIZER_JSON, TOKENIZER_HASH = prepare_tokenizer(tokenizer, TOKENIZER_EPHIMERAL_PATH="/tmp/tokenizer_aux")


# Create serving app
Expand All @@ -35,7 +37,7 @@
@app.get("/pokt/tokenizer")
def get_tokenizer():
l.debug("returning tokenizer data")
return TOKENIZER_JSON
return JSONResponse(content=TOKENIZER_JSON)


# -----------------------------------------------
Expand All @@ -44,4 +46,4 @@ def get_tokenizer():
@app.get("/pokt/tokenizer-hash")
def get_tokenizer():
l.debug("returning tokenizer hash")
return TOKENIZER_HASH
return JSONResponse({"hash": TOKENIZER_HASH})
Loading
Loading