diff --git a/apps/go/manager/activities/analyze_node.go b/apps/go/manager/activities/analyze_node.go index 2753507..ddac66d 100644 --- a/apps/go/manager/activities/analyze_node.go +++ b/apps/go/manager/activities/analyze_node.go @@ -11,6 +11,7 @@ import ( "github.com/rs/zerolog" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" ) var AnalyzeNodeName = "analyze_node" @@ -25,12 +26,9 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams l := aCtx.App.Logger l.Debug().Str("address", params.Node.Address).Str("service", params.Node.Service).Msg("Analyzing staked node.") - // Get nodes collection - nodesCollection := aCtx.App.Mongodb.GetCollection(types.NodesCollection) - // Retrieve this node entry var thisNodeData records.NodeRecord - found, err := thisNodeData.FindAndLoadNode(params.Node, nodesCollection, l) + found, err := thisNodeData.FindAndLoadNode(params.Node, aCtx.App.Mongodb, l) if err != nil { return nil, err } @@ -42,7 +40,7 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams if !found { // Create entry in MongoDB l.Debug().Bool("found", found).Msg("Creating empty node entry.") - thisNodeData.Init(params, aCtx.App.Config.Frameworks, l) + thisNodeData.Init(params, aCtx.App.Config.Frameworks, aCtx.App.Mongodb, l) result.IsNew = true } else { @@ -53,18 +51,16 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams } } - // Push to DB - thisNodeData.UpdateNode(nodesCollection, l) + + // TODO : Do general update of node entry if needed (for instance to track last values of buffers) + + // Push to DB the node data + thisNodeData.UpdateNode(aCtx.App.Mongodb, l) //-------------------------------------------------------------------------- // Trigger incomplete tasks //-------------------------------------------------------------------------- - // Get tasks collection - tasksCollection := aCtx.App.Mongodb.GetCollection(types.TaskCollection) - // Get tasks instances - instancesCollection := aCtx.App.Mongodb.GetCollection(types.InstanceCollection) - // Loop over all tasks and frameworks for _, test := range params.Tests { @@ -72,7 +68,7 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams l.Debug().Str("address", thisNodeData.Address).Str("service", thisNodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Checking task requests.") // Check task dependencies - depStatus, err := records.CheckTaskDependency(&thisNodeData, test.Framework, task, aCtx.App.Config.Frameworks, l) + depStatus, err := records.CheckTaskDependency(&thisNodeData, test.Framework, task, aCtx.App.Config.Frameworks, aCtx.App.Mongodb, l) if err != nil { l.Error().Msg("Could not check task dependencies.") return nil, err @@ -83,93 +79,79 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams } // Get task record - thisTaskRecord, found := records.GetTaskData(&thisNodeData, test.Framework, task, l) + taskType, err := records.GetTaskType(test.Framework, task, aCtx.App.Config.Frameworks, l) + if err != nil { + return nil, fmt.Errorf("cannot retrieve task type") + } + thisTaskRecord, found := records.GetTaskData(thisNodeData.ID, taskType, test.Framework, task, aCtx.App.Mongodb, l) if found != true { l.Error().Str("address", thisNodeData.Address).Str("service", thisNodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("not found task entry after check creation (task should be present at this point)") return nil, fmt.Errorf("not found task entry after check creation (task should be present at this point)") } - // If the number of samples is less than the minimum, proceed to request more + // Check schedule restrictions + schdStatus, err := records.CheckTaskSchedule(thisTaskRecord, params.Block, aCtx.App.Config.Frameworks, l) + if err != nil { + l.Error().Msg("Could not check task sqchedule.") + return nil, err + } + if !schdStatus { + l.Info().Str("address", thisNodeData.Address).Str("service", thisNodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Does not meet task schedule, ignoring for now.") + continue + } + + // The schedule is OK, now check minimum tasks to trigger + minTrigger, err := records.CheckTaskTriggerMin(thisTaskRecord, params.Block, aCtx.App.Config.Frameworks, l) + if err != nil { + l.Error().Msg("Could not check task minimum trigger value.") + return nil, err + } + + // If the number of samples is less than the minimum or ther is a minimum value to trigger, proceed to request more numberOfSamples := thisTaskRecord.GetNumSamples() - if numberOfSamples <= thisTaskRecord.GetMinSamplesPerTask() { + if numberOfSamples < thisTaskRecord.GetMinSamplesPerTask() || minTrigger > 0 { // Calculate the total number of request needed reqNeeded := thisTaskRecord.GetMinSamplesPerTask() - numberOfSamples // Check if this exceed the max concurrent task and limit - if reqNeeded > thisTaskRecord.GetMaxConcurrentSamplesPerTask() { - reqNeeded = thisTaskRecord.GetMaxConcurrentSamplesPerTask() + maxConcurrentTasks := thisTaskRecord.GetMaxConcurrentSamplesPerTask() + if reqNeeded > maxConcurrentTasks { + reqNeeded = maxConcurrentTasks } - // Set filtering for this node-service pair data - task_request_filter := bson.D{{Key: "requester_args.address", Value: thisNodeData.Address}, - {Key: "requester_args.service", Value: thisNodeData.Service}, - {Key: "framework", Value: test.Framework}, - {Key: "tasks", Value: task}} - - // Set mongo context - ctxM, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - // Now retrieve all node task requests entries - cursor, err := tasksCollection.Find(ctxM, task_request_filter) + // Get number of tasks in queue + inQueue, _, blackList, _, err := CheckTaskDatabase(thisNodeData.Address, thisNodeData.Service, test.Framework, task, aCtx.App.Mongodb, l) if err != nil { - l.Error().Msg("Could not retrieve task request data from MongoDB.") return nil, err } - defer cursor.Close(ctxM) - var inQueue uint32 = 0 - blackList := make([]int, 0) - for cursor.Next(ctxM) { - var taskReq types.TaskRequestRecord - if err := cursor.Decode(&taskReq); err != nil { - l.Error().Msg("Could not decode task request data from MongoDB.") - return nil, err - } - // If not already done - if !taskReq.Done { - l.Debug().Str("address", thisNodeData.Address).Str("service", thisNodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Found pending task.") - // Count pending - inQueue += uint32(taskReq.Qty) - - // Search for associated instances to retrieve ids - instances_filter := bson.D{{"task_id", taskReq.Id}} - cursor2, err := instancesCollection.Find(ctxM, instances_filter) - if err != nil { - l.Error().Msg("Could not retrieve instances data from MongoDB.") - return nil, err - } - defer cursor2.Close(ctxM) - // Get all ids - for cursor2.Next(ctxM) { - var thisInstance types.InstanceRecord - if err := cursor.Decode(&thisInstance); err != nil { - l.Error().Msg("Could not decode task request data from MongoDB.") - return nil, err - } - for _, docId := range thisInstance.DocIds { - blackList = append(blackList, docId) - } - } + + // Only trigger if the tasks in queue are less than the maximum concurrent tasks that we allow + if maxConcurrentTasks > inQueue { + // Remove the number of task in queue + reqNeeded -= inQueue + + // Apply minimum + if reqNeeded < minTrigger { + reqNeeded = minTrigger } - } + if reqNeeded > 0 { - // Remove the number of task in queue - reqNeeded -= inQueue - if reqNeeded > 0 { - - // Add trigger - thisTrigger := types.TaskTrigger{Address: thisNodeData.Address, - Service: thisNodeData.Service, - Framework: test.Framework, - Task: task, - Blacklist: blackList, - Qty: int(reqNeeded)} - result.Triggers = append(result.Triggers, thisTrigger) + // Add trigger + thisTrigger := types.TaskTrigger{Address: thisNodeData.Address, + Service: thisNodeData.Service, + Framework: test.Framework, + Task: task, + Blacklist: blackList, + Qty: int(reqNeeded)} + result.Triggers = append(result.Triggers, thisTrigger) + } } else { l.Info().Str("address", thisNodeData.Address).Str("service", thisNodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Pending requests capped.") } + } else { + l.Info().Str("address", thisNodeData.Address).Str("service", thisNodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Buffer filled and up to date.") } } } @@ -180,21 +162,10 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams } // Checks status of all node's tasks, drops old ones, checks for new results and re-computes. -func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, frameworkConfigMap map[string]types.FrameworkConfig, mongo mongodb.MongoDb, l *zerolog.Logger) (err error) { - - nodeData.LastSeenHeight += 1 +func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, frameworkConfigMap map[string]types.FrameworkConfig, mongoDB mongodb.MongoDb, l *zerolog.Logger) (err error) { // Get results collection - resultsCollection := mongo.GetCollection(types.ResultsCollection) - - //-------------------------------------------------------------------------- - // Drop old tasks that have not been updated in a long time - //-------------------------------------------------------------------------- - - err = nodeData.PruneTasks(l) - if err != nil { - return err - } + resultsCollection := mongoDB.GetCollection(types.ResultsCollection) //-------------------------------------------------------------------------- // Check for each task sample date @@ -208,12 +179,16 @@ func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, fram //------------------------------------------------------------------ // Get stored data for this task //------------------------------------------------------------------ - thisTaskRecord, found := records.GetTaskData(nodeData, test.Framework, task, l) + taskType, err := records.GetTaskType(test.Framework, task, frameworkConfigMap, l) + if err != nil { + return nil + } + thisTaskRecord, found := records.GetTaskData(nodeData.ID, taskType, test.Framework, task, mongoDB, l) if !found { l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Not found, creating.") defaultDate := time.Now() - thisTaskRecord = nodeData.AppendTask(test.Framework, task, defaultDate, frameworkConfigMap, l) + thisTaskRecord = nodeData.AppendTask(nodeData.ID, test.Framework, task, defaultDate, frameworkConfigMap, mongoDB, l) } //------------------------------------------------------------------ @@ -226,28 +201,57 @@ func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, fram } //------------------------------------------------------------------ - // Read new results from MongoDB Results and add to buffer + // Check pending and done tasks in the database //------------------------------------------------------------------ - thisTaskResults := thisTaskRecord.GetResultStruct() - found = false - found, err = thisTaskResults.FindAndLoadResults(nodeData.Address, - nodeData.Service, - test.Framework, - task, - resultsCollection, - l) + _, tasksDone, _, tasksIDs, err := CheckTaskDatabase(nodeData.Address, nodeData.Service, test.Framework, task, mongoDB, l) if err != nil { + l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Cannot check Tasks Database.") return err } - if found == true { - // If nothing is wrong with the result calculation - if thisTaskResults.GetStatus() == 0 { - // Add results to current task record - for i := 0; i < int(thisTaskResults.GetNumSamples()); i++ { - thisTaskRecord.InsertSample(time.Now(), thisTaskResults.GetSample(i)) + + //------------------------------------------------------------------ + // Read new results from MongoDB Results and add to buffer + //------------------------------------------------------------------ + if tasksDone > 0 { + + // loop over all tasksIDs + for _, taskID := range tasksIDs { + thisTaskResults := thisTaskRecord.GetResultStruct() + found = false + found, err = thisTaskResults.FindAndLoadResults(taskID, + resultsCollection, + l) + if err != nil { + return err + } + if found == true { + + l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Str("task_id", taskID.String()).Msg("Processing found results.") + + // If nothing is wrong with the result calculation + if thisTaskResults.GetStatus() == 0 { + l.Debug().Int("NumSamples", int(thisTaskResults.GetNumSamples())).Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Str("task_id", taskID.String()).Msg("Inserting results into buffers.") + // Add results to current task record + for i := 0; i < int(thisTaskResults.GetNumSamples()); i++ { + thisTaskRecord.InsertSample(time.Now(), thisTaskResults.GetSample(i), l) + } + // Update the last seen fields + thisTaskRecord.UpdateLastHeight(thisTaskResults.GetResultHeight()) + thisTaskRecord.UpdateLastSeen(thisTaskResults.GetResultTime()) + } else { + // TODO: handle status!=0 + l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Str("task_id", taskID.String()).Msg("Status not zero.") + } + + // Delete all MongoDB entries associated with this task ID + errDel := RemoveTaskID(taskID, mongoDB, l) + if errDel != nil { + l.Debug().Str("delete_error", errDel.Error()).Str("task_id", taskID.String()).Msg("Deletion error.") + } + } } - // TODO: handle status!=0 + } //------------------------------------------------------------------ @@ -255,9 +259,187 @@ func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, fram //------------------------------------------------------------------ thisTaskRecord.ProcessData(l) + //------------------------------------------------------------------ + // Update task in DB + //------------------------------------------------------------------ + _, err = thisTaskRecord.UpdateTask(nodeData.ID, test.Framework, task, mongoDB, l) + if err != nil { + return err + } + } } return err } + +// Looks for a framework-task-node in the TaskDB and retreives all the IDs adn tasks status +func CheckTaskDatabase(address string, service string, framework string, task string, mongoDB mongodb.MongoDb, l *zerolog.Logger) (tasksInQueue uint32, tasksDone uint32, blackList []int, tasksIDs []primitive.ObjectID, err error) { + // define blacklist as length zero + blackList = make([]int, 0) + + // Get tasks collection + tasksCollection := mongoDB.GetCollection(types.TaskCollection) + // Get tasks instances + instancesCollection := mongoDB.GetCollection(types.InstanceCollection) + + // Set filtering for this node-service pair data + task_request_filter := bson.D{{Key: "requester_args.address", Value: address}, + {Key: "requester_args.service", Value: service}, + {Key: "framework", Value: framework}, + {Key: "tasks", Value: task}} + + // Set mongo context + ctxM, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Now retrieve all node task requests entries + cursor, err := tasksCollection.Find(ctxM, task_request_filter) + if err != nil { + l.Error().Msg("Could not retrieve task request data from MongoDB.") + return tasksInQueue, tasksDone, blackList, tasksIDs, err + } + defer cursor.Close(ctxM) + for cursor.Next(ctxM) { + var taskReq types.TaskRequestRecord + if err := cursor.Decode(&taskReq); err != nil { + l.Error().Msg("Could not decode task request data from MongoDB.") + return tasksInQueue, tasksDone, blackList, tasksIDs, err + } + // Save id + tasksIDs = append(tasksIDs, taskReq.Id) + // If not already done + if !taskReq.Done { + l.Debug().Str("address", address).Str("service", service).Str("framework", framework).Str("task", task).Msg("Found pending task.") + // Count pending + tasksInQueue += uint32(taskReq.Qty) + + // Search for associated instances to retrieve ids + instances_filter := bson.D{{"task_id", taskReq.Id}} + cursor2, err := instancesCollection.Find(ctxM, instances_filter) + if err != nil { + l.Error().Msg("Could not retrieve instances data from MongoDB.") + return tasksInQueue, tasksDone, blackList, tasksIDs, err + } + defer cursor2.Close(ctxM) + // Get all ids + for cursor2.Next(ctxM) { + var thisInstance types.InstanceRecord + if err := cursor.Decode(&thisInstance); err != nil { + l.Error().Msg("Could not decode task request data from MongoDB.") + return tasksInQueue, tasksDone, blackList, tasksIDs, err + } + for _, docId := range thisInstance.DocIds { + blackList = append(blackList, docId) + } + } + } else { + l.Debug().Str("address", address).Str("service", service).Str("framework", framework).Str("task", task).Msg("Found done task.") + tasksDone += 1 + } + + } + + l.Debug().Str("address", address).Str("service", service).Str("framework", framework).Str("task", task).Int32("tasksDone", int32(tasksDone)).Int32("tasksInQueue", int32(tasksInQueue)).Int("tasksIDsLen", len(tasksIDs)).Msg("Pending tasks analized.") + + return tasksInQueue, tasksDone, blackList, tasksIDs, err + +} + +// Given a TaskID from MongoDB, deletes all associated entries from the "tasks", "instances", "prompts", "responses" and "results" collections. +func RemoveTaskID(taskID primitive.ObjectID, mongoDB mongodb.MongoDb, l *zerolog.Logger) (err error) { + + //-------------------------------------------------------------------------- + //-------------------------- Instances ------------------------------------- + //-------------------------------------------------------------------------- + instancesCollection := mongoDB.GetCollection(types.InstanceCollection) + // Set filtering for this node-service pair data + task_request_filter := bson.D{{Key: "task_id", Value: taskID}} + // Set mongo context + ctxM, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // Now retrieve all node task requests entries + response, err := instancesCollection.DeleteMany(ctxM, task_request_filter) + if err != nil { + l.Warn().Msg("Could not delete instances data from MongoDB.") + return err + } + + l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted instances data from MongoDB") + + //-------------------------------------------------------------------------- + //-------------------------- Prompts --------------------------------------- + //-------------------------------------------------------------------------- + promptsCollection := mongoDB.GetCollection(types.PromptsCollection) + // Set filtering for this node-service pair data + task_request_filter = bson.D{{Key: "task_id", Value: taskID}} + // Set mongo context + ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // Now retrieve all node task requests entries + response, err = promptsCollection.DeleteMany(ctxM, task_request_filter) + if err != nil { + l.Warn().Msg("Could not delete prompts data from MongoDB.") + return err + } + + l.Debug().Int("deleted", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted prompts data from MongoDB") + + //-------------------------------------------------------------------------- + //-------------------------- Responses ------------------------------------- + //-------------------------------------------------------------------------- + responsesCollection := mongoDB.GetCollection(types.ResponsesCollection) + // Set filtering for this node-service pair data + task_request_filter = bson.D{{Key: "task_id", Value: taskID}} + // Set mongo context + ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // Now retrieve all node task requests entries + response, err = responsesCollection.DeleteMany(ctxM, task_request_filter) + if err != nil { + l.Warn().Msg("Could not delete responses data from MongoDB.") + return err + } + + l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted responses data from MongoDB") + + //-------------------------------------------------------------------------- + //-------------------------- Results --------------------------------------- + //-------------------------------------------------------------------------- + resultsCollection := mongoDB.GetCollection(types.ResultsCollection) + // Set filtering for this node-service pair data + task_request_filter = bson.D{{Key: "result_data.task_id", Value: taskID}} + // Set mongo context + ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // Now retrieve all node task requests entries + response, err = resultsCollection.DeleteMany(ctxM, task_request_filter) + if err != nil { + l.Warn().Msg("Could not delete results data from MongoDB.") + return err + } + + l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted results data from MongoDB") + + //-------------------------------------------------------------------------- + //-------------------------- Task ------------------------------------------ + //-------------------------------------------------------------------------- + tasksCollection := mongoDB.GetCollection(types.TaskCollection) + // Set filtering for this node-service pair data + task_request_filter = bson.D{{Key: "_id", Value: taskID}} + // Set mongo context + ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // Now retrieve all node task requests entries + response, err = tasksCollection.DeleteMany(ctxM, task_request_filter) + if err != nil { + l.Warn().Msg("Could not delete task data from MongoDB.") + return err + } + + l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted task data from MongoDB") + + return nil + +} diff --git a/apps/go/manager/activities/get_staked.go b/apps/go/manager/activities/get_staked.go index 3989ef9..7991e75 100644 --- a/apps/go/manager/activities/get_staked.go +++ b/apps/go/manager/activities/get_staked.go @@ -3,6 +3,7 @@ package activities import ( "context" "manager/types" + "strconv" ) var GetStakedName = "get_staked" @@ -40,11 +41,31 @@ func (aCtx *Ctx) GetStaked(ctx context.Context, params types.GetStakedParams) (* l.Info().Int("nodes_staked", len(result.Nodes)).Msg("Successfully pulled staked node-services.") } - // // cheap mock - // for i := 0; i < 5; i++ { - // thisNode := NodeData{Address: fmt.Sprint(i), Service: fmt.Sprint(i * 10)} - // result.Nodes = append(result.Nodes, thisNode) - // } + // Get block data + currHeight, err := aCtx.App.PocketRpc.GetHeight() + if err != nil { + l.Error().Str("service", params.Service).Msg("Could not retrieve latest block hieght.") + return nil, err + } + blockParams, err := aCtx.App.PocketRpc.GetAllParams(currHeight) + if err != nil { + l.Error().Str("service", params.Service).Msg("Could not retrieve block params.") + return nil, err + } + blocksPerSession, ok := blockParams.NodeParams.Get("pos/BlocksPerSession") + if !ok { + l.Error().Str("service", params.Service).Msg("Cannot get blocks per session parameter.") + return nil, err + } + + result.Block.Height = currHeight + i64, err := strconv.ParseInt(blocksPerSession, 10, 64) + if err != nil { + l.Error().Str("service", params.Service).Msg("Could convert parameter to number.") + return nil, err + } + + result.Block.BlocksPerSession = i64 return &result, nil } diff --git a/apps/go/manager/activities/trigger_sampler.go b/apps/go/manager/activities/trigger_sampler.go index bdff807..b0404b3 100644 --- a/apps/go/manager/activities/trigger_sampler.go +++ b/apps/go/manager/activities/trigger_sampler.go @@ -30,7 +30,7 @@ func (aCtx *Ctx) TriggerSampler(_ context.Context, params types.TriggerSamplerPa Blacklist: params.Trigger.Blacklist, Qty: params.Trigger.Qty, } - evaluatorWorkflowOptions := client.StartWorkflowOptions{ + samplerWorkflowOptions := client.StartWorkflowOptions{ TaskQueue: aCtx.App.Config.Temporal.Sampler.TaskQueue, WorkflowExecutionErrorWhenAlreadyStarted: true, WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY, @@ -42,7 +42,7 @@ func (aCtx *Ctx) TriggerSampler(_ context.Context, params types.TriggerSamplerPa // Do not wait for a result by not calling .Get() on the returned future _, err := aCtx.App.TemporalClient.ExecuteWorkflow( context.Background(), - evaluatorWorkflowOptions, + samplerWorkflowOptions, aCtx.App.Config.Temporal.Sampler.WorkflowName, samplerParams, ) diff --git a/apps/go/manager/config.sample.json b/apps/go/manager/config.sample.json index 75d1d7b..8e03956 100644 --- a/apps/go/manager/config.sample.json +++ b/apps/go/manager/config.sample.json @@ -33,9 +33,24 @@ } }, "frameworks": { - "lmeh" : {"task_types": {"any" : "numerical"}}, - "helm" : {"task_types": {"any" : "numerical"}}, - "signatures" : {"task_types": {"any" : "signature"}} + "lmeh" : { + "task_types": {"any" : "numerical"}, + "task_dependency": {"any" : "signatures:tokenizer:ok"}, + "schedule_limits": {"any" : "none:none"}, + "trigger_minimum": {"any" : "0"} + }, + "helm" : { + "task_types": {"any" : "numerical"}, + "task_dependency": {"any" : "signatures:tokenizer:ok"}, + "schedule_limits": {"any" : "none:none"}, + "trigger_minimum": {"any" : "0"} + }, + "signatures" : { + "task_types": {"any" : "signature"}, + "task_dependency": {"any" : "none:none:none"}, + "schedule_limits": {"any" : "1:session"}, + "trigger_minimum": {"tokenizer" : "1"} + } } } diff --git a/apps/go/manager/records/node.go b/apps/go/manager/records/node.go index 3932e2d..8c1c6ed 100644 --- a/apps/go/manager/records/node.go +++ b/apps/go/manager/records/node.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) @@ -21,84 +22,17 @@ import ( // DB entry of a given node-service pair // The "Tasks" array will hold as many entries as tasks being tested type NodeRecord struct { - Address string `bson:"address"` - Service string `bson:"service"` - LastSeenHeight uint32 `bson:"last_seen_height"` - LastSeenTime time.Time `bson:"last_seen_time"` - NumericalTasks []NumericalTaskRecord `bson:"numerical_tasks"` - SignatureTasks []SignatureTaskRecord `bson:"signature_tasks"` + ID primitive.ObjectID `bson:"_id,omitempty"` + Address string `bson:"address"` + Service string `bson:"service"` + LastSeenHeight int64 `bson:"last_seen_height"` + LastSeenTime time.Time `bson:"last_seen_time"` } -// Creates and array of interfaces that contains all tasks -func (record *NodeRecord) CombineTasks() []TaskInterface { - combinedTasks := make([]TaskInterface, 0, len(record.NumericalTasks)+len(record.SignatureTasks)) +func (record *NodeRecord) FindAndLoadNode(node types.NodeData, mongoDB mongodb.MongoDb, l *zerolog.Logger) (bool, error) { - for _, element := range record.NumericalTasks { - combinedTasks = append(combinedTasks, &element) - } - - for _, element := range record.SignatureTasks { - combinedTasks = append(combinedTasks, &element) - } - - return combinedTasks -} - -func (record *NodeRecord) GetPrunedTasks(taskArray []TaskInterface, maxAge time.Duration, l *zerolog.Logger) []TaskInterface { - // Indices to remove - var indicesToRemove []int - // For each task - for i, task := range taskArray { - // Check the date of the index end - oldestAge := time.Since(task.GetLastSeen()) - // Check - if oldestAge >= maxAge { - // Add to remove list - indicesToRemove = append(indicesToRemove, i) - - l.Info().Str("address", record.Address).Str("service", record.Service).Str("framework", task.GetFramework()).Str("task", task.GetTask()).Msg("Removing task due to old age.") - } - } - - // Remove Tasks - for i := len(indicesToRemove) - 1; i >= 0; i-- { - index := indicesToRemove[i] - taskArray = append(taskArray[:index], taskArray[index+1:]...) - } - - return taskArray -} - -// Go through all task and remove the ones that have no new samples since the limit -func (record *NodeRecord) PruneTasks(l *zerolog.Logger) error { - - // Maximum age of a task - maxAge := time.Duration(TaskTTLDays) * 24 * time.Hour - - // Remove Numerical Tasks - var numTaskInterfaces []TaskInterface - for _, task := range record.NumericalTasks { - numTaskInterfaces = append(numTaskInterfaces, &task) - } - tasksPrunned := record.GetPrunedTasks(numTaskInterfaces, maxAge, l) - for i, task := range tasksPrunned { - record.NumericalTasks[i] = *(task.(*NumericalTaskRecord)) - } - - // Remove Signature Tasks - var signTaskInterfaces []TaskInterface - for _, task := range record.SignatureTasks { - signTaskInterfaces = append(signTaskInterfaces, &task) - } - tasksPrunned = record.GetPrunedTasks(signTaskInterfaces, maxAge, l) - for i, task := range tasksPrunned { - record.SignatureTasks[i] = *(task.(*SignatureTaskRecord)) - } - - return nil -} - -func (record *NodeRecord) FindAndLoadNode(node types.NodeData, collection mongodb.CollectionAPI, l *zerolog.Logger) (bool, error) { + // Get nodes collection + nodesCollection := mongoDB.GetCollection(types.NodesCollection) // Set filtering for this node-service pair data node_filter := bson.D{{Key: "address", Value: node.Address}, {Key: "service", Value: node.Service}} @@ -110,7 +44,7 @@ func (record *NodeRecord) FindAndLoadNode(node types.NodeData, collection mongod // Retrieve this node entry var found bool = true - cursor := collection.FindOne(ctxM, node_filter, opts) + cursor := nodesCollection.FindOne(ctxM, node_filter, opts) err := cursor.Decode(record) if err != nil { if err == mongo.ErrNoDocuments { @@ -127,89 +61,30 @@ func (record *NodeRecord) FindAndLoadNode(node types.NodeData, collection mongod return found, nil } -func (record *NodeRecord) AppendTask(framework string, task string, date time.Time, frameworkConfigMap map[string]types.FrameworkConfig, l *zerolog.Logger) TaskInterface { +func (record *NodeRecord) AppendTask(nodeID primitive.ObjectID, framework string, task string, date time.Time, frameworkConfigMap map[string]types.FrameworkConfig, mongoDB mongodb.MongoDb, l *zerolog.Logger) TaskInterface { taskType, err := GetTaskType(framework, task, frameworkConfigMap, l) if err != nil { return nil } - - // Fill base task data - baseTaskData := BaseTaskRecord{ - Framework: framework, - Task: task, - LastSeen: date, - } - - if taskType == NumericalTaskTypeName { - // TODO: Get default values from framework-task - bufferLen := NumericalCircularBufferLength - timeArray := make([]time.Time, bufferLen) - for i := range timeArray { - timeArray[i] = date - } - - newTask := NumericalTaskRecord{ - TaskData: baseTaskData, - MeanScore: 0.0, - StdScore: 0.0, - ScoresSamples: make([]ScoresSample, bufferLen), - CircBuffer: types.CircularBuffer{ - CircBufferLen: bufferLen, - NumSamples: 0, - Times: timeArray, - Indexes: types.CircularIndexes{ - Start: 0, - End: 0, - }, - }, - } - - record.NumericalTasks = append(record.NumericalTasks, newTask) - - return &newTask - - } else if taskType == SignatureTaskTypeName { - - // TODO: Get default values from framework-task - bufferLen := SignatureCircularBufferLength - timeArray := make([]time.Time, bufferLen) - for i := range timeArray { - timeArray[i] = date - } - - newTask := SignatureTaskRecord{ - TaskData: baseTaskData, - LastSignature: "", - Signatures: make([]SignatureSample, bufferLen), - CircBuffer: types.CircularBuffer{ - CircBufferLen: bufferLen, - NumSamples: 0, - Times: timeArray, - Indexes: types.CircularIndexes{ - Start: 0, - End: 0, - }, - }, - } - - record.SignatureTasks = append(record.SignatureTasks, newTask) - - return &newTask + // Get the task, wich will create it if not found + taskRecord, found := GetTaskData(nodeID, taskType, framework, task, mongoDB, l) + if !found { + return nil + } else { + return taskRecord } - return nil - } -func (record *NodeRecord) Init(params types.AnalyzeNodeParams, frameworkConfigMap map[string]types.FrameworkConfig, l *zerolog.Logger) error { +func (record *NodeRecord) Init(params types.AnalyzeNodeParams, frameworkConfigMap map[string]types.FrameworkConfig, mongoDB mongodb.MongoDb, l *zerolog.Logger) error { // Initialize empty record // Set node data record.Address = params.Node.Address record.Service = params.Node.Service - // Never tested + record.ID = primitive.NewObjectID() record.LastSeenHeight = 0 defaultDate := time.Date(2018, 1, 1, 00, 00, 00, 100, time.Local) record.LastSeenTime = defaultDate @@ -222,15 +97,20 @@ func (record *NodeRecord) Init(params types.AnalyzeNodeParams, frameworkConfigMa for _, task := range test.Tasks { // Add all tasks with the current date as maker for creation - _ = record.AppendTask(test.Framework, task, time.Now(), frameworkConfigMap, l) + _ = record.AppendTask(record.ID, test.Framework, task, time.Now(), frameworkConfigMap, mongoDB, l) } } - return nil + _, err := record.UpdateNode(mongoDB, l) + + return err } -func (record *NodeRecord) UpdateNode(collection mongodb.CollectionAPI, l *zerolog.Logger) (bool, error) { +func (record *NodeRecord) UpdateNode(mongoDB mongodb.MongoDb, l *zerolog.Logger) (bool, error) { + + // Get nodes collection + nodesCollection := mongoDB.GetCollection(types.NodesCollection) opts := options.FindOneAndUpdate().SetUpsert(true) node_filter := bson.D{{Key: "address", Value: record.Address}, {Key: "service", Value: record.Service}} @@ -241,7 +121,7 @@ func (record *NodeRecord) UpdateNode(collection mongodb.CollectionAPI, l *zerolo update := bson.D{{Key: "$set", Value: record}} // Get collection and update var found bool = true - err := collection.FindOneAndUpdate(ctxM, node_filter, update, opts).Decode(record) + err := nodesCollection.FindOneAndUpdate(ctxM, node_filter, update, opts).Decode(record) if err != nil { if err == mongo.ErrNoDocuments { l.Warn().Str("address", record.Address).Str("service", record.Service).Msg("Node entry not found, a new one was created.") diff --git a/apps/go/manager/records/result.go b/apps/go/manager/records/result.go index 5352026..4e1b1d8 100644 --- a/apps/go/manager/records/result.go +++ b/apps/go/manager/records/result.go @@ -7,6 +7,7 @@ import ( "github.com/rs/zerolog" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) @@ -17,12 +18,19 @@ import ( // This is the basic information that all results should have type BaseResultRecord struct { - Address string `bson:"address"` - Service string `bson:"service"` - Height uint32 `bson:"height"` - Framework string `bson:"framework"` - Task string `bson:"task"` - Status uint32 `bson:"status"` + TaskID string `bson:"task_id"` + NumSamples uint32 `bson:"num_samples"` + Status uint32 `bson:"status"` + ResultHeight int64 `bson:"result_height"` + ResultTime time.Time `bson:"result_time"` +} + +func (record *BaseResultRecord) GetResultTime() time.Time { + return record.ResultTime +} + +func (record *BaseResultRecord) GetResultHeight() int64 { + return record.ResultHeight } // ------------------------------------------------------------------------------ @@ -30,13 +38,12 @@ type BaseResultRecord struct { // ------------------------------------------------------------------------------ type ResultInterface interface { + GetResultHeight() int64 + GetResultTime() time.Time GetNumSamples() uint32 GetStatus() uint32 GetSample(int) interface{} - FindAndLoadResults(address string, - service string, - framework string, - task string, + FindAndLoadResults(taskID primitive.ObjectID, collection mongodb.CollectionAPI, l *zerolog.Logger) (bool, error) } @@ -49,34 +56,35 @@ type ResultInterface interface { // The NumericalResultRecord field indicates how many samples were actually calculated type NumericalResultRecord struct { ResultData BaseResultRecord `bson:"result_data"` - NumSamples uint32 `bson:"num_samples"` ScoresSamples []ScoresSample `bson:"scores"` } -func (record *NumericalResultRecord) GetStatus() uint32 { - return record.ResultData.Status +func (record *NumericalResultRecord) GetResultTime() time.Time { + return record.ResultData.GetResultTime() +} + +func (record *NumericalResultRecord) GetResultHeight() int64 { + return record.ResultData.GetResultHeight() } func (record *NumericalResultRecord) GetNumSamples() uint32 { - return record.NumSamples + return record.ResultData.NumSamples +} + +func (record *NumericalResultRecord) GetStatus() uint32 { + return record.ResultData.Status } + func (record *NumericalResultRecord) GetSample(index int) interface{} { return record.ScoresSamples[index] } -func (record *NumericalResultRecord) FindAndLoadResults(address string, - service string, - framework string, - task string, +func (record *NumericalResultRecord) FindAndLoadResults(taskID primitive.ObjectID, collection mongodb.CollectionAPI, l *zerolog.Logger) (bool, error) { // Set filtering for this result - result_filter := bson.D{{Key: "result_data.address", Value: address}, - {Key: "result_data.service", Value: service}, - {Key: "result_data.framework", Value: framework}, - {Key: "result_data.task", Value: task}, - } + result_filter := bson.D{{Key: "result_data.task_id", Value: taskID}} opts := options.FindOne() // Set mongo context @@ -89,7 +97,7 @@ func (record *NumericalResultRecord) FindAndLoadResults(address string, err := cursor.Decode(record) if err != nil { if err == mongo.ErrNoDocuments { - l.Debug().Str("address", address).Str("service", service).Msg("Node results entry not found.") + l.Debug().Str("task_id", taskID.String()).Msg("Results entry not found for given TaskID.") found = false } else { l.Error().Msg("Could not retrieve results data from MongoDB.") @@ -108,34 +116,36 @@ func (record *NumericalResultRecord) FindAndLoadResults(address string, // The NumericalResultRecord field indicates how many samples were actually calculated type SignatureResultRecord struct { ResultData BaseResultRecord `bson:"result_data"` - NumSamples uint32 `bson:"num_samples"` ScoresSamples []SignatureSample `bson:"signatures"` } -func (record *SignatureResultRecord) GetStatus() uint32 { - return record.ResultData.Status +func (record *SignatureResultRecord) GetResultTime() time.Time { + return record.ResultData.GetResultTime() +} + +func (record *SignatureResultRecord) GetResultHeight() int64 { + return record.ResultData.GetResultHeight() } func (record *SignatureResultRecord) GetNumSamples() uint32 { - return record.NumSamples + return record.ResultData.NumSamples } + +func (record *SignatureResultRecord) GetStatus() uint32 { + return record.ResultData.Status +} + func (record *SignatureResultRecord) GetSample(index int) interface{} { return record.ScoresSamples[index] } -func (record *SignatureResultRecord) FindAndLoadResults(address string, - service string, - framework string, - task string, +func (record *SignatureResultRecord) FindAndLoadResults(taskID primitive.ObjectID, collection mongodb.CollectionAPI, l *zerolog.Logger) (bool, error) { // Set filtering for this result - result_filter := bson.D{{Key: "result_data.address", Value: address}, - {Key: "result_data.service", Value: service}, - {Key: "result_data.framework", Value: framework}, - {Key: "result_data.task", Value: task}, - } + result_filter := bson.D{{Key: "result_data.task_id", Value: taskID}} + opts := options.FindOne() // Set mongo context @@ -148,13 +158,16 @@ func (record *SignatureResultRecord) FindAndLoadResults(address string, err := cursor.Decode(record) if err != nil { if err == mongo.ErrNoDocuments { - l.Debug().Str("address", address).Str("service", service).Msg("Node results entry not found.") + l.Debug().Str("task_id", taskID.String()).Msg("Results entry not found for given TaskID.") found = false } else { l.Error().Msg("Could not retrieve results data from MongoDB.") return false, err } } + if found { + l.Debug().Str("task_id", taskID.String()).Msg("Results retrieved") + } return found, nil } diff --git a/apps/go/manager/records/task.go b/apps/go/manager/records/task.go index ef9cf81..a7c2d4a 100644 --- a/apps/go/manager/records/task.go +++ b/apps/go/manager/records/task.go @@ -1,13 +1,20 @@ package records import ( + "context" "fmt" "manager/types" + "packages/mongodb" "sort" + "strconv" "strings" "time" "github.com/rs/zerolog" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "gonum.org/v1/gonum/stat" ) @@ -17,9 +24,16 @@ import ( // This is the basic information that all tasks should have type BaseTaskRecord struct { - Framework string `bson:"framework"` - Task string `bson:"task"` - LastSeen time.Time `bson:"last_seen"` + NodeID primitive.ObjectID `bson:"node_id"` + Framework string `bson:"framework"` + Task string `bson:"task"` + + LastSeen time.Time `bson:"last_seen"` + LastHeight int64 `bson:"last_height"` +} + +func (record *BaseTaskRecord) GetNodeID() primitive.ObjectID { + return record.NodeID } func (record *BaseTaskRecord) GetTask() string { @@ -34,11 +48,20 @@ func (record *BaseTaskRecord) GetLastSeen() time.Time { return record.LastSeen } +func (record *BaseTaskRecord) GetLastHeight() int64 { + return record.LastHeight +} + func (record *BaseTaskRecord) UpdateLastSeen(timeSample time.Time) (err error) { record.LastSeen = timeSample return nil } +func (record *BaseTaskRecord) UpdateLastHeight(height int64) (err error) { + record.LastHeight = height + return nil +} + // The maximum age of a task entry. const TaskTTLDays uint32 = 32 @@ -48,34 +71,59 @@ const TaskTTLDays uint32 = 32 type TaskInterface interface { ProcessData(l *zerolog.Logger) error - stepIndex(step int, marker string) error + StepIndex(step uint32, marker string, positive_step bool, l *zerolog.Logger) error CycleIndexes(l *zerolog.Logger) error - InsertSample(timeSample time.Time, data interface{}) (err error) + InsertSample(timeSample time.Time, data interface{}, l *zerolog.Logger) (err error) GetNumSamples() uint32 GetFramework() string GetTask() string - UpdateLastSeen(timeSample time.Time) (err error) GetMinSamplesPerTask() uint32 GetMaxConcurrentSamplesPerTask() uint32 GetCircularBufferLength() uint32 GetSampleTTLDays() uint32 GetResultStruct() ResultInterface GetLastSeen() time.Time + GetLastHeight() int64 + UpdateLastSeen(timeSample time.Time) (err error) + UpdateLastHeight(height int64) (err error) IsOK() bool + NewTask(nodeID primitive.ObjectID, framework string, task string, date time.Time, l *zerolog.Logger) + LoadTask(nodeID primitive.ObjectID, framework string, task string, mongoDB mongodb.MongoDb, l *zerolog.Logger) (bool, error) + UpdateTask(nodeID primitive.ObjectID, framework string, task string, mongoDB mongodb.MongoDb, l *zerolog.Logger) (bool, error) } // Get specific task data from a node record -func GetTaskData(nodeData *NodeRecord, framework string, task string, l *zerolog.Logger) (TaskInterface, bool) { +func GetTaskData(nodeID primitive.ObjectID, taskType string, framework string, task string, mongoDB mongodb.MongoDb, l *zerolog.Logger) (TaskInterface, bool) { - // Get all tasks as a single array - combinedTasks := nodeData.CombineTasks() // Look for entry - for _, taskEntry := range combinedTasks { - // Check if the Name field matches the search string - if taskEntry.GetFramework() == framework && taskEntry.GetTask() == task { - l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("Found!") - return taskEntry, true + if taskType == NumericalTaskTypeName { + // get task record + var record NumericalTaskRecord + found, err := record.LoadTask(nodeID, framework, task, mongoDB, l) + if err != nil { + l.Error().Str("nodeID", nodeID.String()).Str("framework", framework).Str("task", task).Msg("cannot find default task buffer") + return nil, false + } + if !found { + // Initialize and save + record.NewTask(nodeID, framework, task, types.EpochStart.UTC(), l) + record.UpdateTask(nodeID, framework, task, mongoDB, l) + } + return &record, true + } else if taskType == SignatureTaskTypeName { + // set task record + var record SignatureTaskRecord + found, err := record.LoadTask(nodeID, framework, task, mongoDB, l) + if err != nil { + l.Error().Str("nodeID", nodeID.String()).Str("framework", framework).Str("task", task).Msg("cannot find default task buffer") + return nil, false } + if !found { + // Initialize and save + record.NewTask(nodeID, framework, task, types.EpochStart.UTC(), l) + record.UpdateTask(nodeID, framework, task, mongoDB, l) + } + return &record, true } return nil, false @@ -110,25 +158,24 @@ func GetTaskType(framework string, task string, configMap map[string]types.Frame // Analyzes the configuration and returns if it is possible to proceed with this task triggering/analysis // A task can depend on others (such as having a tokenizer signature), here we check for that -func CheckTaskDependency(nodeData *NodeRecord, framework string, task string, configMap map[string]types.FrameworkConfig, l *zerolog.Logger) (status bool, err error) { - status = false +func CheckTaskDependency(nodeData *NodeRecord, framework string, task string, configMap map[string]types.FrameworkConfig, mongoDB mongodb.MongoDb, l *zerolog.Logger) (bool, error) { // Get Framework config frameworkCfg, ok := configMap[framework] if !ok { l.Error().Str("framework", framework).Msg("framework config not found") - err = fmt.Errorf("framework config not found") + err := fmt.Errorf("framework config not found") return false, err } - // Get task type + // Get task dependency taskDep, ok := frameworkCfg.TasksDependency[task] if !ok { // Search for the "any" field taskDep, ok = frameworkCfg.TasksDependency["any"] if !ok { l.Error().Str("framework", framework).Str("task", task).Msg("cannot find default (or specific) value for task type") - err = fmt.Errorf("cannot find default (or specific) value for task type") + err := fmt.Errorf("cannot find default (or specific) value for task type") return false, err } } @@ -144,7 +191,12 @@ func CheckTaskDependency(nodeData *NodeRecord, framework string, task string, co l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", framework).Str("task", task).Msg("No dependency: Dependecy OK") return true, nil } - thisTaskRecord, found := GetTaskData(nodeData, frameworkTaskandStatus[0], frameworkTaskandStatus[1], l) + taskType, err := GetTaskType(frameworkTaskandStatus[0], frameworkTaskandStatus[1], configMap, l) + if err != nil { + l.Error().Str("framework", framework).Str("task", task).Str("task type", taskType).Msg("Error getting task type") + return false, err + } + thisTaskRecord, found := GetTaskData(nodeData.ID, taskType, frameworkTaskandStatus[0], frameworkTaskandStatus[1], mongoDB, l) if !found { // The task is not even created, we must fail return false, nil @@ -169,6 +221,110 @@ func CheckTaskDependency(nodeData *NodeRecord, framework string, task string, co return false, nil } +// Analyzes the configuration and checks wheter the triggering the task will +// break the schedule limits or not (i.e. trigger twice in the same session) +func CheckTaskSchedule(taskData TaskInterface, block types.BlockData, configMap map[string]types.FrameworkConfig, l *zerolog.Logger) (bool, error) { + + framework := taskData.GetFramework() + task := taskData.GetTask() + + // Get Framework config + frameworkCfg, ok := configMap[framework] + if !ok { + l.Error().Str("framework", framework).Msg("framework config not found") + err := fmt.Errorf("framework config not found") + return false, err + } + + // Get task schedule + taskSchedule, ok := frameworkCfg.ScheduleLimits[task] + if !ok { + // Search for the "any" field + taskSchedule, ok = frameworkCfg.ScheduleLimits["any"] + if !ok { + l.Error().Str("framework", framework).Str("task", task).Msg("cannot find default (or specific) value for task schedule") + err := fmt.Errorf("cannot find default (or specific) value for task schedule") + return false, err + } + } + + // Check schedule + frameworkTaskandSchedule := strings.Split(taskSchedule, ":") + if len(frameworkTaskandSchedule) != 2 { + l.Error().Str("framework", framework).Str("task", task).Msg("malformed dependency configuration, expected two elements separated by \":\" ") + return false, nil + } + if frameworkTaskandSchedule[0] == "none" { + // No dependencies + l.Debug().Str("framework", framework).Str("task", task).Msg("No schedule: Dchedule OK") + return true, nil + } + value, err := strconv.ParseInt(frameworkTaskandSchedule[0], 10, 32) + if err != nil { + l.Error().Str("framework", framework).Str("task", task).Msg("malformed dependency configuration, first element must be an integer number") + return false, nil + } + + if frameworkTaskandSchedule[1] == "session" { + // Check if session is within minimum schedule + lastHeight := taskData.GetLastHeight() + if (block.Height - lastHeight) >= (value * block.BlocksPerSession) { + return true, nil + } + + } else if frameworkTaskandSchedule[1] == "block" { + // Check if amount of blocks have passed + lastHeight := taskData.GetLastHeight() + if (block.Height - lastHeight) >= value { + return true, nil + } + + } else { + l.Error().Str("framework", framework).Str("task", task).Str("second_element", frameworkTaskandSchedule[1]).Msg("schedule configuration cannot be processed (second element type unknown)") + return false, nil + } + + return true, nil +} + +// Analyzes the configuration and checks wheter the task should be triggered +// despite having its buffers filled and up to date. This is useful for tasks +// that require scheduled updates, like signatures (i.e getting tokenizers every session) +func CheckTaskTriggerMin(taskData TaskInterface, block types.BlockData, configMap map[string]types.FrameworkConfig, l *zerolog.Logger) (uint32, error) { + + framework := taskData.GetFramework() + task := taskData.GetTask() + + // Get Framework config + frameworkCfg, ok := configMap[framework] + if !ok { + l.Error().Str("framework", framework).Msg("framework config not found") + err := fmt.Errorf("framework config not found") + return 0, err + } + + // Get task schedule + taskTriggerMin, ok := frameworkCfg.TriggerMinimum[task] + if !ok { + // Search for the "any" field + taskTriggerMin, ok = frameworkCfg.TriggerMinimum["any"] + if !ok { + l.Error().Str("framework", framework).Str("task", task).Msg("cannot find default (or specific) value for task trgger minimum") + err := fmt.Errorf("cannot find default (or specific) value for task trigger minimum") + return 0, err + } + } + + // Check trigger minimum + value, err := strconv.ParseInt(taskTriggerMin, 10, 32) + if err != nil { + l.Error().Str("framework", framework).Str("task", task).Msg("malformed trigger minimum configuration, the entry must be a positive integer number") + return 0, nil + } + + return uint32(value), nil +} + // ------------------------------------------------------------------------------ // NumericalTaskRecord // ------------------------------------------------------------------------------ @@ -207,6 +363,89 @@ type ScoresSample struct { ID int `bson:"id"` } +func (record *NumericalTaskRecord) NewTask(nodeID primitive.ObjectID, framework string, task string, date time.Time, l *zerolog.Logger) { + // TODO: Get default values from framework-task + bufferLen := NumericalCircularBufferLength + timeArray := make([]time.Time, bufferLen) + for i := range timeArray { + timeArray[i] = date + } + + record.TaskData.NodeID = nodeID + record.TaskData.Framework = framework + record.TaskData.Task = task + record.TaskData.LastSeen = date + + record.MeanScore = 0.0 + record.StdScore = 0.0 + record.ScoresSamples = make([]ScoresSample, bufferLen) + record.CircBuffer = types.CircularBuffer{ + CircBufferLen: bufferLen, + NumSamples: 0, + Times: timeArray, + Indexes: types.CircularIndexes{ + Start: 0, + End: 0, + }, + } + +} + +func (record *NumericalTaskRecord) LoadTask(nodeID primitive.ObjectID, framework string, task string, mongoDB mongodb.MongoDb, l *zerolog.Logger) (bool, error) { + + task_filter := bson.D{{Key: "task_data.node_id", Value: nodeID}, {Key: "task_data.framework", Value: framework}, {Key: "task_data.task", Value: task}} + tasksCollection := mongoDB.GetCollection(types.NumericalTaskCollection) + opts := options.FindOne() + + // Set mongo context + ctxM, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Retrieve this node entry + var found bool = true + cursor := tasksCollection.FindOne(ctxM, task_filter, opts) + err := cursor.Decode(record) + if err != nil { + if err == mongo.ErrNoDocuments { + l.Warn().Str("node_id", nodeID.String()).Str("framework", framework).Str("task", task).Msg("Numerical Task not found") + found = false + } else { + l.Error().Msg("Could not retrieve task data from MongoDB.") + fmt.Print(err) + return false, err + } + } + + return found, nil +} + +func (record *NumericalTaskRecord) UpdateTask(nodeID primitive.ObjectID, framework string, task string, mongoDB mongodb.MongoDb, l *zerolog.Logger) (bool, error) { + + tasksCollection := mongoDB.GetCollection(types.NumericalTaskCollection) + + opts := options.FindOneAndUpdate().SetUpsert(true) + task_filter := bson.D{{Key: "task_data.node_id", Value: nodeID}, {Key: "task_data.framework", Value: framework}, {Key: "task_data.task", Value: task}} + ctxM, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Update given struct + update := bson.D{{Key: "$set", Value: record}} + // Get collection and update + var found bool = true + err := tasksCollection.FindOneAndUpdate(ctxM, task_filter, update, opts).Decode(record) + if err != nil { + if err == mongo.ErrNoDocuments { + l.Warn().Str("node_id", nodeID.String()).Str("framework", framework).Str("task", task).Msg("Numerical Task not found, creating one.") + found = false + } else { + l.Error().Msg("Could not retrieve numerical task data from MongoDB.") + return false, err + } + } + + return found, nil +} + func (record *NumericalTaskRecord) GetMinSamplesPerTask() uint32 { return NumericalMinSamplesPerTask } @@ -235,11 +474,20 @@ func (record *NumericalTaskRecord) GetLastSeen() time.Time { return record.TaskData.GetLastSeen() } +func (record *NumericalTaskRecord) GetLastHeight() int64 { + return record.TaskData.GetLastHeight() +} + func (record *NumericalTaskRecord) UpdateLastSeen(timeSample time.Time) (err error) { record.TaskData.UpdateLastSeen(timeSample) return nil } +func (record *NumericalTaskRecord) UpdateLastHeight(height int64) (err error) { + record.TaskData.UpdateLastHeight(height) + return nil +} + // Returns the number of valid samples in the circular buffer func (record *NumericalTaskRecord) GetNumSamples() uint32 { return record.CircBuffer.NumSamples @@ -258,21 +506,19 @@ func (record *NumericalTaskRecord) IsOK() bool { // Calculate task statistics func (record *NumericalTaskRecord) ProcessData(l *zerolog.Logger) (err error) { + // Get valid samples + validIdx, err := record.CircBuffer.GetBufferValidIndexes(l) + if err != nil { + return err + } + // Slice the buffer and cast var auxData []float64 - idxNow := record.CircBuffer.Indexes.Start - for true { - // run until we complete the circular buffer - if idxNow == record.CircBuffer.Indexes.End { - break - } + for _, sampleId := range validIdx { // Add sample to data array - auxData = append(auxData, float64(record.ScoresSamples[idxNow].Score)) - // perform the step - nextVal := int(idxNow) + 1 - // Check limits and assign value - idxNow = record.CircBuffer.BufferLimitCheck(nextVal, record.CircBuffer.Indexes.End) + auxData = append(auxData, float64(record.ScoresSamples[sampleId].Score)) } + length := len(auxData) if length == 0 { record.MeanScore = 0 @@ -299,15 +545,15 @@ func (record *NumericalTaskRecord) ProcessData(l *zerolog.Logger) (err error) { } // Gets the sample index given a step direction (positive: 1 or negative: -1) and for a given marker (start or end of buffer) -func (record *NumericalTaskRecord) stepIndex(step int, marker string) error { - return record.CircBuffer.StepIndex(step, marker) +func (record *NumericalTaskRecord) StepIndex(step uint32, marker string, positive_step bool, l *zerolog.Logger) error { + return record.CircBuffer.StepIndex(step, marker, positive_step, l) } // Updates the indexes making them point to the initial and final samples in a given time window. func (record *NumericalTaskRecord) CycleIndexes(l *zerolog.Logger) error { return record.CircBuffer.CycleIndexes(NumericalSampleTTLDays, l) } -func (record *NumericalTaskRecord) InsertSample(timeSample time.Time, data interface{}) (err error) { +func (record *NumericalTaskRecord) InsertSample(timeSample time.Time, data interface{}, l *zerolog.Logger) (err error) { // Assert data type dataOk, ok := data.(ScoresSample) if !ok { @@ -315,7 +561,7 @@ func (record *NumericalTaskRecord) InsertSample(timeSample time.Time, data inter } // Increment the end - err = record.stepIndex(1, "end") + err = record.StepIndex(1, "end", true, l) // Save sample record.ScoresSamples[record.CircBuffer.Indexes.End].Score = dataOk.Score record.ScoresSamples[record.CircBuffer.Indexes.End].ID = dataOk.ID @@ -339,13 +585,13 @@ const SignatureTaskTypeName string = "signature" const SignatureSampleTTLDays uint32 = 5 // Minimum number of samples to have in a task to consider that it does not require more samples -const SignatureMinSamplesPerTask uint32 = 50 +const SignatureMinSamplesPerTask uint32 = 5 // Maximum size of result buffer and also maximum number of samples to ask per task -const SignatureMaxConcurrentSamplesPerTask uint32 = 10 +const SignatureMaxConcurrentSamplesPerTask uint32 = 1 // This is the length of the buffer and will set the maximum accuracy of the metric. -const SignatureCircularBufferLength uint32 = NumericalMinSamplesPerTask +const SignatureCircularBufferLength uint32 = SignatureMinSamplesPerTask // Signatures task data type SignatureTaskRecord struct { @@ -363,6 +609,87 @@ type SignatureSample struct { ID int `bson:"id"` } +func (record *SignatureTaskRecord) NewTask(nodeID primitive.ObjectID, framework string, task string, date time.Time, l *zerolog.Logger) { + // TODO: Get default values from framework-task + bufferLen := SignatureCircularBufferLength + timeArray := make([]time.Time, bufferLen) + for i := range timeArray { + timeArray[i] = date + } + + record.TaskData.NodeID = nodeID + record.TaskData.Framework = framework + record.TaskData.Task = task + record.TaskData.LastSeen = date + + record.LastSignature = "" + record.Signatures = make([]SignatureSample, bufferLen) + record.CircBuffer = types.CircularBuffer{ + CircBufferLen: bufferLen, + NumSamples: 0, + Times: timeArray, + Indexes: types.CircularIndexes{ + Start: 0, + End: 0, + }, + } +} + +func (record *SignatureTaskRecord) LoadTask(nodeID primitive.ObjectID, framework string, task string, mongoDB mongodb.MongoDb, l *zerolog.Logger) (bool, error) { + + task_filter := bson.D{{Key: "task_data.node_id", Value: nodeID}, {Key: "task_data.framework", Value: framework}, {Key: "task_data.task", Value: task}} + tasksCollection := mongoDB.GetCollection(types.SignaturesTaskCollection) + opts := options.FindOne() + + // Set mongo context + ctxM, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Retrieve this node entry + var found bool = true + cursor := tasksCollection.FindOne(ctxM, task_filter, opts) + err := cursor.Decode(record) + if err != nil { + if err == mongo.ErrNoDocuments { + l.Warn().Str("node_id", nodeID.String()).Str("framework", framework).Str("task", task).Msg("Signature Task not found") + found = false + } else { + l.Error().Msg("Could not retrieve task data from MongoDB.") + fmt.Print(err) + return false, err + } + } + + return found, nil +} + +func (record *SignatureTaskRecord) UpdateTask(nodeID primitive.ObjectID, framework string, task string, mongoDB mongodb.MongoDb, l *zerolog.Logger) (bool, error) { + + tasksCollection := mongoDB.GetCollection(types.SignaturesTaskCollection) + + opts := options.FindOneAndUpdate().SetUpsert(true) + task_filter := bson.D{{Key: "task_data.node_id", Value: nodeID}, {Key: "task_data.framework", Value: framework}, {Key: "task_data.task", Value: task}} + ctxM, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Update given struct + update := bson.D{{Key: "$set", Value: record}} + // Get collection and update + var found bool = true + err := tasksCollection.FindOneAndUpdate(ctxM, task_filter, update, opts).Decode(record) + if err != nil { + if err == mongo.ErrNoDocuments { + l.Warn().Str("node_id", nodeID.String()).Str("framework", framework).Str("task", task).Msg("Signature Task not found, creating one.") + found = false + } else { + l.Error().Str("node_id", nodeID.String()).Str("framework", framework).Str("task", task).Msg("Could not retrieve signature task data from MongoDB.") + return false, err + } + } + + return found, nil +} + func (record *SignatureTaskRecord) GetMinSamplesPerTask() uint32 { return SignatureMinSamplesPerTask } @@ -391,14 +718,23 @@ func (record *SignatureTaskRecord) GetLastSeen() time.Time { return record.TaskData.GetLastSeen() } +func (record *SignatureTaskRecord) GetLastHeight() int64 { + return record.TaskData.GetLastHeight() +} + func (record *SignatureTaskRecord) UpdateLastSeen(timeSample time.Time) (err error) { record.TaskData.UpdateLastSeen(timeSample) return nil } +func (record *SignatureTaskRecord) UpdateLastHeight(height int64) (err error) { + record.TaskData.UpdateLastHeight(height) + return nil +} + // Gets the sample index given a step direction (positive: 1 or negative: -1) and for a given marker (start or end of buffer) -func (record *SignatureTaskRecord) stepIndex(step int, marker string) error { - return record.CircBuffer.StepIndex(step, marker) +func (record *SignatureTaskRecord) StepIndex(step uint32, marker string, positive_step bool, l *zerolog.Logger) error { + return record.CircBuffer.StepIndex(step, marker, positive_step, l) } // Updates the indexes making them point to the initial and final samples in a given time window. @@ -412,14 +748,17 @@ func (record *SignatureTaskRecord) GetNumSamples() uint32 { } // insert a new signature into the circular buffer -func (record *SignatureTaskRecord) InsertSample(timeSample time.Time, data interface{}) (err error) { +func (record *SignatureTaskRecord) InsertSample(timeSample time.Time, data interface{}, l *zerolog.Logger) (err error) { // Assert data type dataOk, ok := data.(SignatureSample) if !ok { return fmt.Errorf("invalid sample data type") } + + l.Debug().Str("signature", dataOk.Signature).Int("ID", dataOk.ID).Msg("Inserting sample.") + // Increment the end - err = record.stepIndex(1, "end") + err = record.StepIndex(1, "end", true, l) // Save sample record.Signatures[record.CircBuffer.Indexes.End].Signature = dataOk.Signature record.Signatures[record.CircBuffer.Indexes.End].ID = dataOk.ID diff --git a/apps/go/manager/types/activities.go b/apps/go/manager/types/activities.go index f309d11..de03826 100644 --- a/apps/go/manager/types/activities.go +++ b/apps/go/manager/types/activities.go @@ -13,8 +13,14 @@ type NodeData struct { Service string } +type BlockData struct { + Height int64 + BlocksPerSession int64 +} + type GetStakedResults struct { Nodes []NodeData + Block BlockData } //------------------------------------------------------------------------------ @@ -23,6 +29,7 @@ type GetStakedResults struct { type AnalyzeNodeParams struct { Node NodeData `json:"node"` + Block BlockData `json:"block"` Tests []TestsData `json:"tests"` } diff --git a/apps/go/manager/types/circular_buffer.go b/apps/go/manager/types/circular_buffer.go index 9b52562..8298c35 100644 --- a/apps/go/manager/types/circular_buffer.go +++ b/apps/go/manager/types/circular_buffer.go @@ -2,11 +2,15 @@ package types import ( "errors" + "math" "time" "github.com/rs/zerolog" ) +// A date used to mark a position in the buffer that was never used +var EpochStart = time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC) + // Keep track of circular buffer start and end indexes type CircularIndexes struct { Start uint32 `bson:"cir_start"` @@ -21,33 +25,52 @@ type CircularBuffer struct { } // Gets the sample index given a step direction (positive: 1 or negative: -1) and for a given marker (start or end of buffer) -func (buffer *CircularBuffer) StepIndex(step int, marker string) error { +func (buffer *CircularBuffer) StepIndex(step uint32, marker string, positive_step bool, l *zerolog.Logger) error { + + l.Debug().Int("buffer.Indexes.End", int(buffer.Indexes.Start)).Int("buffer.Indexes.End", int(buffer.Indexes.End)).Msg("Circular indexes.") + // Get values var currValue uint32 - var limitValue uint32 if marker == "start" { currValue = buffer.Indexes.Start - limitValue = buffer.Indexes.End } else if marker == "end" { currValue = buffer.Indexes.End - limitValue = buffer.Indexes.Start } else { return errors.New("buffer: invalid marker designation") } // perform the step - nextVal := int(currValue) + step + var nextVal uint32 = 0 + if positive_step { + nextVal = currValue + step + } else { + nextVal = currValue - step + } // Check limits and assign value - currValue = buffer.BufferLimitCheck(nextVal, limitValue) + currValue, err := buffer.BufferLimitCheck(nextVal, l) + if err != nil { + return err + } // Update values if marker == "start" { buffer.Indexes.Start = currValue } else { + if (buffer.Indexes.Start == currValue) && (step > 0) { + // This means that the end of the buffer advanced into the start of + // the buffer, we must movethe buffer one position + buffer.StepIndex(1, "start", true, l) + } buffer.Indexes.End = currValue } - buffer.NumSamples = uint32(int(buffer.NumSamples) + step) + + // Calculate number of valid samples + validIdx, err := buffer.GetBufferValidIndexes(l) + if err != nil { + return err + } + buffer.NumSamples = uint32(len(validIdx)) return nil } @@ -61,7 +84,7 @@ func (buffer *CircularBuffer) CycleIndexes(sampleTTLDays uint32, l *zerolog.Logg for oldestAge >= maxAge { // Increment the start - err := buffer.StepIndex(1, "start") + err := buffer.StepIndex(1, "start", true, l) if err != nil { return err } @@ -77,19 +100,38 @@ func (buffer *CircularBuffer) CycleIndexes(sampleTTLDays uint32, l *zerolog.Logg return nil } -func (buffer *CircularBuffer) BufferLimitCheck(nextVal int, limitValue uint32) uint32 { +func (buffer *CircularBuffer) BufferLimitCheck(nextVal uint32, l *zerolog.Logger) (uint32, error) { // Check for overflow - if nextVal >= int(buffer.CircBufferLen) { + if nextVal >= buffer.CircBufferLen { nextVal = 0 - } else if nextVal <= 0 { + } else if nextVal == math.MaxInt32 { // Check for underflow - nextVal = int(buffer.CircBufferLen - 1) + nextVal = buffer.CircBufferLen - 1 } - // Check for limit - if nextVal >= int(limitValue) { - nextVal = int(limitValue) - } + return uint32(nextVal), nil +} + +func (buffer *CircularBuffer) GetBufferValidIndexes(l *zerolog.Logger) (auxIdx []uint32, err error) { - return uint32(nextVal) + idxNow := buffer.Indexes.Start + for true { + // If the sample never written, we should ignore it + if buffer.Times[idxNow] != EpochStart { + // Add sample to data array + auxIdx = append(auxIdx, idxNow) + } + // run until we complete the circular buffer + if idxNow == buffer.Indexes.End { + break + } + // perform the step + nextVal := idxNow + 1 + // Check limits and assign value + idxNow, err = buffer.BufferLimitCheck(nextVal, l) + if err != nil { + return auxIdx, err + } + } + return auxIdx, err } diff --git a/apps/go/manager/types/config.go b/apps/go/manager/types/config.go index d471ef5..8090b59 100644 --- a/apps/go/manager/types/config.go +++ b/apps/go/manager/types/config.go @@ -234,9 +234,9 @@ type Config struct { Temporal *TemporalConfig `json:"temporal"` } -// This config structure holds, for a given framework, the specifics of its task -// Initially this will only determine the type of task: Numerical, Signature, etc type FrameworkConfig struct { TasksTypes map[string]string `json:"task_types"` TasksDependency map[string]string `json:"task_dependency"` + ScheduleLimits map[string]string `json:"schedule_limits"` + TriggerMinimum map[string]string `json:"trigger_minimum"` } diff --git a/apps/go/manager/types/mongodb.go b/apps/go/manager/types/mongodb.go index 3a90a63..028aa22 100644 --- a/apps/go/manager/types/mongodb.go +++ b/apps/go/manager/types/mongodb.go @@ -1,8 +1,12 @@ package types var ( - TaskCollection = "tasks" - InstanceCollection = "instances" - NodesCollection = "nodes" - ResultsCollection = "results" + TaskCollection = "tasks" + InstanceCollection = "instances" + PromptsCollection = "prompts" + ResponsesCollection = "responses" + NodesCollection = "nodes" + ResultsCollection = "results" + NumericalTaskCollection = "buffers_numerical" + SignaturesTaskCollection = "buffers_signatures" ) diff --git a/apps/go/manager/workflows/node_manager.go b/apps/go/manager/workflows/node_manager.go index 3cec6c8..10318a0 100644 --- a/apps/go/manager/workflows/node_manager.go +++ b/apps/go/manager/workflows/node_manager.go @@ -48,9 +48,9 @@ func (wCtx *Ctx) NodeManager(ctx workflow.Context, params types.NodeManagerParam Service: params.Service, } // Results will be kept logged by temporal - var stakedNodes types.GetStakedResults + var pocketNetworkData types.GetStakedResults // Execute activity - err := workflow.ExecuteActivity(ctxTimeout, activities.GetStakedName, getStakedInput).Get(ctx, &stakedNodes) + err := workflow.ExecuteActivity(ctxTimeout, activities.GetStakedName, getStakedInput).Get(ctx, &pocketNetworkData) if err != nil { return &result, err } @@ -67,7 +67,7 @@ func (wCtx *Ctx) NodeManager(ctx workflow.Context, params types.NodeManagerParam selector := workflow.NewSelector(ctx) // The channel requests are the nodes data - nodes := stakedNodes.Nodes + nodes := pocketNetworkData.Nodes // Define a channel to store NodeAnalysisChanResponse objects nodeAnalysisResultsChan := make(chan types.NodeAnalysisChanResponse, len(nodes)) // defer close lookup task results channel @@ -76,6 +76,7 @@ func (wCtx *Ctx) NodeManager(ctx workflow.Context, params types.NodeManagerParam for _, node := range nodes { input := types.AnalyzeNodeParams{ Node: node, + Block: pocketNetworkData.Block, Tests: params.Tests, } ltr := types.AnalyzeNodeResults{} diff --git a/apps/go/manager/x/app.go b/apps/go/manager/x/app.go index 8057abb..01623db 100644 --- a/apps/go/manager/x/app.go +++ b/apps/go/manager/x/app.go @@ -77,6 +77,10 @@ func Initialize() *types.App { types.InstanceCollection, types.NodesCollection, types.ResultsCollection, + types.PromptsCollection, + types.ResponsesCollection, + types.NumericalTaskCollection, + types.SignaturesTaskCollection, }, l) // Initialize connection to RPC diff --git a/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py index 5576c5d..e6ee087 100644 --- a/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py +++ b/apps/python/evaluator/activities/signatures/tokenizer_evaluate.py @@ -15,6 +15,7 @@ PocketNetworkMongoDBResultSignature, PocketNetworkMongoDBTokenizer, SignatureSample, + PocketNetworkMongoDBResultBase ) @@ -58,12 +59,15 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: ) # Create the result, empty for now - result = PocketNetworkMongoDBResultSignature(task_id=args.task_id, - status=responses[0]["response"]["error_code"], - num_samples=0, - result_height=responses[0]["response"]["height"], - result_time=datetime.today().isoformat(), - signatures=[]) + result = PocketNetworkMongoDBResultSignature( + result_data = PocketNetworkMongoDBResultBase( + task_id=args.task_id, + status=responses[0]["response"]["error_code"], + num_samples=0, + result_height=responses[0]["response"]["height"], + result_time=datetime.today().isoformat(), + ), + signatures=[]) @@ -125,8 +129,8 @@ async def tokenizer_evaluate(args: PocketNetworkEvaluationTaskRequest) -> bool: raise ApplicationError("Failed to save tokenizer to MongoDB.", non_retryable=True) # Update the result with valid data - result.num_samples = 1 # Always one - result.status = 0 # OK + result.result_data.num_samples = 1 # Always one + result.result_data.status = 0 # OK result.signatures = [ SignatureSample(signature=str(tokenizer_mongo_new.hash), id=0) # This task has a single sample id ] diff --git a/docker-compose/dev/dependencies/mongodb/init-db.js b/docker-compose/dev/dependencies/mongodb/init-db.js index 6580cea..93cb4ef 100644 --- a/docker-compose/dev/dependencies/mongodb/init-db.js +++ b/docker-compose/dev/dependencies/mongodb/init-db.js @@ -23,4 +23,10 @@ db.responses.createIndex({task_id: 1, instance_id: 1, prompt_id: 1, ok: 1}); db.createCollection('nodes'); db.nodes.createIndex({address: 1, service: 1}, {unique: true}); -db.createCollection('results'); \ No newline at end of file +db.createCollection('results'); + +db.createCollection('buffers_numerical'); +db.buffers_numerical.createIndex({node_id: 1, framework: 1, task: 1}, {unique: true}); + +db.createCollection('buffers_signatures'); +db.buffers_signatures.createIndex({node_id: 1, framework: 1, task: 1}, {unique: true}); \ No newline at end of file diff --git a/docker-compose/morse-poc/README.md b/docker-compose/morse-poc/README.md index 9aa38b8..4e4f483 100644 --- a/docker-compose/morse-poc/README.md +++ b/docker-compose/morse-poc/README.md @@ -35,7 +35,7 @@ You can also host the model on other machines if you want (we wont guide you thr ```json [ { - "id": "00A1", + "id": "A100", "url": "http://SERVICE_IP:SERVICE_PORT" } ] diff --git a/docker-compose/morse-poc/apps_configs/manager.json b/docker-compose/morse-poc/apps_configs/manager.json index 24e1a0b..02e4366 100644 --- a/docker-compose/morse-poc/apps_configs/manager.json +++ b/docker-compose/morse-poc/apps_configs/manager.json @@ -30,15 +30,22 @@ "frameworks": { "lmeh" : { "task_types": {"any" : "numerical"}, - "task_dependency": {"any" : "signatures:tokenizer:ok"} + "task_dependency": {"any" : "signatures:tokenizer:ok"}, + "schedule_limits": {"any" : "none:none"}, + "trigger_minimum": {"any" : "0"} }, "helm" : { "task_types": {"any" : "numerical"}, - "task_dependency": {"any" : "signatures:tokenizer:ok"} + "task_dependency": {"any" : "signatures:tokenizer:ok"}, + "schedule_limits": {"any" : "none:none"}, + "trigger_minimum": {"any" : "0"} }, "signatures" : { "task_types": {"any" : "signature"}, - "task_dependency": {"any" : "none:none:none"} + "task_dependency": {"any" : "none:none:none"}, + "schedule_limits": {"any" : "1:session"}, + "trigger_minimum": {"tokenizer" : "1"} } } + } \ No newline at end of file diff --git a/docker-compose/morse-poc/apps_configs/requester.json b/docker-compose/morse-poc/apps_configs/requester.json index 254a805..cfdba18 100644 --- a/docker-compose/morse-poc/apps_configs/requester.json +++ b/docker-compose/morse-poc/apps_configs/requester.json @@ -13,7 +13,7 @@ "req_per_sec": 10, "session_tolerance": 1 }, - "log_level": "debug", + "log_level": "info", "temporal": { "host": "temporal", "port": 7233, diff --git a/docker-compose/morse-poc/dependencies_configs/mongodb/init-db.js b/docker-compose/morse-poc/dependencies_configs/mongodb/init-db.js index 14c6771..7a35be5 100644 --- a/docker-compose/morse-poc/dependencies_configs/mongodb/init-db.js +++ b/docker-compose/morse-poc/dependencies_configs/mongodb/init-db.js @@ -23,4 +23,10 @@ db.responses.createIndex({task_id: 1, instance_id: 1, prompt_id: 1, ok: 1}); db.createCollection('nodes'); db.nodes.createIndex({address: 1, service: 1}, {unique: true}); -db.createCollection('results'); \ No newline at end of file +db.createCollection('results'); + +db.createCollection('buffers_numerical'); +db.buffers_numerical.createIndex({"task_data.node_id": 1, "task_data.framework": 1, "task_data.task": 1}, {unique: true}); + +db.createCollection('buffers_signatures'); +db.buffers_numerical.createIndex({"task_data.node_id": 1, "task_data.framework": 1, "task_data.task": 1}, {unique: true}); \ No newline at end of file diff --git a/docker-compose/morse-poc/dependencies_configs/sidecar/sidecar.json b/docker-compose/morse-poc/dependencies_configs/sidecar/sidecar.json index 7b9f4b2..4f7e674 100644 --- a/docker-compose/morse-poc/dependencies_configs/sidecar/sidecar.json +++ b/docker-compose/morse-poc/dependencies_configs/sidecar/sidecar.json @@ -1,4 +1,4 @@ { - "log_level": "DEBUG", + "log_level": "INFO", "tokenizer_path": "/tokenizer" } \ No newline at end of file diff --git a/docker-compose/morse-poc/dependencies_configs/temporal/initialize.sh b/docker-compose/morse-poc/dependencies_configs/temporal/initialize.sh index eccf08f..72e94b0 100755 --- a/docker-compose/morse-poc/dependencies_configs/temporal/initialize.sh +++ b/docker-compose/morse-poc/dependencies_configs/temporal/initialize.sh @@ -9,7 +9,7 @@ mmlu="mmlu_abstract_algebra,mmlu_anatomy,mmlu_astronomy,mmlu_business_ethics,mml heavy="arc_challenge,hellaswag,truthfulqa_mc2,winogrande,gsm8k" one="mmlu_astronomy" # change this if you want a different set of datasets, by default it create everything -keys=$everything +keys=$one json_array=$(printf ',"%s"' "${key_array[@]}") json_array="[${json_array:1}]" @@ -33,8 +33,8 @@ sleep 60 for key in "${key_array[@]}"; do temporal schedule create \ - --schedule-id "lmeh-$key-00A1" \ - --workflow-id "lmeh-$key-00A1" \ + --schedule-id "lmeh-$key-A100" \ + --workflow-id "lmeh-$key-A100" \ --namespace 'pocket-ml-testbench' \ --workflow-type 'Manager' \ --task-queue 'manager' \ @@ -42,12 +42,12 @@ for key in "${key_array[@]}"; do --execution-timeout 350 \ --task-timeout 175 \ --overlap-policy 'BufferOne' \ - --input "{\"service\":\"00A1\", \"tests\": [{\"framework\": \"lmeh\", \"tasks\": [\"$key\"]}]}" + --input "{\"service\":\"A100\", \"tests\": [{\"framework\": \"lmeh\", \"tasks\": [\"$key\"]}]}" done temporal schedule create \ - --schedule-id "lmeh-tokenizer-00A1" \ - --workflow-id "lmeh-tokenizer-00A1" \ + --schedule-id "lmeh-tokenizer-A100" \ + --workflow-id "lmeh-tokenizer-A100" \ --namespace 'pocket-ml-testbench' \ --workflow-type 'Manager' \ --task-queue 'manager' \ @@ -55,15 +55,15 @@ temporal schedule create \ --execution-timeout 350 \ --task-timeout 175 \ --overlap-policy 'BufferOne' \ - --input "{\"service\":\"00A1\", \"tests\": [{\"framework\": \"signatures\", \"tasks\": [\"tokenizer\"]}]}" + --input "{\"service\":\"A100\", \"tests\": [{\"framework\": \"signatures\", \"tasks\": [\"tokenizer\"]}]}" temporal schedule create \ - --schedule-id 'f3abbe313689a603a1a6d6a43330d0440a552288-00A1' \ - --workflow-id 'f3abbe313689a603a1a6d6a43330d0440a552288-00A1' \ + --schedule-id 'f3abbe313689a603a1a6d6a43330d0440a552288-A100' \ + --workflow-id 'f3abbe313689a603a1a6d6a43330d0440a552288-A100' \ --namespace 'pocket-ml-testbench' \ --workflow-type 'Requester' \ --task-queue 'requester' \ --cron '@every 1m' \ --execution-timeout 350 \ --task-timeout 175 \ - --input '{"app":"f3abbe313689a603a1a6d6a43330d0440a552288","service":"00A1"}' \ No newline at end of file + --input '{"app":"f3abbe313689a603a1a6d6a43330d0440a552288","service":"A100"}' \ No newline at end of file diff --git a/docker-compose/morse-poc/pocket_configs/config/chains.json b/docker-compose/morse-poc/pocket_configs/config/chains.json index fd6cf63..5fe631b 100644 --- a/docker-compose/morse-poc/pocket_configs/config/chains.json +++ b/docker-compose/morse-poc/pocket_configs/config/chains.json @@ -1,6 +1,6 @@ [ { - "id": "00A1", + "id": "A100", "url": "http://nginx-sidecar:9087" } ] \ No newline at end of file diff --git a/docker-compose/morse-poc/pocket_configs/config/genesis.json b/docker-compose/morse-poc/pocket_configs/config/genesis.json index 916eb20..c20e811 100644 --- a/docker-compose/morse-poc/pocket_configs/config/genesis.json +++ b/docker-compose/morse-poc/pocket_configs/config/genesis.json @@ -23,7 +23,7 @@ "session_node_count": "24", "proof_waiting_period": "2", "supported_blockchains": [ - "00A1" + "A100" ], "claim_expiration": "12", "replay_attack_burn_multiplier": "3", @@ -46,7 +46,7 @@ { "address": "f3abbe313689a603a1a6d6a43330d0440a552288", "chains": [ - "00A1" + "A100" ], "jailed": false, "max_relays": "10000000", @@ -533,7 +533,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -545,7 +545,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -557,7 +557,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -569,7 +569,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -581,7 +581,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -593,7 +593,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -605,7 +605,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -617,7 +617,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -629,7 +629,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -641,7 +641,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -653,7 +653,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -665,7 +665,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -677,7 +677,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -689,7 +689,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -701,7 +701,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -713,7 +713,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -725,7 +725,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -737,7 +737,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -749,7 +749,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -761,7 +761,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -773,7 +773,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -785,7 +785,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -797,7 +797,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" }, @@ -809,7 +809,7 @@ "tokens": "60000000000", "service_url": "http://mesh.dev:9081", "chains": [ - "00A1" + "A100" ], "unstaking_time": "0001-01-01T00:00:00Z" } diff --git a/packages/go/mongodb/collection.go b/packages/go/mongodb/collection.go index aa01bf1..f0b73f1 100644 --- a/packages/go/mongodb/collection.go +++ b/packages/go/mongodb/collection.go @@ -17,6 +17,7 @@ type CollectionAPI interface { InsertMany(ctx context.Context, documents []interface{}, opts ...*options.InsertManyOptions) (*mongo.InsertManyResult, error) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) DeleteOne(ctx context.Context, filter interface{}, opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) + DeleteMany(ctx context.Context, filter interface{}, opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) Aggregate(ctx context.Context, pipeline interface{}, opts ...*options.AggregateOptions) (*mongo.Cursor, error) } @@ -63,3 +64,7 @@ func (c *Collection) UpdateOne(ctx context.Context, filter interface{}, update i func (c *Collection) DeleteOne(ctx context.Context, filter interface{}, opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) { return c.collection.DeleteOne(ctx, filter, opts...) } + +func (c *Collection) DeleteMany(ctx context.Context, filter interface{}, opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) { + return c.collection.DeleteMany(ctx, filter, opts...) +} diff --git a/packages/go/pocket_rpc/pocket_rpc.go b/packages/go/pocket_rpc/pocket_rpc.go index a2b3256..ef78e34 100644 --- a/packages/go/pocket_rpc/pocket_rpc.go +++ b/packages/go/pocket_rpc/pocket_rpc.go @@ -5,14 +5,15 @@ import ( "context" "encoding/json" "errors" - poktGoSdk "github.com/pokt-foundation/pocket-go/provider" - poktGoUtils "github.com/pokt-foundation/pocket-go/utils" "io" "net/http" "packages/pocket_rpc/common" "packages/pocket_rpc/types" "packages/utils" "time" + + poktGoSdk "github.com/pokt-foundation/pocket-go/provider" + poktGoUtils "github.com/pokt-foundation/pocket-go/utils" ) type PocketRpc struct { @@ -252,6 +253,9 @@ func (rpc *PocketRpc) getNodesByPage(service string, page int, pageSize int, ch } func (rpc *PocketRpc) GetNodes(service string) (nodes []*poktGoSdk.Node, e error) { + + // TODO : Handle case where no nodes are staked in service + if e := common.ServiceIdentifierVerification(service); e != nil { return nil, ErrBadRequestParams } diff --git a/packages/python/lmeh/pocket_lm_eval/api/task.py b/packages/python/lmeh/pocket_lm_eval/api/task.py index 975ba74..c559b70 100644 --- a/packages/python/lmeh/pocket_lm_eval/api/task.py +++ b/packages/python/lmeh/pocket_lm_eval/api/task.py @@ -654,21 +654,24 @@ def generate_random_doc_ids(self, table_name: str, _split: str, qty: int, min: i "Quantity of numbers to generate is greater than the range", non_retryable=True ) - # Generate a list of random numbers within the range [min, max] excluding the blacklist + + # Generate a list of random numbers within the range [min, max] ints = set(range(min, max + 1)) - if len(blacklist) > 0: - original_len = len(ints) - # Remove the blacklisted numbers - ints = ints - set(blacklist) - # Check that the blacklist numbers were removed - if len(ints) == original_len: - self.eval_logger.error("Blacklist out of range:", table_name=table_name, _split=_split, range_min=min, - range_max=max, blacklist=blacklist) - raise ApplicationError( - "Blacklist corresponding to '{}' table & '{}' split were not founded in the range: [{}-{}]".format( - table_name, _split, min, max), - non_retryable=True - ) + if blacklist is not None: + # exclude the blacklist members + if len(blacklist) > 0: + original_len = len(ints) + # Remove the blacklisted numbers + ints = ints - set(blacklist) + # Check that the blacklist numbers were removed + if len(ints) == original_len: + self.eval_logger.error(f"Blacklist out of range:", table_name=table_name, _split=_split, range_min=min, + range_max=max, blacklist=blacklist) + raise ApplicationError( + "Blacklist corresponding to '{}' table & '{}' split were not founded in the range: [{}-{}]".format( + table_name, _split, min, max), + non_retryable=True + ) # sorted random numbers choices = sorted(np.random.choice(list(ints), qty, replace=False).tolist()) self.eval_logger.debug("Random numbers generated:", choices=choices) @@ -861,4 +864,4 @@ async def build_all_requests( for i, b in enumerate(self.config.metadata['pocket_args'].doc_ids): b_dict[b] = i a_indices = [b_dict[a] for a in kept_doc_ids] - self.dataset[self.eval_split] = Dataset.from_dict(self.dataset[self.eval_split][a_indices]) \ No newline at end of file + self.dataset[self.eval_split] = Dataset.from_dict(self.dataset[self.eval_split][a_indices]) diff --git a/packages/python/lmeh/utils/generator.py b/packages/python/lmeh/utils/generator.py index 7c8dd69..dc82833 100644 --- a/packages/python/lmeh/utils/generator.py +++ b/packages/python/lmeh/utils/generator.py @@ -23,7 +23,7 @@ from packages.python.lmeh.utils.mongodb import MongoOperator from packages.python.lmeh.pocket_lm_eval.tasks import PocketNetworkTaskManager from packages.python.protocol.protocol import PocketNetworkTaskRequest, PocketNetworkMongoDBTask, \ - PocketNetworkMongoDBPrompt, NumericSample, PocketNetworkMongoDBResultNumerical + PocketNetworkMongoDBPrompt, NumericSample, PocketNetworkMongoDBResultNumerical, PocketNetworkMongoDBResultBase from motor.motor_asyncio import AsyncIOMotorClient from packages.python.common.mongodb import MongoClient from bson import ObjectId @@ -404,12 +404,15 @@ async def save_results( if len(task.instances) == 0: insert_mongo_results = [] eval_logger.debug("No instances/doc_id generated for task.", task_id=str(task_id)) - num_result = PocketNetworkMongoDBResultNumerical( - task_id=task_id, - num_samples=0, - status=1, + base_result = PocketNetworkMongoDBResultBase( + task_id=task_id, + status=1, + num_samples=0, result_height=task.result_height, result_time=datetime.today().isoformat(), + ) + num_result = PocketNetworkMongoDBResultNumerical( + result_data=base_result, scores=[]) insert_mongo_results.append(num_result.model_dump(by_alias=True)) await save_results( @@ -491,12 +494,15 @@ async def save_results( numericSample = NumericSample(score=example[selected_metrics], id=doc_id) scores.append(numericSample) + base_result = PocketNetworkMongoDBResultBase( + task_id=task_id, + status=0, + num_samples=len(result_num_samples), + result_height=task.result_height, + result_time=datetime.today().isoformat(), + ) num_result = PocketNetworkMongoDBResultNumerical( - task_id=task_id, - num_samples=len(result_num_samples), - status=0, - result_height=task.result_height, - result_time=datetime.today().isoformat(), + result_data=base_result, scores=scores) insert_mongo_results.append(num_result.model_dump(by_alias=True)) eval_logger.debug("Mongo Result:", mongo_result=insert_mongo_results) diff --git a/packages/python/lmeh/utils/mongodb.py b/packages/python/lmeh/utils/mongodb.py index 6fe6cd5..df901ba 100644 --- a/packages/python/lmeh/utils/mongodb.py +++ b/packages/python/lmeh/utils/mongodb.py @@ -32,6 +32,9 @@ def __init__(self, client: MongoClient, collections_map=None): self.instances_collection = collections_map["instances"] if "instances" in collections_map else "instances" self.prompts_collection = collections_map["prompts"] if "prompts" in collections_map else "prompts" self.responses_collection = collections_map["responses"] if "responses" in collections_map else "responses" + self.buffers_numerical_collection = collections_map["buffers_numerical"] if "buffers_numerical" in collections_map else "buffers_numerical" + self.buffers_signatures_collection = collections_map["buffers_signatures"] if "buffers_signatures" in collections_map else "buffers_signatures" + # TODO : This should reffer to PocketNetworkMongoDBInstance and not depend on LMEH blindly @staticmethod @@ -54,15 +57,28 @@ async def get_tokenizer_hash(self, address: str, service: str) -> str: eval_logger.debug("Node found.", node=node) - # Check if tokenizer signature exists - if node.get('signature_tasks', None) is None: - eval_logger.error("Node address has no signature_tasks, cannot load tokenizer hash.", adress=address) - raise ApplicationError(f"Node address {address}, has no signature_tasks cannot load tokenizer hash.") + # Get the node ID + if node.get('_id', None) is None: + eval_logger.error("Node address has no _id, cannot load tokenizer hash.", adress=address) + raise ApplicationError(f"Node address {address}, has no _id, cannot load tokenizer hash.") + + # Get the corresponding signature buffer + buffer = await self.client.db[self.buffers_signatures_collection].find_one({'task_data.node_id': node['_id'], + 'task_data.framework': "signatures", + 'task_data.task': "tokenizer" + }) + + if buffer is None: + eval_logger.error("Buffer for tokenizer signature not found.", adress=address) + raise ApplicationError(f"Node address {address} does not have a tokenizer signature buffer associated.") + + eval_logger.debug("Tokennizer signature buffer found.", buffer=buffer) + - tokenizer_hash = '' - for task in node['signature_tasks']: - if (task['task_data']['framework'] == 'signatures') and (task['task_data']['task'] == 'tokenizer'): - tokenizer_hash = task['last_signature'] + tokenizer_hash = buffer.get('last_signature', None) + if tokenizer_hash is None: + eval_logger.error("Buffer has no last signature field, entry is malformed cannot procede.", adress=address) + raise ApplicationError(f"Node address {address} buffer has no last signature field, entry is malformed cannot procede.") return tokenizer_hash diff --git a/packages/python/protocol/protocol.py b/packages/python/protocol/protocol.py index 2081812..1dc6dda 100644 --- a/packages/python/protocol/protocol.py +++ b/packages/python/protocol/protocol.py @@ -236,20 +236,30 @@ class PocketNetworkMongoDBResultBase(BaseModel): class Config: arbitrary_types_allowed = True + + class SignatureSample(BaseModel): signature: str id: int -class PocketNetworkMongoDBResultSignature(PocketNetworkMongoDBResultBase): - signatures: List[SignatureSample] +class PocketNetworkMongoDBResultSignature(BaseModel): + id: PyObjectId = Field(default_factory=PyObjectId, alias="_id") + result_data: PocketNetworkMongoDBResultBase + signatures: List[SignatureSample] + class Config: + arbitrary_types_allowed = True class NumericSample(BaseModel): score: float id: int -class PocketNetworkMongoDBResultNumerical(PocketNetworkMongoDBResultBase): - scores: List[NumericSample] +class PocketNetworkMongoDBResultNumerical(BaseModel): + id: PyObjectId = Field(default_factory=PyObjectId, alias="_id") + result_data: PocketNetworkMongoDBResultBase + scores: List[NumericSample] + class Config: + arbitrary_types_allowed = True ########### @@ -262,4 +272,4 @@ class PocketNetworkMongoDBTokenizer(BaseModel): hash : str class Config: - arbitrary_types_allowed = True \ No newline at end of file + arbitrary_types_allowed = True