Skip to content

Commit

Permalink
92 manager sampler evaluator config signature support (#99)
Browse files Browse the repository at this point in the history
* working code

* removed config from tokenizer evaluate task

* linted
  • Loading branch information
RawthiL authored Sep 11, 2024
1 parent 40fc5a1 commit 3ff2c08
Show file tree
Hide file tree
Showing 13 changed files with 341 additions and 107 deletions.
8 changes: 4 additions & 4 deletions apps/go/manager/config.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,21 @@
"frameworks": {
"lmeh" : {
"task_types": {"any" : "numerical"},
"task_dependency": {"any" : "signatures:tokenizer:ok"},
"task_dependency": {"any" : ["signatures:tokenizer:ok", "signatures:config:ok"]},
"schedule_limits": {"any" : "none:none"},
"trigger_minimum": {"any" : "0"}
},
"helm" : {
"task_types": {"any" : "numerical"},
"task_dependency": {"any" : "signatures:tokenizer:ok"},
"task_dependency": {"any" : ["signatures:tokenizer:ok", "signatures:config:ok"]},
"schedule_limits": {"any" : "none:none"},
"trigger_minimum": {"any" : "0"}
},
"signatures" : {
"task_types": {"any" : "signature"},
"task_dependency": {"any" : "none:none:none"},
"task_dependency": {"any" : ["none:none:none"]},
"schedule_limits": {"any" : "1:session"},
"trigger_minimum": {"tokenizer" : "1"}
"trigger_minimum": {"tokenizer" : "1", "config" : "1"}
}
}

Expand Down
79 changes: 43 additions & 36 deletions apps/go/manager/records/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,47 +181,54 @@ func CheckTaskDependency(nodeData *NodeRecord, framework string, task string, co
}

// Check dependency
frameworkTaskandStatus := strings.Split(taskDep, ":")
if len(frameworkTaskandStatus) != 3 {
l.Error().Str("framework", framework).Str("task", task).Msg("malformed dependency configuration, expected three elements separated by \":\" ")
return false, nil
}
if frameworkTaskandStatus[0] == "none" {
// No dependencies
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("No dependency: Dependecy OK")
return true, nil
}
taskType, err := GetTaskType(frameworkTaskandStatus[0], frameworkTaskandStatus[1], configMap, l)
if err != nil {
l.Error().Str("framework", framework).Str("task", task).Str("task type", taskType).Msg("Error getting task type")
return false, err
}
thisTaskRecord, found := GetTaskData(nodeData.ID, taskType, frameworkTaskandStatus[0], frameworkTaskandStatus[1], mongoDB, l)
if !found {
// The task is not even created, we must fail
return false, nil
} else {
// Check the condition
if frameworkTaskandStatus[2] == "present" {
// Task is present, so OK
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("Present: Dependecy OK")
return true, nil
} else if frameworkTaskandStatus[2] == "ok" {
// Check for it having a correct value
if thisTaskRecord.IsOK() {
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("OK: Dependecy OK")
return true, nil
}
depOK := true
for idxDep := 0; idxDep < len(taskDep); idxDep++ {
// get data from entry
frameworkTaskandStatus := strings.Split(taskDep[idxDep], ":")
if len(frameworkTaskandStatus) != 3 {
l.Error().Str("framework", framework).Str("task", task).Msg("malformed dependency configuration, expected three elements separated by \":\" ")
depOK = false
break
}
if frameworkTaskandStatus[0] == "none" {
// No dependencies
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("No dependency: Dependecy OK")
continue
}
taskType, err := GetTaskType(frameworkTaskandStatus[0], frameworkTaskandStatus[1], configMap, l)
if err != nil {
l.Error().Str("framework", framework).Str("task", task).Str("task type", taskType).Msg("Error getting task type")
return false, err
}
thisTaskRecord, found := GetTaskData(nodeData.ID, taskType, frameworkTaskandStatus[0], frameworkTaskandStatus[1], mongoDB, l)
if !found {
// The task is not even created, we must fail
depOK = false
break
} else {
l.Error().Str("framework", framework).Str("task", task).Msg("dependency configuration cannot be processed (status type unknown)")
return false, nil
// Check the condition
if frameworkTaskandStatus[2] == "present" {
// Task is present, so OK
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("Present: Dependecy OK")
continue
} else if frameworkTaskandStatus[2] == "ok" {
// Check for it having a correct value
if thisTaskRecord.IsOK() {
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("OK: Dependecy OK")
continue
}
} else {
l.Error().Str("framework", framework).Str("task", task).Msg("dependency configuration cannot be processed (status type unknown)")
depOK = false
break
}
}
}

return false, nil
return depOK, nil
}

// Analyzes the configuration and checks wheter the triggering the task will
// Analyzes the configuration and checks whether triggering the task will
// break the schedule limits or not (i.e. trigger twice in the same session)
func CheckTaskSchedule(taskData TaskInterface, block types.BlockData, configMap map[string]types.FrameworkConfig, l *zerolog.Logger) (bool, error) {

Expand Down Expand Up @@ -309,7 +316,7 @@ func CheckTaskTriggerMin(taskData TaskInterface, block types.BlockData, configMa
// Search for the "any" field
taskTriggerMin, ok = frameworkCfg.TriggerMinimum["any"]
if !ok {
l.Error().Str("framework", framework).Str("task", task).Msg("cannot find default (or specific) value for task trgger minimum")
l.Error().Str("framework", framework).Str("task", task).Msg("cannot find default (or specific) value for task trigger minimum")
err := fmt.Errorf("cannot find default (or specific) value for task trigger minimum")
return 0, err
}
Expand Down
8 changes: 4 additions & 4 deletions apps/go/manager/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,10 @@ type Config struct {
}

// FrameworkConfig holds the settings of one entry of the "frameworks" section
// of the manager configuration. Every field is a JSON object decoded into a
// map; the sample configuration keys these maps by task name or by the
// catch-all key "any".
//
// NOTE(review): this listing is a diff view, so every field shows up twice:
// the first four declarations are the removed lines and the last four are
// their replacements. The only type that differs between the two groups is
// TasksDependency, which goes from one string per key to a list of strings.
type FrameworkConfig struct {
// Removed declarations (pre-change).
TasksTypes map[string]string `json:"task_types"`
TasksDependency map[string]string `json:"task_dependency"`
ScheduleLimits map[string]string `json:"schedule_limits"`
TriggerMinimum map[string]string `json:"trigger_minimum"`
// Added declarations (post-change).
TasksTypes map[string]string `json:"task_types"`
// Presumably each entry has the form "framework:task:status" (e.g.
// "signatures:tokenizer:ok" in the sample config) — confirm against
// CheckTaskDependency, which splits dependency strings on ":".
TasksDependency map[string][]string `json:"task_dependency"`
ScheduleLimits map[string]string `json:"schedule_limits"`
// CheckTaskTriggerMin looks up the "any" key here as the default value.
TriggerMinimum map[string]string `json:"trigger_minimum"`
}

type DevelopConfig struct {
Expand Down
184 changes: 184 additions & 0 deletions apps/python/evaluator/activities/signatures/model_config_evaluate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import json
from datetime import datetime

from app.app import get_app_config, get_app_logger
from bson import ObjectId
from temporalio import activity
from temporalio.exceptions import ApplicationError

from packages.python.common.auto_heartbeater import auto_heartbeater
from packages.python.lmeh.utils.mongodb import MongoOperator
from packages.python.lmeh.utils.tokenizers import (
load_config,
prepare_config,
)
from packages.python.protocol.protocol import (
PocketNetworkEvaluationTaskRequest,
PocketNetworkMongoDBResultSignature,
PocketNetworkMongoDBConfig,
SignatureSample,
PocketNetworkMongoDBResultBase,
)


@activity.defn
@auto_heartbeater
async def model_config_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool:
    """
    Evaluate the model-config signature task identified by ``args.task_id``.

    The single response stored for the task is decoded as JSON and loaded as a
    model config. When that succeeds, the config is stored in the ``configs``
    collection (if its hash is not tracked yet) and its hash is written as the
    only signature of the result. When decoding or loading fails, the result is
    still saved, with zero samples and no signatures, because a failure is also
    an answer.

    :param args: Evaluation request; ``args.task_id`` must be a valid ObjectId
        string and is replaced in place by the parsed ``ObjectId``.
    :return: ``True`` once the result was saved and the task flagged as
        evaluated.
    :raises ApplicationError: ``BadParams`` (non retryable) when the task id
        cannot be parsed, ``ResponseError`` (retryable) when the task does not
        have exactly one response, and a non-retryable error when writing to
        MongoDB fails. On any exception the task is marked to be dropped before
        the exception is re-raised.
    """
    app_config = get_app_config()
    eval_logger = get_app_logger("evaluation")
    config = app_config["config"]
    mongo_client = config["mongo_client"]
    mongo_operator = MongoOperator(client=mongo_client)

    try:
        try:
            # Keep the string form, it is used below to build the temp path
            task_id_str = args.task_id
            args.task_id = ObjectId(args.task_id)
        except Exception as e:
            raise ApplicationError(
                "Bad Task ID format",
                str(e),
                args.task_id,
                type="BadParams",
                non_retryable=True,
            ) from e

        # Retrieve all responses
        responses = await mongo_operator.retrieve_responses(args.task_id)
        if len(responses) != 1:
            eval_logger.error(f"Found {len(responses)} responses, only 1 is expected.")
            raise ApplicationError(
                f"Task ID {args.task_id}: Found {len(responses)} responses, only 1 is expected.",
                str(args.task_id),
                type="ResponseError",
                non_retryable=False,
            )

        # Create the result, empty for now
        result = PocketNetworkMongoDBResultSignature(
            result_data=PocketNetworkMongoDBResultBase(
                task_id=args.task_id,
                status=responses[0]["response"]["error_code"],
                num_samples=0,
                result_height=responses[0]["response"]["height"],
                result_time=datetime.today().isoformat(),
            ),
            signatures=[],
        )

        # Get config jsons
        model_config_decoded = False
        try:
            model_config_jsons = json.loads(responses[0]["response"]["response"])
            model_config_decoded = True
        except Exception as e:
            # An undecodable response is not an error of this activity
            eval_logger.debug("Exception:", Exeption=str(e))

        model_config_ok = False
        if model_config_decoded:
            eval_logger.debug(
                "Model config found.", model_config_keys=list(model_config_jsons.keys())
            )

            try:
                # Try to load, if this succeeds, the model config is OK
                temp_path = "/tmp/" + task_id_str

                _config = load_config(
                    config_objects=model_config_jsons,
                    wf_id="",
                    config_ephimeral_path=temp_path,
                )
                eval_logger.debug("Config loaded.")
                # This creates the structure used in the database, containing the hash
                config_jsons_loaded, config_hash_loaded = prepare_config(
                    _config, CONFIG_EPHIMERAL_PATH=temp_path
                )
                # TODO
                # For instance, the tokenizer hash is used as the config hash
                # in future versions, this should be changed
                model_config_mongo_new = PocketNetworkMongoDBConfig(
                    config=config_jsons_loaded, hash=config_hash_loaded
                )
                eval_logger.debug("Config processed.")

                model_config_ok = True
            except Exception as e:
                # This is not an error, it is just a failure in the retrieval
                # of the model config
                eval_logger.info("Cannot load the model config from response.")
                eval_logger.debug("Exception:", Exeption=str(e))
                model_config_ok = False

        model_config_new = False
        if model_config_ok:
            # Check if the model config exists in the db
            model_config_db = await mongo_operator.get_config_entry(
                model_config_mongo_new.hash
            )
            if model_config_db is None:
                eval_logger.debug("Model config does not exists.")
                # The model config is not tracked, we need to create an entry
                model_config_new = True
                try:
                    async with mongo_client.start_transaction() as session:
                        await mongo_client.db["configs"].insert_many(
                            [model_config_mongo_new.model_dump(by_alias=True)],
                            ordered=False,
                            session=session,
                        )
                    eval_logger.debug("Saved new config to DB.")
                except Exception as e:
                    eval_logger.error("Failed to save model config to MongoDB.")
                    eval_logger.error("Exception:", Exeption=str(e))
                    raise ApplicationError(
                        "Failed to save model config to MongoDB.", non_retryable=True
                    ) from e

            # Update the result with valid data
            result.result_data.num_samples = 1  # Always one
            result.result_data.status = 0  # OK
            result.signatures = [
                SignatureSample(
                    signature=str(model_config_mongo_new.hash), id=0
                )  # This task has a single sample id
            ]

        # Save to results db (a failure is also an answer)
        try:
            async with mongo_client.start_transaction() as session:
                await mongo_client.db["results"].find_one_and_update(
                    {"result_data.task_id": args.task_id},
                    {"$set": result.model_dump(by_alias=True)},
                    upsert=True,
                    session=session,
                )
                await mongo_client.db["tasks"].update_one(
                    {"_id": args.task_id},
                    {"$set": {"evaluated": True}},
                    session=session,
                )
            eval_logger.debug("Saved result to DB.")
        except Exception as e:
            eval_logger.error("Failed to save Result to MongoDB.")
            eval_logger.error("Exception:", Exeption=str(e))
            raise ApplicationError(
                "Failed to save result to MongoDB.", non_retryable=True
            ) from e

        eval_logger.info(
            "Model Config Status:",
            model_config_decoded=model_config_decoded,
            model_config_is_valid=model_config_ok,
            model_config_is_new=model_config_new,
        )
    except Exception:
        # TODO: enhance drop task logic
        await mongo_operator.mark_task_to_drop(args.task_id)
        # Bare raise keeps the original exception and traceback untouched
        raise

    return True
Loading

0 comments on commit 3ff2c08

Please sign in to comment.