Skip to content

Commit

Permalink
64 manager add same trigger restrictions based on blocks 1159 (#67)
Browse files Browse the repository at this point in the history
* - trigger restriction working
- signatures working
- tests working, ready to rebase incoming from main

* all elementes working withouth errors (on green path)
  • Loading branch information
RawthiL authored Jun 17, 2024
1 parent 0ab0fbb commit 31d5687
Show file tree
Hide file tree
Showing 29 changed files with 1,041 additions and 466 deletions.
400 changes: 291 additions & 109 deletions apps/go/manager/activities/analyze_node.go

Large diffs are not rendered by default.

31 changes: 26 additions & 5 deletions apps/go/manager/activities/get_staked.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package activities
import (
"context"
"manager/types"
"strconv"
)

var GetStakedName = "get_staked"
Expand Down Expand Up @@ -40,11 +41,31 @@ func (aCtx *Ctx) GetStaked(ctx context.Context, params types.GetStakedParams) (*
l.Info().Int("nodes_staked", len(result.Nodes)).Msg("Successfully pulled staked node-services.")
}

// // cheap mock
// for i := 0; i < 5; i++ {
// thisNode := NodeData{Address: fmt.Sprint(i), Service: fmt.Sprint(i * 10)}
// result.Nodes = append(result.Nodes, thisNode)
// }
// Get block data
currHeight, err := aCtx.App.PocketRpc.GetHeight()
if err != nil {
l.Error().Str("service", params.Service).Msg("Could not retrieve latest block hieght.")
return nil, err
}
blockParams, err := aCtx.App.PocketRpc.GetAllParams(currHeight)
if err != nil {
l.Error().Str("service", params.Service).Msg("Could not retrieve block params.")
return nil, err
}
blocksPerSession, ok := blockParams.NodeParams.Get("pos/BlocksPerSession")
if !ok {
l.Error().Str("service", params.Service).Msg("Cannot get blocks per session parameter.")
return nil, err
}

result.Block.Height = currHeight
i64, err := strconv.ParseInt(blocksPerSession, 10, 64)
if err != nil {
l.Error().Str("service", params.Service).Msg("Could convert parameter to number.")
return nil, err
}

result.Block.BlocksPerSession = i64

return &result, nil
}
4 changes: 2 additions & 2 deletions apps/go/manager/activities/trigger_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (aCtx *Ctx) TriggerSampler(_ context.Context, params types.TriggerSamplerPa
Blacklist: params.Trigger.Blacklist,
Qty: params.Trigger.Qty,
}
evaluatorWorkflowOptions := client.StartWorkflowOptions{
samplerWorkflowOptions := client.StartWorkflowOptions{
TaskQueue: aCtx.App.Config.Temporal.Sampler.TaskQueue,
WorkflowExecutionErrorWhenAlreadyStarted: true,
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
Expand All @@ -42,7 +42,7 @@ func (aCtx *Ctx) TriggerSampler(_ context.Context, params types.TriggerSamplerPa
// Do not wait for a result by not calling .Get() on the returned future
_, err := aCtx.App.TemporalClient.ExecuteWorkflow(
context.Background(),
evaluatorWorkflowOptions,
samplerWorkflowOptions,
aCtx.App.Config.Temporal.Sampler.WorkflowName,
samplerParams,
)
Expand Down
21 changes: 18 additions & 3 deletions apps/go/manager/config.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,24 @@
}
},
"frameworks": {
"lmeh" : {"task_types": {"any" : "numerical"}},
"helm" : {"task_types": {"any" : "numerical"}},
"signatures" : {"task_types": {"any" : "signature"}}
"lmeh" : {
"task_types": {"any" : "numerical"},
"task_dependency": {"any" : "signatures:tokenizer:ok"},
"schedule_limits": {"any" : "none:none"},
"trigger_minimum": {"any" : "0"}
},
"helm" : {
"task_types": {"any" : "numerical"},
"task_dependency": {"any" : "signatures:tokenizer:ok"},
"schedule_limits": {"any" : "none:none"},
"trigger_minimum": {"any" : "0"}
},
"signatures" : {
"task_types": {"any" : "signature"},
"task_dependency": {"any" : "none:none:none"},
"schedule_limits": {"any" : "1:session"},
"trigger_minimum": {"tokenizer" : "1"}
}
}

}
176 changes: 28 additions & 148 deletions apps/go/manager/records/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/rs/zerolog"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
Expand All @@ -21,84 +22,17 @@ import (
// DB entry of a given node-service pair
// The "Tasks" array will hold as many entries as tasks being tested
type NodeRecord struct {
Address string `bson:"address"`
Service string `bson:"service"`
LastSeenHeight uint32 `bson:"last_seen_height"`
LastSeenTime time.Time `bson:"last_seen_time"`
NumericalTasks []NumericalTaskRecord `bson:"numerical_tasks"`
SignatureTasks []SignatureTaskRecord `bson:"signature_tasks"`
ID primitive.ObjectID `bson:"_id,omitempty"`
Address string `bson:"address"`
Service string `bson:"service"`
LastSeenHeight int64 `bson:"last_seen_height"`
LastSeenTime time.Time `bson:"last_seen_time"`
}

// Creates and array of interfaces that contains all tasks
func (record *NodeRecord) CombineTasks() []TaskInterface {
combinedTasks := make([]TaskInterface, 0, len(record.NumericalTasks)+len(record.SignatureTasks))
func (record *NodeRecord) FindAndLoadNode(node types.NodeData, mongoDB mongodb.MongoDb, l *zerolog.Logger) (bool, error) {

for _, element := range record.NumericalTasks {
combinedTasks = append(combinedTasks, &element)
}

for _, element := range record.SignatureTasks {
combinedTasks = append(combinedTasks, &element)
}

return combinedTasks
}

func (record *NodeRecord) GetPrunedTasks(taskArray []TaskInterface, maxAge time.Duration, l *zerolog.Logger) []TaskInterface {
// Indices to remove
var indicesToRemove []int
// For each task
for i, task := range taskArray {
// Check the date of the index end
oldestAge := time.Since(task.GetLastSeen())
// Check
if oldestAge >= maxAge {
// Add to remove list
indicesToRemove = append(indicesToRemove, i)

l.Info().Str("address", record.Address).Str("service", record.Service).Str("framework", task.GetFramework()).Str("task", task.GetTask()).Msg("Removing task due to old age.")
}
}

// Remove Tasks
for i := len(indicesToRemove) - 1; i >= 0; i-- {
index := indicesToRemove[i]
taskArray = append(taskArray[:index], taskArray[index+1:]...)
}

return taskArray
}

// Go through all task and remove the ones that have no new samples since the limit
func (record *NodeRecord) PruneTasks(l *zerolog.Logger) error {

// Maximum age of a task
maxAge := time.Duration(TaskTTLDays) * 24 * time.Hour

// Remove Numerical Tasks
var numTaskInterfaces []TaskInterface
for _, task := range record.NumericalTasks {
numTaskInterfaces = append(numTaskInterfaces, &task)
}
tasksPrunned := record.GetPrunedTasks(numTaskInterfaces, maxAge, l)
for i, task := range tasksPrunned {
record.NumericalTasks[i] = *(task.(*NumericalTaskRecord))
}

// Remove Signature Tasks
var signTaskInterfaces []TaskInterface
for _, task := range record.SignatureTasks {
signTaskInterfaces = append(signTaskInterfaces, &task)
}
tasksPrunned = record.GetPrunedTasks(signTaskInterfaces, maxAge, l)
for i, task := range tasksPrunned {
record.SignatureTasks[i] = *(task.(*SignatureTaskRecord))
}

return nil
}

func (record *NodeRecord) FindAndLoadNode(node types.NodeData, collection mongodb.CollectionAPI, l *zerolog.Logger) (bool, error) {
// Get nodes collection
nodesCollection := mongoDB.GetCollection(types.NodesCollection)

// Set filtering for this node-service pair data
node_filter := bson.D{{Key: "address", Value: node.Address}, {Key: "service", Value: node.Service}}
Expand All @@ -110,7 +44,7 @@ func (record *NodeRecord) FindAndLoadNode(node types.NodeData, collection mongod

// Retrieve this node entry
var found bool = true
cursor := collection.FindOne(ctxM, node_filter, opts)
cursor := nodesCollection.FindOne(ctxM, node_filter, opts)
err := cursor.Decode(record)
if err != nil {
if err == mongo.ErrNoDocuments {
Expand All @@ -127,89 +61,30 @@ func (record *NodeRecord) FindAndLoadNode(node types.NodeData, collection mongod
return found, nil
}

func (record *NodeRecord) AppendTask(framework string, task string, date time.Time, frameworkConfigMap map[string]types.FrameworkConfig, l *zerolog.Logger) TaskInterface {
func (record *NodeRecord) AppendTask(nodeID primitive.ObjectID, framework string, task string, date time.Time, frameworkConfigMap map[string]types.FrameworkConfig, mongoDB mongodb.MongoDb, l *zerolog.Logger) TaskInterface {

taskType, err := GetTaskType(framework, task, frameworkConfigMap, l)
if err != nil {
return nil
}

// Fill base task data
baseTaskData := BaseTaskRecord{
Framework: framework,
Task: task,
LastSeen: date,
}

if taskType == NumericalTaskTypeName {
// TODO: Get default values from framework-task
bufferLen := NumericalCircularBufferLength
timeArray := make([]time.Time, bufferLen)
for i := range timeArray {
timeArray[i] = date
}

newTask := NumericalTaskRecord{
TaskData: baseTaskData,
MeanScore: 0.0,
StdScore: 0.0,
ScoresSamples: make([]ScoresSample, bufferLen),
CircBuffer: types.CircularBuffer{
CircBufferLen: bufferLen,
NumSamples: 0,
Times: timeArray,
Indexes: types.CircularIndexes{
Start: 0,
End: 0,
},
},
}

record.NumericalTasks = append(record.NumericalTasks, newTask)

return &newTask

} else if taskType == SignatureTaskTypeName {

// TODO: Get default values from framework-task
bufferLen := SignatureCircularBufferLength
timeArray := make([]time.Time, bufferLen)
for i := range timeArray {
timeArray[i] = date
}

newTask := SignatureTaskRecord{
TaskData: baseTaskData,
LastSignature: "",
Signatures: make([]SignatureSample, bufferLen),
CircBuffer: types.CircularBuffer{
CircBufferLen: bufferLen,
NumSamples: 0,
Times: timeArray,
Indexes: types.CircularIndexes{
Start: 0,
End: 0,
},
},
}

record.SignatureTasks = append(record.SignatureTasks, newTask)

return &newTask
// Get the task, wich will create it if not found
taskRecord, found := GetTaskData(nodeID, taskType, framework, task, mongoDB, l)
if !found {
return nil
} else {
return taskRecord
}

return nil

}

func (record *NodeRecord) Init(params types.AnalyzeNodeParams, frameworkConfigMap map[string]types.FrameworkConfig, l *zerolog.Logger) error {
func (record *NodeRecord) Init(params types.AnalyzeNodeParams, frameworkConfigMap map[string]types.FrameworkConfig, mongoDB mongodb.MongoDb, l *zerolog.Logger) error {
// Initialize empty record

// Set node data
record.Address = params.Node.Address
record.Service = params.Node.Service

// Never tested
record.ID = primitive.NewObjectID()
record.LastSeenHeight = 0
defaultDate := time.Date(2018, 1, 1, 00, 00, 00, 100, time.Local)
record.LastSeenTime = defaultDate
Expand All @@ -222,15 +97,20 @@ func (record *NodeRecord) Init(params types.AnalyzeNodeParams, frameworkConfigMa

for _, task := range test.Tasks {
// Add all tasks with the current date as maker for creation
_ = record.AppendTask(test.Framework, task, time.Now(), frameworkConfigMap, l)
_ = record.AppendTask(record.ID, test.Framework, task, time.Now(), frameworkConfigMap, mongoDB, l)
}
}

return nil
_, err := record.UpdateNode(mongoDB, l)

return err

}

func (record *NodeRecord) UpdateNode(collection mongodb.CollectionAPI, l *zerolog.Logger) (bool, error) {
func (record *NodeRecord) UpdateNode(mongoDB mongodb.MongoDb, l *zerolog.Logger) (bool, error) {

// Get nodes collection
nodesCollection := mongoDB.GetCollection(types.NodesCollection)

opts := options.FindOneAndUpdate().SetUpsert(true)
node_filter := bson.D{{Key: "address", Value: record.Address}, {Key: "service", Value: record.Service}}
Expand All @@ -241,7 +121,7 @@ func (record *NodeRecord) UpdateNode(collection mongodb.CollectionAPI, l *zerolo
update := bson.D{{Key: "$set", Value: record}}
// Get collection and update
var found bool = true
err := collection.FindOneAndUpdate(ctxM, node_filter, update, opts).Decode(record)
err := nodesCollection.FindOneAndUpdate(ctxM, node_filter, update, opts).Decode(record)
if err != nil {
if err == mongo.ErrNoDocuments {
l.Warn().Str("address", record.Address).Str("service", record.Service).Msg("Node entry not found, a new one was created.")
Expand Down
Loading

0 comments on commit 31d5687

Please sign in to comment.