Skip to content

Commit

Permalink
42 manager create logic for tokenizer signature task (#62)
Browse files Browse the repository at this point in the history
* initial dependy check for tasks

* added small fix

* dependency check working + sampler fix
  • Loading branch information
RawthiL authored Jun 12, 2024
1 parent 619c907 commit bec07a5
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 44 deletions.
35 changes: 15 additions & 20 deletions apps/go/manager/activities/analyze_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,19 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams
for _, task := range test.Tasks {
l.Debug().Str("address", thisNodeData.Address).Str("service", thisNodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Checking task requests.")

// Check task dependencies
depStatus, err := records.CheckTaskDependency(&thisNodeData, test.Framework, task, aCtx.App.Config.Frameworks, l)
if err != nil {
l.Error().Msg("Could not check task dependencies.")
return nil, err
}
if !depStatus {
l.Info().Str("address", thisNodeData.Address).Str("service", thisNodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Does not meet task dependencies, ignoring for now.")
continue
}

// Get task record
thisTaskRecord, found := getTaskData(&thisNodeData, test.Framework, task, l)
thisTaskRecord, found := records.GetTaskData(&thisNodeData, test.Framework, task, l)
if found != true {
l.Error().Str("address", thisNodeData.Address).Str("service", thisNodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("not found task entry after check creation (task should be present at this point)")
return nil, fmt.Errorf("not found task entry after check creation (task should be present at this point)")
Expand Down Expand Up @@ -168,23 +179,7 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams
return &result, nil
}

// Get specific task data from a node record
func getTaskData(nodeData *records.NodeRecord, framework string, task string, l *zerolog.Logger) (records.TaskInterface, bool) {

// Get all tasks as a single array
combinedTasks := nodeData.CombineTasks()
// Look for entry
for _, taskEntry := range combinedTasks {
// Check if the Name field matches the search string
if taskEntry.GetFramework() == framework && taskEntry.GetTask() == task {
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("Found!")
return taskEntry, true
}
}

return nil, false
}

// Checks status of all node's tasks, drops old ones, checks for new results and re-computes.
func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, frameworkConfigMap map[string]types.FrameworkConfig, mongo mongodb.MongoDb, l *zerolog.Logger) (err error) {

nodeData.LastSeenHeight += 1
Expand Down Expand Up @@ -213,7 +208,7 @@ func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, fram
//------------------------------------------------------------------
// Get stored data for this task
//------------------------------------------------------------------
thisTaskRecord, found := getTaskData(nodeData, test.Framework, task, l)
thisTaskRecord, found := records.GetTaskData(nodeData, test.Framework, task, l)

if !found {
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Not found, creating.")
Expand Down Expand Up @@ -256,7 +251,7 @@ func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, fram
}

//------------------------------------------------------------------
// Calculate new averages
// Calculate new metrics for this task
//------------------------------------------------------------------
thisTaskRecord.ProcessData(l)

Expand Down
4 changes: 0 additions & 4 deletions apps/go/manager/records/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type NodeRecord struct {
LastSeenTime time.Time `bson:"last_seen_time"`
NumericalTasks []NumericalTaskRecord `bson:"numerical_tasks"`
SignatureTasks []SignatureTaskRecord `bson:"signature_tasks"`
Tokenizer string `bson:"tokenizer"` // TODO: Remove this in the future, in favor of a signature task
}

// Creates and array of interfaces that contains all tasks
Expand Down Expand Up @@ -206,9 +205,6 @@ func (record *NodeRecord) AppendTask(framework string, task string, date time.Ti
func (record *NodeRecord) Init(params types.AnalyzeNodeParams, frameworkConfigMap map[string]types.FrameworkConfig, l *zerolog.Logger) error {
// Initialize empty record

// TODO: Remove this placeholder
record.Tokenizer = "83332a7f32e4188bb276a18ff78620acfd3c6edbd68002b746bda990ed30d56c"

// Set node data
record.Address = params.Node.Address
record.Service = params.Node.Service
Expand Down
100 changes: 100 additions & 0 deletions apps/go/manager/records/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"manager/types"
"sort"
"strings"
"time"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -60,6 +61,24 @@ type TaskInterface interface {
GetSampleTTLDays() uint32
GetResultStruct() ResultInterface
GetLastSeen() time.Time
IsOK() bool
}

// Get specific task data from a node record
func GetTaskData(nodeData *NodeRecord, framework string, task string, l *zerolog.Logger) (TaskInterface, bool) {

// Get all tasks as a single array
combinedTasks := nodeData.CombineTasks()
// Look for entry
for _, taskEntry := range combinedTasks {
// Check if the Name field matches the search string
if taskEntry.GetFramework() == framework && taskEntry.GetTask() == task {
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("Found!")
return taskEntry, true
}
}

return nil, false
}

// Depending on the framework-task pair, the type of data that is saved will vary.
Expand Down Expand Up @@ -89,6 +108,67 @@ func GetTaskType(framework string, task string, configMap map[string]types.Frame
return taskType, nil
}

// Analyzes the configuration and returns if it is possible to proceed with this task triggering/analysis
// A task can depend on others (such as having a tokenizer signature), here we check for that
func CheckTaskDependency(nodeData *NodeRecord, framework string, task string, configMap map[string]types.FrameworkConfig, l *zerolog.Logger) (status bool, err error) {
status = false

// Get Framework config
frameworkCfg, ok := configMap[framework]
if !ok {
l.Error().Str("framework", framework).Msg("framework config not found")
err = fmt.Errorf("framework config not found")
return false, err
}

// Get task type
taskDep, ok := frameworkCfg.TasksDependency[task]
if !ok {
// Search for the "any" field
taskDep, ok = frameworkCfg.TasksDependency["any"]
if !ok {
l.Error().Str("framework", framework).Str("task", task).Msg("cannot find default (or specific) value for task type")
err = fmt.Errorf("cannot find default (or specific) value for task type")
return false, err
}
}

// Check dependency
frameworkTaskandStatus := strings.Split(taskDep, ":")
if len(frameworkTaskandStatus) != 3 {
l.Error().Str("framework", framework).Str("task", task).Msg("malformed dependency configuration, expected three elements separated by \":\" ")
return false, nil
}
if frameworkTaskandStatus[0] == "none" {
// No dependencies
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("No dependency: Dependecy OK")
return true, nil
}
thisTaskRecord, found := GetTaskData(nodeData, frameworkTaskandStatus[0], frameworkTaskandStatus[1], l)
if !found {
// The task is not even created, we must fail
return false, nil
} else {
// Check the condition
if frameworkTaskandStatus[2] == "present" {
// Task is present, so OK
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("Present: Dependecy OK")
return true, nil
} else if frameworkTaskandStatus[2] == "ok" {
// Check for it having a correct value
if thisTaskRecord.IsOK() {
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("OK: Dependecy OK")
return true, nil
}
} else {
l.Error().Str("framework", framework).Str("task", task).Msg("dependency configuration cannot be processed (status type unknown)")
return false, nil
}
}

return false, nil
}

// ------------------------------------------------------------------------------
// NumericalTaskRecord
// ------------------------------------------------------------------------------
Expand Down Expand Up @@ -165,6 +245,16 @@ func (record *NumericalTaskRecord) GetNumSamples() uint32 {
return record.CircBuffer.NumSamples
}

// Returns True if the task is ok, meaning that their values are updated and correct
func (record *NumericalTaskRecord) IsOK() bool {
if record.MeanScore+record.MedianScore+record.StdScore != 0.0 {
// we have some values, so this task is ok
return true
} else {
return false
}
}

// Calculate task statistics
func (record *NumericalTaskRecord) ProcessData(l *zerolog.Logger) (err error) {

Expand Down Expand Up @@ -338,6 +428,16 @@ func (record *SignatureTaskRecord) InsertSample(timeSample time.Time, data inter
return nil
}

// Returns True if the task is ok, meaning that their values are updated and correct
func (record *SignatureTaskRecord) IsOK() bool {
if record.LastSignature != "" {
// there is a signature available, so it is OK
return true
} else {
return false
}
}

// Process the buffer data to produce the signature metrics
func (record *SignatureTaskRecord) ProcessData(l *zerolog.Logger) (err error) {
// Just update the last signature
Expand Down
3 changes: 2 additions & 1 deletion apps/go/manager/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,5 +237,6 @@ type Config struct {
// This config structure holds, for a given framework, the specifics of its task
// Initially this will only determine the type of task: Numerical, Signature, etc
type FrameworkConfig struct {
TasksTypes map[string]string `json:"task_types"`
TasksTypes map[string]string `json:"task_types"`
TasksDependency map[string]string `json:"task_dependency"`
}
7 changes: 4 additions & 3 deletions apps/go/manager/types/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ type NodeManagerParams struct {
}

type NodeManagerResults struct {
Success uint `json:"success"`
Failed uint `json:"failed"`
NewNodes uint `json:"new_nodes"`
SuccessNodes uint `json:"success"`
FailedNodes uint `json:"failed"`
NewNodes uint `json:"new_nodes"`
TriggeredTasks uint `json:"triggered_tasks"`
}

type NodeAnalysisChanResponse struct {
Expand Down
9 changes: 6 additions & 3 deletions apps/go/manager/workflows/node_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (wCtx *Ctx) NodeManager(ctx workflow.Context, params types.NodeManagerParam
l.Debug().Msg("Starting Node Manager Workflow.")

// Create result
result := types.NodeManagerResults{Success: 0}
result := types.NodeManagerResults{SuccessNodes: 0}

// Check parameters
if len(params.Tests) == 0 {
Expand Down Expand Up @@ -119,12 +119,15 @@ func (wCtx *Ctx) NodeManager(ctx workflow.Context, params types.NodeManagerParam
if response.Response.IsNew {
result.NewNodes += 1
}
result.TriggeredTasks += uint(len(response.Response.Triggers))
}

// -------------------------------------------------------------------------
// -------------------- Trigger Sampler ------------------------------------
// -------------------------------------------------------------------------

l.Debug().Str("service", params.Service).Int("TriggersNums", len(allTriggers))

// Define a channel to store TriggerSamplerResults objects
taskTriggerResultsChan := make(chan *types.TriggerSamplerResults, len(allTriggers))
// defer close lookup task results channel
Expand Down Expand Up @@ -164,9 +167,9 @@ func (wCtx *Ctx) NodeManager(ctx workflow.Context, params types.NodeManagerParam
// Keep count
// Update workflow result
if response.Success {
result.Success += 1
result.SuccessNodes += 1
} else {
result.Failed += 1
result.FailedNodes += 1
}
}

Expand Down
27 changes: 17 additions & 10 deletions apps/python/sampler/activities/signatures/signatures.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,27 @@ async def sign_sample(args: PocketNetworkTaskRequest) -> bool:
insert_mongo_prompt.append(prompt_mongo.model_dump(by_alias=True))
logger.debug(f"Prompt:", PocketNetworkMongoDBPrompt=prompt_mongo)
try:
with mongo_client.start_session() as session:
with session.start_transaction():
mongo_client["pocket-ml-testbench"]["tasks"].insert_many(
insert_mongo_tasks, ordered=False, session=session
async with mongo_client.start_transaction() as session:
await mongo_client.db['tasks'].insert_many(
insert_mongo_tasks,
ordered=False,
session=session,
)
await mongo_client.db['instances'].insert_many(
insert_mongo_instances,
ordered=False,
session=session,
)
mongo_client["pocket-ml-testbench"]["instances"].insert_many(
insert_mongo_instances, ordered=False, session=session
await mongo_client.db['prompts'].insert_many(
insert_mongo_prompt,
ordered=False,
session=session,
)
mongo_client["pocket-ml-testbench"]["prompts"].insert_many(
insert_mongo_prompt, ordered=False, session=session
)
logger.debug("Instances saved to MongoDB successfully.")

logger.debug("Instances saved to MongoDB successfully.")
except Exception as e:
logger.error("Failed to save Instances to MongoDB.")
logger.error(f"Exeption:", Exeption=str(e))
raise ApplicationError("Failed to save instances to MongoDB.", non_retryable=True)

return True
14 changes: 14 additions & 0 deletions docker-compose/dev/apps/config/manager.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,19 @@
"workflow_name": "Sampler",
"task_queue": "sampler"
}
},
"frameworks": {
"lmeh" : {
"task_types": {"any" : "numerical"},
"task_dependency": {"any" : "signatures:tokenizer:ok"}
},
"helm" : {
"task_types": {"any" : "numerical"},
"task_dependency": {"any" : "signatures:tokenizer:ok"}
},
"signatures" : {
"task_types": {"any" : "signature"},
"task_dependency": {"any" : "none:none:none"}
}
}
}
1 change: 1 addition & 0 deletions docker-compose/morse-poc/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ POCKET_GEO_MESH_VERSION=MESH-RC-0.5.0-RC-0.11.1
MANAGER_CONFIG_FILE=./apps_configs/manager.json
SAMPLER_CONFIG_FILE=./apps_configs/sampler.json
REQUESTER_CONFIG_FILE=./apps_configs/requester.json
EVALUATOR_CONFIG_FILE=./apps_configs/evaluator.json

# SIDECAR
SIDECAR_NGINX_CONFIG_FILE=./dependencies_configs/sidecar/nginx.conf
Expand Down
15 changes: 12 additions & 3 deletions docker-compose/morse-poc/apps_configs/manager.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,17 @@
}
},
"frameworks": {
"lmeh" : {"task_types": {"any" : "numerical"}},
"helm" : {"task_types": {"any" : "numerical"}},
"signatures" : {"task_types": {"any" : "signature"}}
"lmeh" : {
"task_types": {"any" : "numerical"},
"task_dependency": {"any" : "signatures:tokenizer:ok"}
},
"helm" : {
"task_types": {"any" : "numerical"},
"task_dependency": {"any" : "signatures:tokenizer:ok"}
},
"signatures" : {
"task_types": {"any" : "signature"},
"task_dependency": {"any" : "none:none:none"}
}
}
}

0 comments on commit bec07a5

Please sign in to comment.