From 14eb1178210fabf49cd530929cc9b6541d6b4bbd Mon Sep 17 00:00:00 2001 From: Ramiro Rodriguez Colmeiro Date: Thu, 1 Aug 2024 15:02:44 -0300 Subject: [PATCH 1/5] base workflow code created - needs tests --- apps/go/manager/activities/analyze_node.go | 257 +++++---------- .../manager/activities/process_node_result.go | 296 ++++++++++++++++++ apps/go/manager/records/task.go | 6 +- apps/go/manager/tests/circular_buffer_test.go | 10 +- apps/go/manager/types/activities.go | 14 + apps/go/manager/types/circular_buffer.go | 20 +- apps/go/manager/types/workflows.go | 10 + apps/go/manager/workflows/entry.go | 5 + apps/go/manager/workflows/result_analyzer.go | 48 +++ 9 files changed, 477 insertions(+), 189 deletions(-) create mode 100644 apps/go/manager/activities/process_node_result.go create mode 100644 apps/go/manager/workflows/result_analyzer.go diff --git a/apps/go/manager/activities/analyze_node.go b/apps/go/manager/activities/analyze_node.go index 6eb768a..6b1caa6 100644 --- a/apps/go/manager/activities/analyze_node.go +++ b/apps/go/manager/activities/analyze_node.go @@ -24,7 +24,10 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams // Get logger l := aCtx.App.Logger - l.Debug().Str("address", params.Node.Address).Str("service", params.Node.Service).Msg("Analyzing staked node.") + l.Debug(). + Str("address", params.Node.Address). + Str("service", params.Node.Service). + Msg("Analyzing staked node.") // Retrieve this node entry var thisNodeData records.NodeRecord @@ -65,16 +68,27 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams for _, test := range params.Tests { for _, task := range test.Tasks { - l.Debug().Str("address", thisNodeData.Address).Str("service", thisNodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Checking task requests.") + l.Debug(). + Str("address", thisNodeData.Address). + Str("service", thisNodeData.Service). + Str("framework", test.Framework). 
+ Str("task", task). + Msg("Checking task requests.") // Check task dependencies depStatus, err := records.CheckTaskDependency(&thisNodeData, test.Framework, task, aCtx.App.Config.Frameworks, aCtx.App.Mongodb, l) if err != nil { - l.Error().Msg("Could not check task dependencies.") + l.Error(). + Msg("Could not check task dependencies.") return nil, err } if !depStatus { - l.Info().Str("address", thisNodeData.Address).Str("service", thisNodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Does not meet task dependencies, ignoring for now.") + l.Info(). + Str("address", thisNodeData.Address). + Str("service", thisNodeData.Service). + Str("framework", test.Framework). + Str("task", task). + Msg("Does not meet task dependencies, ignoring for now.") continue } @@ -85,7 +99,12 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams } thisTaskRecord, found := records.GetTaskData(thisNodeData.ID, taskType, test.Framework, task, aCtx.App.Mongodb, l) if found != true { - l.Error().Str("address", thisNodeData.Address).Str("service", thisNodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("not found task entry after check creation (task should be present at this point)") + l.Error(). + Str("address", thisNodeData.Address). + Str("service", thisNodeData.Service). + Str("framework", test.Framework). + Str("task", task). + Msg("not found task entry after check creation (task should be present at this point)") return nil, fmt.Errorf("not found task entry after check creation (task should be present at this point)") } @@ -161,11 +180,12 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams return &result, nil } -// Checks status of all node's tasks, drops old ones, checks for new results and re-computes. 
-func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, frameworkConfigMap map[string]types.FrameworkConfig, mongoDB mongodb.MongoDb, l *zerolog.Logger) (err error) { - - // Get results collection - resultsCollection := mongoDB.GetCollection(types.ResultsCollection) +// Checks for node's tasks records and drops old ones. +func updateTasksNode(nodeData *records.NodeRecord, + tests []types.TestsData, + frameworkConfigMap map[string]types.FrameworkConfig, + mongoDB mongodb.MongoDb, + l *zerolog.Logger) (err error) { //-------------------------------------------------------------------------- // Check for each task sample date @@ -174,7 +194,12 @@ func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, fram for _, task := range test.Tasks { - l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Updating circular buffer.") + l.Debug(). + Str("address", nodeData.Address). + Str("service", nodeData.Service). + Str("framework", test.Framework). + Str("task", task). + Msg("Updating circular buffer.") //------------------------------------------------------------------ // Get stored data for this task @@ -186,7 +211,12 @@ func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, fram thisTaskRecord, found := records.GetTaskData(nodeData.ID, taskType, test.Framework, task, mongoDB, l) if !found { - l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Not found, creating.") + l.Debug(). + Str("address", nodeData.Address). + Str("service", nodeData.Service). + Str("framework", test.Framework). + Str("task", task). 
+ Msg("Not found, creating.") defaultDate := time.Now() thisTaskRecord = nodeData.AppendTask(nodeData.ID, test.Framework, task, defaultDate, frameworkConfigMap, mongoDB, l) } @@ -195,76 +225,19 @@ func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, fram // Drop old samples (move indices). //------------------------------------------------------------------ - err = thisTaskRecord.CycleIndexes(l) - if err != nil { - return err - } - - //------------------------------------------------------------------ - // Check pending and done tasks in the database - //------------------------------------------------------------------ - _, tasksDone, _, tasksIDs, err := checkTaskDatabase(nodeData.Address, nodeData.Service, test.Framework, task, mongoDB, l) + cycled, err := thisTaskRecord.CycleIndexes(l) if err != nil { - l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Cannot check Tasks Database.") return err } - //------------------------------------------------------------------ - // Read new results from MongoDB Results and add to buffer - //------------------------------------------------------------------ - if tasksDone > 0 { - - // loop over all tasksIDs - for _, taskID := range tasksIDs { - thisTaskResults := thisTaskRecord.GetResultStruct() - found = false - found, err = thisTaskResults.FindAndLoadResults(taskID, - resultsCollection, - l) - if err != nil { - return err - } - if found == true { - - l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Str("task_id", taskID.String()).Msg("Processing found results.") - - // If nothing is wrong with the result calculation - if thisTaskResults.GetStatus() == 0 { - l.Debug().Int("NumSamples", int(thisTaskResults.GetNumSamples())).Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", 
task).Str("task_id", taskID.String()).Msg("Inserting results into buffers.") - // Add results to current task record - for i := 0; i < int(thisTaskResults.GetNumSamples()); i++ { - thisTaskRecord.InsertSample(time.Now(), thisTaskResults.GetSample(i), l) - } - // Update the last seen fields - thisTaskRecord.UpdateLastHeight(thisTaskResults.GetResultHeight()) - thisTaskRecord.UpdateLastSeen(thisTaskResults.GetResultTime()) - } else { - // TODO: handle status!=0 - l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Str("task_id", taskID.String()).Msg("Status not zero.") - } - - // Delete all MongoDB entries associated with this task ID - errDel := RemoveTaskID(taskID, mongoDB, l) - if errDel != nil { - l.Debug().Str("delete_error", errDel.Error()).Str("task_id", taskID.String()).Msg("Deletion error.") - } - - } - } - - } - - //------------------------------------------------------------------ - // Calculate new metrics for this task - //------------------------------------------------------------------ - thisTaskRecord.ProcessData(l) - //------------------------------------------------------------------ // Update task in DB //------------------------------------------------------------------ - _, err = thisTaskRecord.UpdateTask(nodeData.ID, test.Framework, task, mongoDB, l) - if err != nil { - return err + if cycled || found { + _, err = thisTaskRecord.UpdateTask(nodeData.ID, test.Framework, task, mongoDB, l) + if err != nil { + return err + } } } @@ -274,8 +247,17 @@ func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, fram return err } -// Looks for a framework-task-node in the TaskDB and retreives all the IDs adn tasks status -func checkTaskDatabase(address string, service string, framework string, task string, mongoDB mongodb.MongoDb, l *zerolog.Logger) (tasksInQueue uint32, tasksDone uint32, blackList []int, tasksIDs []primitive.ObjectID, err error) { +// Looks for a 
framework-task-node in the TaskDB and retrieves all the IDs and tasks status +func checkTaskDatabase(address string, + service string, + framework string, + task string, + mongoDB mongodb.MongoDb, + l *zerolog.Logger) (tasksInQueue uint32, + tasksDone uint32, + blackList []int, + tasksIDs []primitive.ObjectID, + err error) { // define blacklist as length zero blackList = make([]int, 0) @@ -311,7 +293,12 @@ func checkTaskDatabase(address string, service string, framework string, task st tasksIDs = append(tasksIDs, taskReq.Id) // If not already done if !taskReq.Done { - l.Debug().Str("address", address).Str("service", service).Str("framework", framework).Str("task", task).Msg("Found pending task.") + l.Debug(). + Str("address", address). + Str("service", service). + Str("framework", framework). + Str("task", task). + Msg("Found pending task.") // Count pending tasksInQueue += uint32(taskReq.Qty) @@ -335,111 +322,27 @@ func checkTaskDatabase(address string, service string, framework string, task st } } } else { - l.Debug().Str("address", address).Str("service", service).Str("framework", framework).Str("task", task).Msg("Found done task.") + l.Debug(). + Str("address", address). + Str("service", service). + Str("framework", framework). + Str("task", task). + Msg("Found done task.") tasksDone += 1 } } - l.Debug().Str("address", address).Str("service", service).Str("framework", framework).Str("task", task).Int32("tasksDone", int32(tasksDone)).Int32("tasksInQueue", int32(tasksInQueue)).Int("tasksIDsLen", len(tasksIDs)).Msg("Pending tasks analized.") + l.Debug(). + Str("address", address). + Str("service", service). + Str("framework", framework). + Str("task", task). + Int32("tasksDone", int32(tasksDone)). + Int32("tasksInQueue", int32(tasksInQueue)). + Int("tasksIDsLen", len(tasksIDs)). 
+ Msg("Pending tasks analyzed.") return tasksInQueue, tasksDone, blackList, tasksIDs, err } - -// Given a TaskID from MongoDB, deletes all associated entries from the "tasks", "instances", "prompts", "responses" and "results" collections. -func RemoveTaskID(taskID primitive.ObjectID, mongoDB mongodb.MongoDb, l *zerolog.Logger) (err error) { - - //-------------------------------------------------------------------------- - //-------------------------- Instances ------------------------------------- - //-------------------------------------------------------------------------- - instancesCollection := mongoDB.GetCollection(types.InstanceCollection) - // Set filtering for this node-service pair data - task_request_filter := bson.D{{Key: "task_id", Value: taskID}} - // Set mongo context - ctxM, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - // Now retrieve all node task requests entries - response, err := instancesCollection.DeleteMany(ctxM, task_request_filter) - if err != nil { - l.Warn().Msg("Could not delete instances data from MongoDB.") - return err - } - - l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted instances data from MongoDB") - - //-------------------------------------------------------------------------- - //-------------------------- Prompts --------------------------------------- - //-------------------------------------------------------------------------- - promptsCollection := mongoDB.GetCollection(types.PromptsCollection) - // Set filtering for this node-service pair data - task_request_filter = bson.D{{Key: "task_id", Value: taskID}} - // Set mongo context - ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - // Now retrieve all node task requests entries - response, err = promptsCollection.DeleteMany(ctxM, task_request_filter) - if err != nil { - l.Warn().Msg("Could not delete prompts data from MongoDB.") - return 
err - } - - l.Debug().Int("deleted", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted prompts data from MongoDB") - - //-------------------------------------------------------------------------- - //-------------------------- Responses ------------------------------------- - //-------------------------------------------------------------------------- - responsesCollection := mongoDB.GetCollection(types.ResponsesCollection) - // Set filtering for this node-service pair data - task_request_filter = bson.D{{Key: "task_id", Value: taskID}} - // Set mongo context - ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - // Now retrieve all node task requests entries - response, err = responsesCollection.DeleteMany(ctxM, task_request_filter) - if err != nil { - l.Warn().Msg("Could not delete responses data from MongoDB.") - return err - } - - l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted responses data from MongoDB") - - //-------------------------------------------------------------------------- - //-------------------------- Results --------------------------------------- - //-------------------------------------------------------------------------- - resultsCollection := mongoDB.GetCollection(types.ResultsCollection) - // Set filtering for this node-service pair data - task_request_filter = bson.D{{Key: "result_data.task_id", Value: taskID}} - // Set mongo context - ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - // Now retrieve all node task requests entries - response, err = resultsCollection.DeleteMany(ctxM, task_request_filter) - if err != nil { - l.Warn().Msg("Could not delete results data from MongoDB.") - return err - } - - l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted results data from MongoDB") - - 
//-------------------------------------------------------------------------- - //-------------------------- Task ------------------------------------------ - //-------------------------------------------------------------------------- - tasksCollection := mongoDB.GetCollection(types.TaskCollection) - // Set filtering for this node-service pair data - task_request_filter = bson.D{{Key: "_id", Value: taskID}} - // Set mongo context - ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - // Now retrieve all node task requests entries - response, err = tasksCollection.DeleteMany(ctxM, task_request_filter) - if err != nil { - l.Warn().Msg("Could not delete task data from MongoDB.") - return err - } - - l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted task data from MongoDB") - - return nil - -} diff --git a/apps/go/manager/activities/process_node_result.go b/apps/go/manager/activities/process_node_result.go new file mode 100644 index 0000000..c063b13 --- /dev/null +++ b/apps/go/manager/activities/process_node_result.go @@ -0,0 +1,296 @@ +package activities + +import ( + "context" + "fmt" + "manager/records" + "manager/types" + "packages/mongodb" + "time" + + "github.com/rs/zerolog" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo/options" + "go.temporal.io/sdk/temporal" +) + +var AnalyzeResultName = "analyze_result" + +func (aCtx *Ctx) AnalyzeResult(ctx context.Context, params types.AnalyzeResultParams) (*types.AnalyzeResultResults, error) { + + var result types.AnalyzeResultResults + result.Success = false + + // Get logger + l := aCtx.App.Logger + l.Debug(). + Str("task_id", params.TaskID.String()). 
+ Msg("Analyzing task.") + + // Get results collection + resultsCollection := aCtx.App.Mongodb.GetCollection(types.ResultsCollection) + + //------------------------------------------------------------------ + // Get Task data + //------------------------------------------------------------------ + taskData, err := retrieveTaskData(params.TaskID, aCtx.App.Mongodb, l) + if err != nil { + return nil, err + } + // Extract data + Node := types.NodeData{ + Address: taskData.RequesterArgs.Address, + Service: taskData.RequesterArgs.Service, + } + + l.Debug(). + Str("task_id", params.TaskID.String()). + Str("address", Node.Address). + Str("service", Node.Service). + Str("framework", taskData.Framework). + Str("task", taskData.Task). + Msg("Analyzing result.") + + //------------------------------------------------------------------ + // Get stored data for this node + //------------------------------------------------------------------ + var nodeData records.NodeRecord + found, err := nodeData.FindAndLoadNode(Node, aCtx.App.Mongodb, l) + if err != nil { + return nil, err + } + + if !found { + err = temporal.NewApplicationErrorWithCause("unable to get node data", "FindAndLoadNode", fmt.Errorf("Node %s not found", Node.Address)) + l.Error(). + Str("address", Node.Address). + Msg("Cannot retrieve node data") + return nil, err + } + + //------------------------------------------------------------------ + // Get stored data for this task + //------------------------------------------------------------------ + taskType, err := records.GetTaskType(taskData.Framework, taskData.Task, aCtx.App.Config.Frameworks, l) + if err != nil { + return nil, err + } + thisTaskRecord, found := records.GetTaskData(nodeData.ID, taskType, taskData.Framework, taskData.Task, aCtx.App.Mongodb, l) + + if !found { + err = temporal.NewApplicationErrorWithCause("unable to get task data", "GetTaskData", fmt.Errorf("Task %s not found", taskData.Task)) + l.Error(). + Str("address", nodeData.Address). 
+ Str("service", nodeData.Service). + Str("framework", taskData.Framework). + Str("task", taskData.Task). + Msg("Requested task was not found.") + return nil, err + } + + thisTaskResults := thisTaskRecord.GetResultStruct() + found = false + found, err = thisTaskResults.FindAndLoadResults(params.TaskID, + resultsCollection, + l) + if err != nil { + return nil, err + } + if !found { + l.Error(). + Str("address", nodeData.Address). + Str("service", nodeData.Service). + Str("framework", taskData.Framework). + Str("task", taskData.Task). + Msg("Requested result was not found.") + } + + l.Debug(). + Str("address", nodeData.Address). + Str("service", nodeData.Service). + Str("framework", taskData.Framework). + Str("task", taskData.Task). + Str("task_id", params.TaskID.String()). + Msg("Processing found results.") + + // If nothing is wrong with the result calculation + if thisTaskResults.GetStatus() == 0 { + l.Debug(). + Int("NumSamples", int(thisTaskResults.GetNumSamples())). + Str("address", nodeData.Address). + Str("service", nodeData.Service). + Str("framework", taskData.Framework). + Str("task", taskData.Task). + Str("task_id", params.TaskID.String()). + Msg("Inserting results into buffers.") + // Add results to current task record + for i := 0; i < int(thisTaskResults.GetNumSamples()); i++ { + thisTaskRecord.InsertSample(time.Now(), thisTaskResults.GetSample(i), l) + } + // Update the last seen fields + thisTaskRecord.UpdateLastHeight(thisTaskResults.GetResultHeight()) + thisTaskRecord.UpdateLastSeen(thisTaskResults.GetResultTime()) + } else { + // TODO: handle status!=0 + l.Debug(). + Str("address", nodeData.Address). + Str("service", nodeData.Service). + Str("framework", taskData.Framework). + Str("task", taskData.Task). + Str("task_id", params.TaskID.String()). + Msg("Status not zero.") + } + + // Delete all MongoDB entries associated with this task ID + errDel := RemoveTaskID(params.TaskID, aCtx.App.Mongodb, l) + if errDel != nil { + l.Debug(). 
+ Str("delete_error", errDel.Error()). + Str("task_id", params.TaskID.String()). + Msg("Deletion error.") + } + + //------------------------------------------------------------------ + // Calculate new metrics for this task + //------------------------------------------------------------------ + thisTaskRecord.ProcessData(l) + + //------------------------------------------------------------------ + // Update task in DB + //------------------------------------------------------------------ + + _, err = thisTaskRecord.UpdateTask(nodeData.ID, taskData.Framework, taskData.Task, aCtx.App.Mongodb, l) + if err != nil { + return nil, err + } + + result.Success = true + + return &result, nil +} + +// Looks for a specific task in the TaskDB and retrieves all its data +func retrieveTaskData(taskID primitive.ObjectID, + mongoDB mongodb.MongoDb, + l *zerolog.Logger) (tasksData types.TaskRequestRecord, + err error) { + + // Get tasks collection + tasksCollection := mongoDB.GetCollection(types.TaskCollection) + + // Set filtering for this task + task_request_filter := bson.D{{Key: "_id", Value: taskID}} + opts := options.FindOne() + // Set mongo context + ctxM, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Now retrieve the task request entry with this ID + cursor := tasksCollection.FindOne(ctxM, task_request_filter, opts) + var taskReq types.TaskRequestRecord + if err := cursor.Decode(&taskReq); err != nil { + l.Error().Msg("Could not decode task request data from MongoDB.") + return taskReq, err + } + + return taskReq, nil + +} + +// Given a TaskID from MongoDB, deletes all associated entries from the "tasks", "instances", "prompts", "responses" and "results" collections. 
+func RemoveTaskID(taskID primitive.ObjectID, mongoDB mongodb.MongoDb, l *zerolog.Logger) (err error) { + + //-------------------------------------------------------------------------- + //-------------------------- Instances ------------------------------------- + //-------------------------------------------------------------------------- + instancesCollection := mongoDB.GetCollection(types.InstanceCollection) + // Set filtering for this node-service pair data + task_request_filter := bson.D{{Key: "task_id", Value: taskID}} + // Set mongo context + ctxM, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // Now retrieve all node task requests entries + response, err := instancesCollection.DeleteMany(ctxM, task_request_filter) + if err != nil { + l.Warn().Msg("Could not delete instances data from MongoDB.") + return err + } + + l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted instances data from MongoDB") + + //-------------------------------------------------------------------------- + //-------------------------- Prompts --------------------------------------- + //-------------------------------------------------------------------------- + promptsCollection := mongoDB.GetCollection(types.PromptsCollection) + // Set filtering for this node-service pair data + task_request_filter = bson.D{{Key: "task_id", Value: taskID}} + // Set mongo context + ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // Now retrieve all node task requests entries + response, err = promptsCollection.DeleteMany(ctxM, task_request_filter) + if err != nil { + l.Warn().Msg("Could not delete prompts data from MongoDB.") + return err + } + + l.Debug().Int("deleted", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted prompts data from MongoDB") + + //-------------------------------------------------------------------------- + 
//-------------------------- Responses ------------------------------------- + //-------------------------------------------------------------------------- + responsesCollection := mongoDB.GetCollection(types.ResponsesCollection) + // Set filtering for this node-service pair data + task_request_filter = bson.D{{Key: "task_id", Value: taskID}} + // Set mongo context + ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // Now retrieve all node task requests entries + response, err = responsesCollection.DeleteMany(ctxM, task_request_filter) + if err != nil { + l.Warn().Msg("Could not delete responses data from MongoDB.") + return err + } + + l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted responses data from MongoDB") + + //-------------------------------------------------------------------------- + //-------------------------- Results --------------------------------------- + //-------------------------------------------------------------------------- + resultsCollection := mongoDB.GetCollection(types.ResultsCollection) + // Set filtering for this node-service pair data + task_request_filter = bson.D{{Key: "result_data.task_id", Value: taskID}} + // Set mongo context + ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // Now retrieve all node task requests entries + response, err = resultsCollection.DeleteMany(ctxM, task_request_filter) + if err != nil { + l.Warn().Msg("Could not delete results data from MongoDB.") + return err + } + + l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted results data from MongoDB") + + //-------------------------------------------------------------------------- + //-------------------------- Task ------------------------------------------ + //-------------------------------------------------------------------------- + tasksCollection := 
mongoDB.GetCollection(types.TaskCollection) + // Set filtering for this node-service pair data + task_request_filter = bson.D{{Key: "_id", Value: taskID}} + // Set mongo context + ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // Now retrieve all node task requests entries + response, err = tasksCollection.DeleteMany(ctxM, task_request_filter) + if err != nil { + l.Warn().Msg("Could not delete task data from MongoDB.") + return err + } + + l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted task data from MongoDB") + + return nil + +} diff --git a/apps/go/manager/records/task.go b/apps/go/manager/records/task.go index 7178f94..293062f 100644 --- a/apps/go/manager/records/task.go +++ b/apps/go/manager/records/task.go @@ -72,7 +72,7 @@ const TaskTTLDays uint32 = 32 type TaskInterface interface { ProcessData(l *zerolog.Logger) error StepIndex(step uint32, marker string, positive_step bool, l *zerolog.Logger) error - CycleIndexes(l *zerolog.Logger) error + CycleIndexes(l *zerolog.Logger) (bool, error) InsertSample(timeSample time.Time, data interface{}, l *zerolog.Logger) (err error) GetNumSamples() uint32 GetFramework() string @@ -550,7 +550,7 @@ func (record *NumericalTaskRecord) StepIndex(step uint32, marker string, positiv } // Updates the indexes making them point to the initial and final samples in a given time window. -func (record *NumericalTaskRecord) CycleIndexes(l *zerolog.Logger) error { +func (record *NumericalTaskRecord) CycleIndexes(l *zerolog.Logger) (bool, error) { return record.CircBuffer.CycleIndexes(NumericalSampleTTLDays, l) } func (record *NumericalTaskRecord) InsertSample(timeSample time.Time, data interface{}, l *zerolog.Logger) (err error) { @@ -738,7 +738,7 @@ func (record *SignatureTaskRecord) StepIndex(step uint32, marker string, positiv } // Updates the indexes making them point to the initial and final samples in a given time window. 
-func (record *SignatureTaskRecord) CycleIndexes(l *zerolog.Logger) error { +func (record *SignatureTaskRecord) CycleIndexes(l *zerolog.Logger) (bool, error) { return record.CircBuffer.CycleIndexes(NumericalSampleTTLDays, l) } diff --git a/apps/go/manager/tests/circular_buffer_test.go b/apps/go/manager/tests/circular_buffer_test.go index 7b310a9..c41a54c 100644 --- a/apps/go/manager/tests/circular_buffer_test.go +++ b/apps/go/manager/tests/circular_buffer_test.go @@ -259,11 +259,16 @@ func (s *CircularBuffertUnitTestSuite) Test_CircularBuffer_Cycling() { // Add time testCircularBuffer.Times[testCircularBuffer.Indexes.End] = time.Now() } + // Cycle indexes (nothing should happen) + cycled, err := testCircularBuffer.CycleIndexes(5, s.app.Logger) + if cycled { + s.T().Error(fmt.Errorf("Index cycling signaling sample drop when all samples are up-to-date")) + } // Change date of start sample to an old one validIdx, err := testCircularBuffer.GetBufferValidIndexes(s.app.Logger) testCircularBuffer.Times[validIdx[0]] = types.EpochStart // Cycle indexes - err = testCircularBuffer.CycleIndexes(5, s.app.Logger) + cycled, err = testCircularBuffer.CycleIndexes(5, s.app.Logger) if err != nil { s.T().Error(err) return @@ -272,6 +277,9 @@ func (s *CircularBuffertUnitTestSuite) Test_CircularBuffer_Cycling() { if uint32(stepsMove-1) != testCircularBuffer.NumSamples { s.T().Error(fmt.Errorf("Index cycling not dropping old sample: got = %v, want %v (Start Idx: %v - End Idx : %v)", testCircularBuffer.NumSamples, stepsMove-1, testCircularBuffer.Indexes.Start, testCircularBuffer.Indexes.End)) } + if !cycled { + s.T().Error(fmt.Errorf("Index cycling not signaling old sample drop")) + } // Check number of valid samples validIdx, err = testCircularBuffer.GetBufferValidIndexes(s.app.Logger) if err != nil { diff --git a/apps/go/manager/types/activities.go b/apps/go/manager/types/activities.go index de03826..8c99207 100644 --- a/apps/go/manager/types/activities.go +++ 
b/apps/go/manager/types/activities.go @@ -1,5 +1,7 @@ package types +import "go.mongodb.org/mongo-driver/bson/primitive" + //------------------------------------------------------------------------------ // Get Staked Nodes //------------------------------------------------------------------------------ @@ -59,3 +61,15 @@ type TriggerSamplerParams struct { type TriggerSamplerResults struct { Success bool } + +//------------------------------------------------------------------------------ +// Analyze Results +//------------------------------------------------------------------------------ + +type AnalyzeResultParams struct { + TaskID primitive.ObjectID `json:"task_id"` +} + +type AnalyzeResultResults struct { + Success bool `json:"success"` +} diff --git a/apps/go/manager/types/circular_buffer.go b/apps/go/manager/types/circular_buffer.go index bbc9f70..8b53912 100644 --- a/apps/go/manager/types/circular_buffer.go +++ b/apps/go/manager/types/circular_buffer.go @@ -29,10 +29,10 @@ type CircularBuffer struct { func (buffer *CircularBuffer) StepIndex(step uint32, marker string, positive_step bool, l *zerolog.Logger) error { l.Debug(). - Int("buffer.Indexes.Start", int(buffer.Indexes.Start)). - Int("buffer.Indexes.End", int(buffer.Indexes.End)). - Int("step", int(step)). - Msg("Circular indexes moving.") + Int("buffer.Indexes.Start", int(buffer.Indexes.Start)). + Int("buffer.Indexes.End", int(buffer.Indexes.End)). + Int("step", int(step)). 
+ Msg("Circular indexes moving.") if step > 1 { return fmt.Errorf("Steps of length larger than 1 are not supported.") @@ -120,24 +120,28 @@ func (buffer *CircularBuffer) StepIndex(step uint32, marker string, positive_ste } } else { buffer.NumSamples = buffer.CircBufferLen - (buffer.Indexes.Start - buffer.Indexes.End) + 1 - + } return nil } -func (buffer *CircularBuffer) CycleIndexes(sampleTTLDays uint32, l *zerolog.Logger) error { +func (buffer *CircularBuffer) CycleIndexes(sampleTTLDays uint32, l *zerolog.Logger) (bool, error) { // Maximum age of a sample maxAge := time.Duration(sampleTTLDays) * 24 * time.Hour // Check the date of the index start oldestAge := time.Since(buffer.Times[buffer.Indexes.Start]) + if oldestAge < maxAge { + return false, nil + } + for oldestAge >= maxAge { // Increment the start err := buffer.StepIndex(1, "start", true, l) if err != nil { - return err + return true, err } // Update the date oldestAge = time.Since(buffer.Times[buffer.Indexes.Start]) @@ -148,7 +152,7 @@ func (buffer *CircularBuffer) CycleIndexes(sampleTTLDays uint32, l *zerolog.Logg } } - return nil + return true, nil } func (buffer *CircularBuffer) BufferLimitCheck(nextVal uint32, l *zerolog.Logger) (uint32, error) { diff --git a/apps/go/manager/types/workflows.go b/apps/go/manager/types/workflows.go index de7c1dd..fbec134 100644 --- a/apps/go/manager/types/workflows.go +++ b/apps/go/manager/types/workflows.go @@ -1,5 +1,7 @@ package types +import "go.mongodb.org/mongo-driver/bson/primitive" + type TestsData struct { Framework string `json:"framework"` Tasks []string `json:"tasks"` @@ -30,3 +32,11 @@ type SamplerWorkflowParams struct { Blacklist []int `json:"blacklist"` Qty int `json:"qty"` } + +type ResultAnalyzerParams struct { + TaskID primitive.ObjectID `json:"task_id"` +} + +type ResultAnalyzerResults struct { + Success bool `json:"success"` +} diff --git a/apps/go/manager/workflows/entry.go b/apps/go/manager/workflows/entry.go index a026218..30c2473 100644 --- 
a/apps/go/manager/workflows/entry.go +++ b/apps/go/manager/workflows/entry.go @@ -38,4 +38,9 @@ func (wCtx *Ctx) Register(w worker.Worker) { w.RegisterWorkflowWithOptions(wCtx.NodeManager, workflow.RegisterOptions{ Name: NodeManagerName, }) + + // Secondary workflow used to update the results quickly + w.RegisterWorkflowWithOptions(wCtx.ResultAnalyzer, workflow.RegisterOptions{ + Name: ResultAnalyzerName, + }) } diff --git a/apps/go/manager/workflows/result_analyzer.go b/apps/go/manager/workflows/result_analyzer.go new file mode 100644 index 0000000..f5b46dd --- /dev/null +++ b/apps/go/manager/workflows/result_analyzer.go @@ -0,0 +1,48 @@ +package workflows + +import ( + "time" + + "manager/activities" + "manager/types" + + "go.temporal.io/sdk/workflow" +) + +var ResultAnalyzerName = "Manager-ResultAnalyzer" + +// NodeManager - Is a method that orchestrates the tracking of staked ML nodes. +// It performs the following activities: +// - Staked nodes retrieval +// - Analyze nodes data +// - Triggering new evaluation tasks +func (wCtx *Ctx) ResultAnalyzer(ctx workflow.Context, params types.ResultAnalyzerParams) (*types.ResultAnalyzerResults, error) { + + l := wCtx.App.Logger + l.Debug().Msg("Starting Result Analyzer Workflow.") + + // Create result + result := types.ResultAnalyzerResults{Success: false} + + // ------------------------------------------------------------------------- + // -------------------- Analyze Result ------------------------------------- + // ------------------------------------------------------------------------- + // Set timeout to get staked nodes activity + ctxTimeout := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute * 5, + StartToCloseTimeout: time.Minute * 5, + }) + // Set activity input + getAnalyzeResultInput := types.AnalyzeResultParams{ + TaskID: params.TaskID, + } + // Results will be kept logged by temporal + var resultAnalysisData types.AnalyzeResultResults + // Execute activity + 
err := workflow.ExecuteActivity(ctxTimeout, activities.AnalyzeResultName, getAnalyzeResultInput).Get(ctx, &resultAnalysisData) + if err != nil { + return &result, err + } + + return &result, nil +} From 953713d05ccf9f2614ea07f50bf8570f3920dc64 Mon Sep 17 00:00:00 2001 From: Ramiro Rodriguez Colmeiro Date: Thu, 1 Aug 2024 16:02:10 -0300 Subject: [PATCH 2/5] Added trigger in python evaluator --- .../activities/trigger_manager_analysis.py | 30 +++++++++++++++++++ apps/python/evaluator/config.sample.json | 6 +++- apps/python/evaluator/workflows/evaluator.py | 14 +++++++-- docker-compose/dev/apps/config/sampler.json | 6 +++- .../morse-poc/apps_configs/sampler.json | 6 +++- 5 files changed, 57 insertions(+), 5 deletions(-) create mode 100644 apps/python/evaluator/activities/trigger_manager_analysis.py diff --git a/apps/python/evaluator/activities/trigger_manager_analysis.py b/apps/python/evaluator/activities/trigger_manager_analysis.py new file mode 100644 index 0000000..a23d28f --- /dev/null +++ b/apps/python/evaluator/activities/trigger_manager_analysis.py @@ -0,0 +1,30 @@ +from temporalio import activity +from temporalio import workflow +from temporalio.exceptions import ApplicationError +from packages.python.common.auto_heartbeater import auto_heartbeater +from app.app import get_app_logger, get_app_config +from packages.python.lmeh.utils.mongodb import MongoOperator + +# Custom modules +from packages.python.protocol.protocol import PocketNetworkEvaluationTaskRequest +from bson import ObjectId + + +@activity.defn +@auto_heartbeater +async def trigger_manager_analysis(args: PocketNetworkEvaluationTaskRequest) -> bool: + app_config = get_app_config() + eval_logger = get_app_logger("evaluation") + config = app_config["config"] + + args.task_id + + workflow.start_child_workflow( + config["temporal"]["manager-result-analyzer"]["workflow_name"], + id="result-analysis-%s"%str(args.task_id), + args=[{"task_id": args.task_id}], + 
task_queue=config["temporal"]["manager-result-analyzer"]["task_queue"] + ) + + + return True diff --git a/apps/python/evaluator/config.sample.json b/apps/python/evaluator/config.sample.json index 17c3ca6..e0f2ae9 100644 --- a/apps/python/evaluator/config.sample.json +++ b/apps/python/evaluator/config.sample.json @@ -7,6 +7,10 @@ "port": 7233, "namespace": "pocket-ml-testbench", "task_queue": "evaluate", - "max_workers": 10 + "max_workers": 10, + "manager-result-analyzer": { + "workflow_name": "Manager-ResultAnalyzer", + "task_queue": "manager-local" + } } } \ No newline at end of file diff --git a/apps/python/evaluator/workflows/evaluator.py b/apps/python/evaluator/workflows/evaluator.py index 022211a..247cb37 100644 --- a/apps/python/evaluator/workflows/evaluator.py +++ b/apps/python/evaluator/workflows/evaluator.py @@ -6,6 +6,7 @@ from app.app import get_app_logger from activities.lmeh.evaluate import lmeh_evaluate from activities.get_task_data import get_task_data +from activities.trigger_manager_analysis import trigger_manager_analysis from activities.signatures.tokenizer_evaluate import tokenizer_evaluate @@ -13,7 +14,7 @@ class Evaluator: @workflow.run async def run(self, args: PocketNetworkEvaluationTaskRequest) -> bool: - eval_logger = get_app_logger("Register") + eval_logger = get_app_logger("Evaluator") eval_logger.info("Starting Workflow Evaluator") # Extract framework and task to evaluate framework, task = await workflow.execute_activity( @@ -23,6 +24,7 @@ async def run(self, args: PocketNetworkEvaluationTaskRequest) -> bool: retry_policy=RetryPolicy(maximum_attempts=2), ) + # Perform the corresponding evaluation if framework == "lmeh": _ = await workflow.execute_activity( lmeh_evaluate, @@ -54,6 +56,14 @@ async def run(self, args: PocketNetworkEvaluationTaskRequest) -> bool: type="BadParams", non_retryable=True, ) + + # Trigger the manager result processing workflow + status = await workflow.execute_activity( + trigger_manager_analysis, + args, + 
start_to_close_timeout=timedelta(seconds=10), + retry_policy=RetryPolicy(maximum_attempts=2), + ) eval_logger.info("Workflow Evaluator done") - return True + return status diff --git a/docker-compose/dev/apps/config/sampler.json b/docker-compose/dev/apps/config/sampler.json index a7e1b05..044f04e 100644 --- a/docker-compose/dev/apps/config/sampler.json +++ b/docker-compose/dev/apps/config/sampler.json @@ -6,7 +6,11 @@ "host": "temporal", "port": 7233, "namespace": "pocket-ml-testbench", - "task_queue": "sampler" + "task_queue": "sampler", + "manager-result-analyzer": { + "workflow_name": "Manager-ResultAnalyzer", + "task_queue": "manager" + } }, "timeouts": { "random": { diff --git a/docker-compose/morse-poc/apps_configs/sampler.json b/docker-compose/morse-poc/apps_configs/sampler.json index e0e0393..a7651ec 100644 --- a/docker-compose/morse-poc/apps_configs/sampler.json +++ b/docker-compose/morse-poc/apps_configs/sampler.json @@ -11,7 +11,11 @@ "max_concurrent_activities": 4, "max_concurrent_workflow_tasks": 4, "max_concurrent_workflow_task_polls": 4, - "max_concurrent_activity_task_polls": 4 + "max_concurrent_activity_task_polls": 4, + "manager-result-analyzer": { + "workflow_name": "Manager-ResultAnalyzer", + "task_queue": "manager" + } }, "timeouts": { "A100": { From e3224fcd339a847423fba378109241734d1d2587 Mon Sep 17 00:00:00 2001 From: Ramiro Rodriguez Colmeiro Date: Fri, 2 Aug 2024 12:25:39 -0300 Subject: [PATCH 3/5] working | updated API for partial leaderboard results --- apps/go/manager/activities/entry.go | 5 ++ apps/go/manager/tests/main_test.go | 2 + apps/go/manager/types/circular_buffer.go | 10 +-- apps/go/manager/types/node_task_request.go | 2 +- apps/go/manager/workflows/result_analyzer.go | 6 +- apps/python/api/app/leaderboard.py | 77 ++++++++++++++----- .../signatures/tokenizer_evaluate.py | 17 ++-- .../activities/trigger_manager_analysis.py | 30 -------- apps/python/evaluator/workflows/evaluator.py | 28 +++++-- 
docker-compose/dev/apps/config/evaluator.json | 6 +- .../morse-poc/apps_configs/evaluator.json | 6 +- 11 files changed, 109 insertions(+), 80 deletions(-) delete mode 100644 apps/python/evaluator/activities/trigger_manager_analysis.py diff --git a/apps/go/manager/activities/entry.go b/apps/go/manager/activities/entry.go index 6eed525..f33bef9 100644 --- a/apps/go/manager/activities/entry.go +++ b/apps/go/manager/activities/entry.go @@ -42,4 +42,9 @@ func (aCtx *Ctx) Register(w worker.Worker) { w.RegisterActivityWithOptions(aCtx.TriggerSampler, activity.RegisterOptions{ Name: TriggerSamplerName, }) + + w.RegisterActivityWithOptions(aCtx.AnalyzeResult, activity.RegisterOptions{ + Name: AnalyzeResultName, + }) + } diff --git a/apps/go/manager/tests/main_test.go b/apps/go/manager/tests/main_test.go index 30d2425..1ce78cc 100644 --- a/apps/go/manager/tests/main_test.go +++ b/apps/go/manager/tests/main_test.go @@ -94,6 +94,7 @@ func (s *BaseSuite) BeforeTest(_, _ string) { aCtx.AnalyzeNode, aCtx.GetStaked, aCtx.TriggerSampler, + aCtx.AnalyzeResult, } // register the activities that will need to be mock up here @@ -104,6 +105,7 @@ func (s *BaseSuite) BeforeTest(_, _ string) { // register the workflows s.workflowEnv.RegisterWorkflow(wCtx.NodeManager) + s.workflowEnv.RegisterWorkflow(wCtx.ResultAnalyzer) } func (s *BaseSuite) GetPocketRpcMock() *pocket_rpc.MockRpc { diff --git a/apps/go/manager/types/circular_buffer.go b/apps/go/manager/types/circular_buffer.go index 8b53912..eceddeb 100644 --- a/apps/go/manager/types/circular_buffer.go +++ b/apps/go/manager/types/circular_buffer.go @@ -28,11 +28,11 @@ type CircularBuffer struct { // Gets the sample index given a step direction (positive: 1 or negative: -1) and for a given marker (start or end of buffer) func (buffer *CircularBuffer) StepIndex(step uint32, marker string, positive_step bool, l *zerolog.Logger) error { - l.Debug(). - Int("buffer.Indexes.Start", int(buffer.Indexes.Start)). 
- Int("buffer.Indexes.End", int(buffer.Indexes.End)). - Int("step", int(step)). - Msg("Circular indexes moving.") + // l.Debug(). + // Int("buffer.Indexes.Start", int(buffer.Indexes.Start)). + // Int("buffer.Indexes.End", int(buffer.Indexes.End)). + // Int("step", int(step)). + // Msg("Circular indexes moving.") if step > 1 { return fmt.Errorf("Steps of length larger than 1 are not supported.") diff --git a/apps/go/manager/types/node_task_request.go b/apps/go/manager/types/node_task_request.go index 484c982..840eea5 100644 --- a/apps/go/manager/types/node_task_request.go +++ b/apps/go/manager/types/node_task_request.go @@ -20,7 +20,7 @@ type TaskRequestRecord struct { Id primitive.ObjectID `bson:"_id"` RequesterArgs RequesterArgs `bson:"requester_args"` Framework string `bson:"framework"` - Task string `bson:"task"` + Task string `bson:"tasks"` Blacklist []int `bson:"blacklist"` Qty int `bson:"qty"` TotalInstances int `bson:"total_instances"` diff --git a/apps/go/manager/workflows/result_analyzer.go b/apps/go/manager/workflows/result_analyzer.go index f5b46dd..32a5930 100644 --- a/apps/go/manager/workflows/result_analyzer.go +++ b/apps/go/manager/workflows/result_analyzer.go @@ -11,11 +11,7 @@ import ( var ResultAnalyzerName = "Manager-ResultAnalyzer" -// NodeManager - Is a method that orchestrates the tracking of staked ML nodes. -// It performs the following activities: -// - Staked nodes retrieval -// - Analyze nodes data -// - Triggering new evaluation tasks +// ResultAnalyzer - Is a method that processes the results generated by the evaluator. 
func (wCtx *Ctx) ResultAnalyzer(ctx workflow.Context, params types.ResultAnalyzerParams) (*types.ResultAnalyzerResults, error) { l := wCtx.App.Logger diff --git a/apps/python/api/app/leaderboard.py b/apps/python/api/app/leaderboard.py index 8c412e9..3588dfc 100644 --- a/apps/python/api/app/leaderboard.py +++ b/apps/python/api/app/leaderboard.py @@ -116,6 +116,7 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]: # Prepare entry leaderboard_entry = dict() + leaderboard_entry["status"] = "OK" # Add QoS leaderboard_entry["qos"] = { @@ -128,6 +129,7 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]: running_mean_avg = 0 weight_avg = 0 std_err_avg = 0 + incomplete = False for metric in LEADERBOARD_METRICS.keys(): metric_name = LEADERBOARD_METRICS[metric] @@ -136,19 +138,29 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]: weight_mmlu = 0 std_err_mmlu = 0 # This requiere more steps yay! + all_ok = True + partial = False for mmlu_metric in LEADERBOARD_MMLU_METRICS: data_row = scores_df.loc[ (scores_df["framework"] == LEADERBOARD_FRAMEWORK) * (scores_df["task"] == mmlu_metric) ] - assert len(data_row) == 1 - if data_row["num"].values[0] > 0: + assert len(data_row) <= 1 + + if len(data_row) == 0: + # Cannot compute MMLU + all_ok = False + break + elif data_row["num"].values[0] > 0: metric_mean = ( data_row["mean"].values[0] * data_row["num"].values[0] ) metric_std_err = data_row["std"].values[0] / np.sqrt( data_row["num"].values[0] ) + if data_row["num"].values[0] <= 50: + # This is a partial metric + partial = True else: metric_mean = 0 metric_std_err = 0 @@ -159,43 +171,65 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]: if this_w > 0: std_err_mmlu += (metric_std_err / this_w) ** 2 - if weight_mmlu == 0: - running_mean_mmlu = 0 - else: - running_mean_mmlu = running_mean_mmlu / weight_mmlu - if std_err_mmlu != 0: - std_err_mmlu = np.sqrt(std_err_mmlu) + if all_ok: 
+ if weight_mmlu == 0: + running_mean_mmlu = 0 + else: + running_mean_mmlu = running_mean_mmlu / weight_mmlu + if std_err_mmlu != 0: + std_err_mmlu = np.sqrt(std_err_mmlu) - metric_mean = running_mean_mmlu - metric_std_err = std_err_mmlu - metric_weight = weight_mmlu / len(LEADERBOARD_MMLU_METRICS) + metric_mean = running_mean_mmlu + metric_std_err = std_err_mmlu + metric_weight = weight_mmlu / len(LEADERBOARD_MMLU_METRICS) + else: + # No data + metric_mean = np.nan + metric_std_err = np.nan + metric_weight = np.nan else: data_row = scores_df.loc[ (scores_df["framework"] == LEADERBOARD_FRAMEWORK) * (scores_df["task"] == metric) ] - assert len(data_row) == 1 + assert len(data_row) <= 1 - if data_row["num"].values[0] > 0: + if len(data_row) == 0: + # No data + metric_mean = np.nan + metric_std_err = np.nan + metric_weight = np.nan + + elif data_row["num"].values[0] > 0: metric_mean = data_row["mean"].values[0] metric_std_err = data_row["std"].values[0] / np.sqrt( data_row["num"].values[0] ) metric_weight = data_row["num"].values[0] + if data_row["num"].values[0] <= 50: + partial = True else: metric_mean = 0 metric_std_err = 0 metric_weight = 0 - leaderboard_entry["metrics"][metric_name] = { - "mean": metric_mean, - "stderr": metric_std_err, - } - - running_mean_avg += metric_mean * metric_weight - weight_avg += metric_weight - std_err_avg += metric_std_err**2 + if np.isnan(metric_mean) or metric_weight == 0: + leaderboard_entry["metrics"][metric_name] = { + "mean": -1, + "stderr": -1, + "status": "MISSING", + } + incomplete = True # The average will be incomplete + else: + leaderboard_entry["metrics"][metric_name] = { + "mean": metric_mean, + "stderr": metric_std_err, + "status": "OK" if not partial else "PARTIAL", + } + running_mean_avg += metric_mean * metric_weight + weight_avg += metric_weight + std_err_avg += metric_std_err**2 if weight_avg == 0: running_mean_avg = 0 @@ -207,6 +241,7 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]: 
leaderboard_entry["metrics"]["average"] = { "mean": running_mean_avg, "stderr": std_err_avg, + "status": "OK" if not incomplete else "INCOMPLETE", } # Add Metadata diff --git a/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py index afc20ee..5d0f9df 100644 --- a/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py +++ b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py @@ -8,7 +8,12 @@ from packages.python.common.auto_heartbeater import auto_heartbeater from packages.python.lmeh.utils.mongodb import MongoOperator -from packages.python.lmeh.utils.tokenizers import load_tokenizer, prepare_tokenizer, load_config, prepare_config +from packages.python.lmeh.utils.tokenizers import ( + load_tokenizer, + prepare_tokenizer, + load_config, + prepare_config, +) from packages.python.protocol.protocol import ( PocketNetworkEvaluationTaskRequest, PocketNetworkMongoDBResultSignature, @@ -19,8 +24,6 @@ ) - - @activity.defn @auto_heartbeater async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: @@ -113,7 +116,7 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: ### CONFIG ##################### _config = load_config( - config_objects = config_jsons, + config_objects=config_jsons, wf_id="", config_ephimeral_path=temp_path, ) @@ -122,7 +125,7 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: config_jsons_loaded, config_hash_loaded = prepare_config( _config, CONFIG_EPHIMERAL_PATH=temp_path ) - # TODO + # TODO # For instance, the tokenizer hash is used as the config hash # in future versions, this should be changed config_mongo_new = PocketNetworkMongoDBConfig( @@ -172,9 +175,7 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: ###################### ### CONFIG ##################### - config_db = await mongo_operator.get_config_entry( - 
config_mongo_new.hash - ) + config_db = await mongo_operator.get_config_entry(config_mongo_new.hash) if config_db is None: eval_logger.debug("Config does not exists.") # the config is not tracked, we need to create an entry diff --git a/apps/python/evaluator/activities/trigger_manager_analysis.py b/apps/python/evaluator/activities/trigger_manager_analysis.py deleted file mode 100644 index a23d28f..0000000 --- a/apps/python/evaluator/activities/trigger_manager_analysis.py +++ /dev/null @@ -1,30 +0,0 @@ -from temporalio import activity -from temporalio import workflow -from temporalio.exceptions import ApplicationError -from packages.python.common.auto_heartbeater import auto_heartbeater -from app.app import get_app_logger, get_app_config -from packages.python.lmeh.utils.mongodb import MongoOperator - -# Custom modules -from packages.python.protocol.protocol import PocketNetworkEvaluationTaskRequest -from bson import ObjectId - - -@activity.defn -@auto_heartbeater -async def trigger_manager_analysis(args: PocketNetworkEvaluationTaskRequest) -> bool: - app_config = get_app_config() - eval_logger = get_app_logger("evaluation") - config = app_config["config"] - - args.task_id - - workflow.start_child_workflow( - config["temporal"]["manager-result-analyzer"]["workflow_name"], - id="result-analysis-%s"%str(args.task_id), - args=[{"task_id": args.task_id}], - task_queue=config["temporal"]["manager-result-analyzer"]["task_queue"] - ) - - - return True diff --git a/apps/python/evaluator/workflows/evaluator.py b/apps/python/evaluator/workflows/evaluator.py index 247cb37..90f0d43 100644 --- a/apps/python/evaluator/workflows/evaluator.py +++ b/apps/python/evaluator/workflows/evaluator.py @@ -6,8 +6,11 @@ from app.app import get_app_logger from activities.lmeh.evaluate import lmeh_evaluate from activities.get_task_data import get_task_data -from activities.trigger_manager_analysis import trigger_manager_analysis from activities.signatures.tokenizer_evaluate import 
tokenizer_evaluate +from temporalio.common import WorkflowIDReusePolicy +from temporalio.workflow import ParentClosePolicy +from app.app import get_app_config +from packages.python.common.utils import get_from_dict @workflow.defn @@ -56,14 +59,23 @@ async def run(self, args: PocketNetworkEvaluationTaskRequest) -> bool: type="BadParams", non_retryable=True, ) - + # Trigger the manager result processing workflow - status = await workflow.execute_activity( - trigger_manager_analysis, - args, - start_to_close_timeout=timedelta(seconds=10), - retry_policy=RetryPolicy(maximum_attempts=2), + app_config = get_app_config() + config = app_config["config"] + await workflow.start_child_workflow( + workflow=get_from_dict( + config, "temporal.manager-result-analyzer.workflow_name" + ), + id="result-analysis-%s" % str(args.task_id), + args=[{"task_id": args.task_id}], + task_queue=get_from_dict( + config, "temporal.manager-result-analyzer.task_queue" + ), + id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY, + retry_policy=RetryPolicy(maximum_attempts=1), + parent_close_policy=ParentClosePolicy.ABANDON, ) eval_logger.info("Workflow Evaluator done") - return status + return True diff --git a/docker-compose/dev/apps/config/evaluator.json b/docker-compose/dev/apps/config/evaluator.json index ba94126..4d30790 100644 --- a/docker-compose/dev/apps/config/evaluator.json +++ b/docker-compose/dev/apps/config/evaluator.json @@ -6,6 +6,10 @@ "host": "temporal", "port": 7233, "namespace": "pocket-ml-testbench", - "task_queue": "evaluate" + "task_queue": "evaluate", + "manager-result-analyzer": { + "workflow_name": "Manager-ResultAnalyzer", + "task_queue": "manager" + } } } \ No newline at end of file diff --git a/docker-compose/morse-poc/apps_configs/evaluator.json b/docker-compose/morse-poc/apps_configs/evaluator.json index 71cbf7d..c216c76 100644 --- a/docker-compose/morse-poc/apps_configs/evaluator.json +++ b/docker-compose/morse-poc/apps_configs/evaluator.json @@ -11,6 
+11,10 @@ "max_concurrent_activities": 4, "max_concurrent_workflow_tasks": 4, "max_concurrent_workflow_task_polls": 4, - "max_concurrent_activity_task_polls": 4 + "max_concurrent_activity_task_polls": 4, + "manager-result-analyzer": { + "workflow_name": "Manager-ResultAnalyzer", + "task_queue": "manager" + } } } \ No newline at end of file From 7e87ab03ede000eb42383e0ebe3b179358bc063d Mon Sep 17 00:00:00 2001 From: Ramiro Rodriguez Colmeiro Date: Fri, 2 Aug 2024 12:32:07 -0300 Subject: [PATCH 4/5] updated .env --- docker-compose/dev/dependencies/.env | 2 +- docker-compose/morse-poc/.env.sample | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose/dev/dependencies/.env b/docker-compose/dev/dependencies/.env index 338a43b..ee7752b 100644 --- a/docker-compose/dev/dependencies/.env +++ b/docker-compose/dev/dependencies/.env @@ -1,6 +1,6 @@ COMPOSE_PROJECT_NAME=dev TEMPORAL_VERSION=1.23.1.0 -TEMPORAL_TOOLS_VERSION=1.23 +TEMPORAL_TOOLS_VERSION=1.23.1.0 TEMPORAL_UI_VERSION=2.27.1 TEMPORAL_NAMESPACE=pocket-ml-testbench POSTGRESQL_VERSION=16.3-alpine diff --git a/docker-compose/morse-poc/.env.sample b/docker-compose/morse-poc/.env.sample index 6650981..a6071d2 100644 --- a/docker-compose/morse-poc/.env.sample +++ b/docker-compose/morse-poc/.env.sample @@ -16,7 +16,7 @@ SERVED_MODEL_NAME=pocket_network # DEPENDENCIES COMPOSE_PROJECT_NAME=dev TEMPORAL_VERSION=1.23.1.0 -TEMPORAL_TOOLS_VERSION=1.23 +TEMPORAL_TOOLS_VERSION=1.23.1.0 TEMPORAL_UI_VERSION=2.27.1 TEMPORAL_NAMESPACE=pocket-ml-testbench POSTGRESQL_VERSION=16.3-alpine From e737d665ec1c336d89ab0771c8e4b6233bfa6220 Mon Sep 17 00:00:00 2001 From: Ramiro Rodriguez Colmeiro Date: Tue, 20 Aug 2024 12:23:46 -0300 Subject: [PATCH 5/5] Added develop config to manager | added flag for optional erasure of task's db entries --- apps/go/manager/activities/process_node_result.go | 14 ++++++++------ apps/go/manager/config.sample.json | 3 +++ apps/go/manager/types/config.go | 5 +++++ 
docker-compose/dev/apps/config/manager.json | 3 +++ docker-compose/morse-poc/apps_configs/manager.json | 3 +++ 5 files changed, 22 insertions(+), 6 deletions(-) diff --git a/apps/go/manager/activities/process_node_result.go b/apps/go/manager/activities/process_node_result.go index c063b13..6f50682 100644 --- a/apps/go/manager/activities/process_node_result.go +++ b/apps/go/manager/activities/process_node_result.go @@ -143,12 +143,14 @@ func (aCtx *Ctx) AnalyzeResult(ctx context.Context, params types.AnalyzeResultPa } // Delete all MongoDB entries associated with this task ID - errDel := RemoveTaskID(params.TaskID, aCtx.App.Mongodb, l) - if errDel != nil { - l.Debug(). - Str("delete_error", errDel.Error()). - Str("task_id", params.TaskID.String()). - Msg("Deletion error.") + if !aCtx.App.Config.DevelopCfg.DoNotRemoveTasksFromDB { + errDel := RemoveTaskID(params.TaskID, aCtx.App.Mongodb, l) + if errDel != nil { + l.Debug(). + Str("delete_error", errDel.Error()). + Str("task_id", params.TaskID.String()). 
+ Msg("Deletion error.") + } } //------------------------------------------------------------------ diff --git a/apps/go/manager/config.sample.json b/apps/go/manager/config.sample.json index 8e03956..bc581ee 100644 --- a/apps/go/manager/config.sample.json +++ b/apps/go/manager/config.sample.json @@ -15,6 +15,9 @@ "req_per_sec": 10 }, "log_level": "debug", + "develop": { + "do_not_remove_tasks_from_db" : false + }, "temporal": { "host": "localhost", "port": 7233, diff --git a/apps/go/manager/types/config.go b/apps/go/manager/types/config.go index 8090b59..944a60f 100644 --- a/apps/go/manager/types/config.go +++ b/apps/go/manager/types/config.go @@ -232,6 +232,7 @@ type Config struct { Rpc *RPCConfig `json:"rpc"` LogLevel string `json:"log_level"` Temporal *TemporalConfig `json:"temporal"` + DevelopCfg *DevelopConfig `json:"develop"` } type FrameworkConfig struct { @@ -240,3 +241,7 @@ type FrameworkConfig struct { ScheduleLimits map[string]string `json:"schedule_limits"` TriggerMinimum map[string]string `json:"trigger_minimum"` } + +type DevelopConfig struct { + DoNotRemoveTasksFromDB bool `json:"do_not_remove_tasks_from_db"` +} diff --git a/docker-compose/dev/apps/config/manager.json b/docker-compose/dev/apps/config/manager.json index 24e1a0b..3ee8630 100644 --- a/docker-compose/dev/apps/config/manager.json +++ b/docker-compose/dev/apps/config/manager.json @@ -10,6 +10,9 @@ "req_per_sec": 10 }, "log_level": "info", + "develop": { + "do_not_remove_tasks_from_db" : false + }, "temporal": { "host": "temporal", "port": 7233, diff --git a/docker-compose/morse-poc/apps_configs/manager.json b/docker-compose/morse-poc/apps_configs/manager.json index 02e4366..620deb1 100644 --- a/docker-compose/morse-poc/apps_configs/manager.json +++ b/docker-compose/morse-poc/apps_configs/manager.json @@ -10,6 +10,9 @@ "req_per_sec": 10 }, "log_level": "info", + "develop": { + "do_not_remove_tasks_from_db" : false + }, "temporal": { "host": "temporal", "port": 7233,