Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

73 manager sepparate node data update from task triggering #96

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 80 additions & 177 deletions apps/go/manager/activities/analyze_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams

// Get logger
l := aCtx.App.Logger
l.Debug().Str("address", params.Node.Address).Str("service", params.Node.Service).Msg("Analyzing staked node.")
l.Debug().
Str("address", params.Node.Address).
Str("service", params.Node.Service).
Msg("Analyzing staked node.")

// Retrieve this node entry
var thisNodeData records.NodeRecord
Expand Down Expand Up @@ -65,16 +68,27 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams
for _, test := range params.Tests {

for _, task := range test.Tasks {
l.Debug().Str("address", thisNodeData.Address).Str("service", thisNodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Checking task requests.")
l.Debug().
Str("address", thisNodeData.Address).
Str("service", thisNodeData.Service).
Str("framework", test.Framework).
Str("task", task).
Msg("Checking task requests.")

// Check task dependencies
depStatus, err := records.CheckTaskDependency(&thisNodeData, test.Framework, task, aCtx.App.Config.Frameworks, aCtx.App.Mongodb, l)
if err != nil {
l.Error().Msg("Could not check task dependencies.")
l.Error().
Msg("Could not check task dependencies.")
return nil, err
}
if !depStatus {
l.Info().Str("address", thisNodeData.Address).Str("service", thisNodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Does not meet task dependencies, ignoring for now.")
l.Info().
Str("address", thisNodeData.Address).
Str("service", thisNodeData.Service).
Str("framework", test.Framework).
Str("task", task).
Msg("Does not meet task dependencies, ignoring for now.")
continue
}

Expand All @@ -85,7 +99,12 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams
}
thisTaskRecord, found := records.GetTaskData(thisNodeData.ID, taskType, test.Framework, task, aCtx.App.Mongodb, l)
if found != true {
l.Error().Str("address", thisNodeData.Address).Str("service", thisNodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("not found task entry after check creation (task should be present at this point)")
l.Error().
Str("address", thisNodeData.Address).
Str("service", thisNodeData.Service).
Str("framework", test.Framework).
Str("task", task).
Msg("not found task entry after check creation (task should be present at this point)")
return nil, fmt.Errorf("not found task entry after check creation (task should be present at this point)")
}

Expand Down Expand Up @@ -161,11 +180,12 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams
return &result, nil
}

// Checks status of all node's tasks, drops old ones, checks for new results and re-computes.
func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, frameworkConfigMap map[string]types.FrameworkConfig, mongoDB mongodb.MongoDb, l *zerolog.Logger) (err error) {

// Get results collection
resultsCollection := mongoDB.GetCollection(types.ResultsCollection)
// Checks for node's tasks records and drops old ones.
func updateTasksNode(nodeData *records.NodeRecord,
tests []types.TestsData,
frameworkConfigMap map[string]types.FrameworkConfig,
mongoDB mongodb.MongoDb,
l *zerolog.Logger) (err error) {

//--------------------------------------------------------------------------
// Check for each task sample date
Expand All @@ -174,7 +194,12 @@ func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, fram

for _, task := range test.Tasks {

l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Updating circular buffer.")
l.Debug().
Str("address", nodeData.Address).
Str("service", nodeData.Service).
Str("framework", test.Framework).
Str("task", task).
Msg("Updating circular buffer.")

//------------------------------------------------------------------
// Get stored data for this task
Expand All @@ -186,7 +211,12 @@ func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, fram
thisTaskRecord, found := records.GetTaskData(nodeData.ID, taskType, test.Framework, task, mongoDB, l)

if !found {
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Not found, creating.")
l.Debug().
Str("address", nodeData.Address).
Str("service", nodeData.Service).
Str("framework", test.Framework).
Str("task", task).
Msg("Not found, creating.")
defaultDate := time.Now()
thisTaskRecord = nodeData.AppendTask(nodeData.ID, test.Framework, task, defaultDate, frameworkConfigMap, mongoDB, l)
}
Expand All @@ -195,76 +225,19 @@ func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, fram
// Drop old samples (move indices).
//------------------------------------------------------------------

err = thisTaskRecord.CycleIndexes(l)
if err != nil {
return err
}

//------------------------------------------------------------------
// Check pending and done tasks in the database
//------------------------------------------------------------------
_, tasksDone, _, tasksIDs, err := checkTaskDatabase(nodeData.Address, nodeData.Service, test.Framework, task, mongoDB, l)
cycled, err := thisTaskRecord.CycleIndexes(l)
if err != nil {
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Msg("Cannot check Tasks Database.")
return err
}

//------------------------------------------------------------------
// Read new results from MongoDB Results and add to buffer
//------------------------------------------------------------------
if tasksDone > 0 {

// loop over all tasksIDs
for _, taskID := range tasksIDs {
thisTaskResults := thisTaskRecord.GetResultStruct()
found = false
found, err = thisTaskResults.FindAndLoadResults(taskID,
resultsCollection,
l)
if err != nil {
return err
}
if found == true {

l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Str("task_id", taskID.String()).Msg("Processing found results.")

// If nothing is wrong with the result calculation
if thisTaskResults.GetStatus() == 0 {
l.Debug().Int("NumSamples", int(thisTaskResults.GetNumSamples())).Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Str("task_id", taskID.String()).Msg("Inserting results into buffers.")
// Add results to current task record
for i := 0; i < int(thisTaskResults.GetNumSamples()); i++ {
thisTaskRecord.InsertSample(time.Now(), thisTaskResults.GetSample(i), l)
}
// Update the last seen fields
thisTaskRecord.UpdateLastHeight(thisTaskResults.GetResultHeight())
thisTaskRecord.UpdateLastSeen(thisTaskResults.GetResultTime())
} else {
// TODO: handle status!=0
l.Debug().Str("address", nodeData.Address).Str("service", nodeData.Service).Str("framework", test.Framework).Str("task", task).Str("task_id", taskID.String()).Msg("Status not zero.")
}

// Delete all MongoDB entries associated with this task ID
errDel := RemoveTaskID(taskID, mongoDB, l)
if errDel != nil {
l.Debug().Str("delete_error", errDel.Error()).Str("task_id", taskID.String()).Msg("Deletion error.")
}

}
}

}

//------------------------------------------------------------------
// Calculate new metrics for this task
//------------------------------------------------------------------
thisTaskRecord.ProcessData(l)

//------------------------------------------------------------------
// Update task in DB
//------------------------------------------------------------------
_, err = thisTaskRecord.UpdateTask(nodeData.ID, test.Framework, task, mongoDB, l)
if err != nil {
return err
if cycled || found {
_, err = thisTaskRecord.UpdateTask(nodeData.ID, test.Framework, task, mongoDB, l)
if err != nil {
return err
}
}

}
Expand All @@ -274,8 +247,17 @@ func updateTasksNode(nodeData *records.NodeRecord, tests []types.TestsData, fram
return err
}

// Looks for a framework-task-node in the TaskDB and retreives all the IDs adn tasks status
func checkTaskDatabase(address string, service string, framework string, task string, mongoDB mongodb.MongoDb, l *zerolog.Logger) (tasksInQueue uint32, tasksDone uint32, blackList []int, tasksIDs []primitive.ObjectID, err error) {
// Looks for a framework-task-node in the TaskDB and retrieves all the IDs and tasks status
func checkTaskDatabase(address string,
service string,
framework string,
task string,
mongoDB mongodb.MongoDb,
l *zerolog.Logger) (tasksInQueue uint32,
tasksDone uint32,
blackList []int,
tasksIDs []primitive.ObjectID,
err error) {
// define blacklist as length zero
blackList = make([]int, 0)

Expand Down Expand Up @@ -311,7 +293,12 @@ func checkTaskDatabase(address string, service string, framework string, task st
tasksIDs = append(tasksIDs, taskReq.Id)
// If not already done
if !taskReq.Done {
l.Debug().Str("address", address).Str("service", service).Str("framework", framework).Str("task", task).Msg("Found pending task.")
l.Debug().
Str("address", address).
Str("service", service).
Str("framework", framework).
Str("task", task).
Msg("Found pending task.")
// Count pending
tasksInQueue += uint32(taskReq.Qty)

Expand All @@ -335,111 +322,27 @@ func checkTaskDatabase(address string, service string, framework string, task st
}
}
} else {
l.Debug().Str("address", address).Str("service", service).Str("framework", framework).Str("task", task).Msg("Found done task.")
l.Debug().
Str("address", address).
Str("service", service).
Str("framework", framework).
Str("task", task).
Msg("Found done task.")
tasksDone += 1
}

}

l.Debug().Str("address", address).Str("service", service).Str("framework", framework).Str("task", task).Int32("tasksDone", int32(tasksDone)).Int32("tasksInQueue", int32(tasksInQueue)).Int("tasksIDsLen", len(tasksIDs)).Msg("Pending tasks analized.")
l.Debug().
Str("address", address).
Str("service", service).
Str("framework", framework).
Str("task", task).
Int32("tasksDone", int32(tasksDone)).
Int32("tasksInQueue", int32(tasksInQueue)).
Int("tasksIDsLen", len(tasksIDs)).
Msg("Pending tasks analyzed.")

return tasksInQueue, tasksDone, blackList, tasksIDs, err

}

// Given a TaskID from MongoDB, deletes all associated entries from the "tasks", "instances", "prompts", "responses" and "results" collections.
func RemoveTaskID(taskID primitive.ObjectID, mongoDB mongodb.MongoDb, l *zerolog.Logger) (err error) {

//--------------------------------------------------------------------------
//-------------------------- Instances -------------------------------------
//--------------------------------------------------------------------------
instancesCollection := mongoDB.GetCollection(types.InstanceCollection)
// Set filtering for this node-service pair data
task_request_filter := bson.D{{Key: "task_id", Value: taskID}}
// Set mongo context
ctxM, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Now retrieve all node task requests entries
response, err := instancesCollection.DeleteMany(ctxM, task_request_filter)
if err != nil {
l.Warn().Msg("Could not delete instances data from MongoDB.")
return err
}

l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted instances data from MongoDB")

//--------------------------------------------------------------------------
//-------------------------- Prompts ---------------------------------------
//--------------------------------------------------------------------------
promptsCollection := mongoDB.GetCollection(types.PromptsCollection)
// Set filtering for this node-service pair data
task_request_filter = bson.D{{Key: "task_id", Value: taskID}}
// Set mongo context
ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Now retrieve all node task requests entries
response, err = promptsCollection.DeleteMany(ctxM, task_request_filter)
if err != nil {
l.Warn().Msg("Could not delete prompts data from MongoDB.")
return err
}

l.Debug().Int("deleted", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted prompts data from MongoDB")

//--------------------------------------------------------------------------
//-------------------------- Responses -------------------------------------
//--------------------------------------------------------------------------
responsesCollection := mongoDB.GetCollection(types.ResponsesCollection)
// Set filtering for this node-service pair data
task_request_filter = bson.D{{Key: "task_id", Value: taskID}}
// Set mongo context
ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Now retrieve all node task requests entries
response, err = responsesCollection.DeleteMany(ctxM, task_request_filter)
if err != nil {
l.Warn().Msg("Could not delete responses data from MongoDB.")
return err
}

l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted responses data from MongoDB")

//--------------------------------------------------------------------------
//-------------------------- Results ---------------------------------------
//--------------------------------------------------------------------------
resultsCollection := mongoDB.GetCollection(types.ResultsCollection)
// Set filtering for this node-service pair data
task_request_filter = bson.D{{Key: "result_data.task_id", Value: taskID}}
// Set mongo context
ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Now retrieve all node task requests entries
response, err = resultsCollection.DeleteMany(ctxM, task_request_filter)
if err != nil {
l.Warn().Msg("Could not delete results data from MongoDB.")
return err
}

l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted results data from MongoDB")

//--------------------------------------------------------------------------
//-------------------------- Task ------------------------------------------
//--------------------------------------------------------------------------
tasksCollection := mongoDB.GetCollection(types.TaskCollection)
// Set filtering for this node-service pair data
task_request_filter = bson.D{{Key: "_id", Value: taskID}}
// Set mongo context
ctxM, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Now retrieve all node task requests entries
response, err = tasksCollection.DeleteMany(ctxM, task_request_filter)
if err != nil {
l.Warn().Msg("Could not delete task data from MongoDB.")
return err
}

l.Debug().Int("deleted_count", int(response.DeletedCount)).Str("TaskID", taskID.String()).Msg("deleted task data from MongoDB")

return nil

}
5 changes: 5 additions & 0 deletions apps/go/manager/activities/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,9 @@ func (aCtx *Ctx) Register(w worker.Worker) {
w.RegisterActivityWithOptions(aCtx.TriggerSampler, activity.RegisterOptions{
Name: TriggerSamplerName,
})

w.RegisterActivityWithOptions(aCtx.AnalyzeResult, activity.RegisterOptions{
Name: AnalyzeResultName,
})

}
Loading
Loading