Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

64 manager add same trigger restrictions based on blocks 1159 #67

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
400 changes: 291 additions & 109 deletions apps/go/manager/activities/analyze_node.go

Large diffs are not rendered by default.

31 changes: 26 additions & 5 deletions apps/go/manager/activities/get_staked.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package activities
import (
"context"
"manager/types"
"strconv"
)

var GetStakedName = "get_staked"
Expand Down Expand Up @@ -40,11 +41,31 @@ func (aCtx *Ctx) GetStaked(ctx context.Context, params types.GetStakedParams) (*
l.Info().Int("nodes_staked", len(result.Nodes)).Msg("Successfully pulled staked node-services.")
}

// // cheap mock
// for i := 0; i < 5; i++ {
// thisNode := NodeData{Address: fmt.Sprint(i), Service: fmt.Sprint(i * 10)}
// result.Nodes = append(result.Nodes, thisNode)
// }
// Get block data
currHeight, err := aCtx.App.PocketRpc.GetHeight()
if err != nil {
l.Error().Str("service", params.Service).Msg("Could not retrieve latest block hieght.")
return nil, err
}
blockParams, err := aCtx.App.PocketRpc.GetAllParams(currHeight)
if err != nil {
l.Error().Str("service", params.Service).Msg("Could not retrieve block params.")
return nil, err
}
blocksPerSession, ok := blockParams.NodeParams.Get("pos/BlocksPerSession")
if !ok {
l.Error().Str("service", params.Service).Msg("Cannot get blocks per session parameter.")
return nil, err
}

result.Block.Height = currHeight
i64, err := strconv.ParseInt(blocksPerSession, 10, 64)
if err != nil {
l.Error().Str("service", params.Service).Msg("Could convert parameter to number.")
return nil, err
}

result.Block.BlocksPerSession = i64

return &result, nil
}
4 changes: 2 additions & 2 deletions apps/go/manager/activities/trigger_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (aCtx *Ctx) TriggerSampler(_ context.Context, params types.TriggerSamplerPa
Blacklist: params.Trigger.Blacklist,
Qty: params.Trigger.Qty,
}
evaluatorWorkflowOptions := client.StartWorkflowOptions{
samplerWorkflowOptions := client.StartWorkflowOptions{
TaskQueue: aCtx.App.Config.Temporal.Sampler.TaskQueue,
WorkflowExecutionErrorWhenAlreadyStarted: true,
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
Expand All @@ -42,7 +42,7 @@ func (aCtx *Ctx) TriggerSampler(_ context.Context, params types.TriggerSamplerPa
// Do not wait for a result by not calling .Get() on the returned future
_, err := aCtx.App.TemporalClient.ExecuteWorkflow(
context.Background(),
evaluatorWorkflowOptions,
samplerWorkflowOptions,
aCtx.App.Config.Temporal.Sampler.WorkflowName,
samplerParams,
)
Expand Down
21 changes: 18 additions & 3 deletions apps/go/manager/config.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,24 @@
}
},
"frameworks": {
"lmeh" : {"task_types": {"any" : "numerical"}},
"helm" : {"task_types": {"any" : "numerical"}},
"signatures" : {"task_types": {"any" : "signature"}}
"lmeh" : {
"task_types": {"any" : "numerical"},
"task_dependency": {"any" : "signatures:tokenizer:ok"},
"schedule_limits": {"any" : "none:none"},
"trigger_minimum": {"any" : "0"}
},
"helm" : {
"task_types": {"any" : "numerical"},
"task_dependency": {"any" : "signatures:tokenizer:ok"},
"schedule_limits": {"any" : "none:none"},
"trigger_minimum": {"any" : "0"}
},
"signatures" : {
"task_types": {"any" : "signature"},
"task_dependency": {"any" : "none:none:none"},
"schedule_limits": {"any" : "1:session"},
"trigger_minimum": {"tokenizer" : "1"}
}
}

}
176 changes: 28 additions & 148 deletions apps/go/manager/records/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/rs/zerolog"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
Expand All @@ -21,84 +22,17 @@ import (
// DB entry of a given node-service pair
// The "Tasks" array will hold as many entries as tasks being tested
type NodeRecord struct {
Address string `bson:"address"`
Service string `bson:"service"`
LastSeenHeight uint32 `bson:"last_seen_height"`
LastSeenTime time.Time `bson:"last_seen_time"`
NumericalTasks []NumericalTaskRecord `bson:"numerical_tasks"`
SignatureTasks []SignatureTaskRecord `bson:"signature_tasks"`
ID primitive.ObjectID `bson:"_id,omitempty"`
Address string `bson:"address"`
Service string `bson:"service"`
LastSeenHeight int64 `bson:"last_seen_height"`
LastSeenTime time.Time `bson:"last_seen_time"`
}

// Creates and array of interfaces that contains all tasks
func (record *NodeRecord) CombineTasks() []TaskInterface {
combinedTasks := make([]TaskInterface, 0, len(record.NumericalTasks)+len(record.SignatureTasks))
func (record *NodeRecord) FindAndLoadNode(node types.NodeData, mongoDB mongodb.MongoDb, l *zerolog.Logger) (bool, error) {

for _, element := range record.NumericalTasks {
combinedTasks = append(combinedTasks, &element)
}

for _, element := range record.SignatureTasks {
combinedTasks = append(combinedTasks, &element)
}

return combinedTasks
}

func (record *NodeRecord) GetPrunedTasks(taskArray []TaskInterface, maxAge time.Duration, l *zerolog.Logger) []TaskInterface {
// Indices to remove
var indicesToRemove []int
// For each task
for i, task := range taskArray {
// Check the date of the index end
oldestAge := time.Since(task.GetLastSeen())
// Check
if oldestAge >= maxAge {
// Add to remove list
indicesToRemove = append(indicesToRemove, i)

l.Info().Str("address", record.Address).Str("service", record.Service).Str("framework", task.GetFramework()).Str("task", task.GetTask()).Msg("Removing task due to old age.")
}
}

// Remove Tasks
for i := len(indicesToRemove) - 1; i >= 0; i-- {
index := indicesToRemove[i]
taskArray = append(taskArray[:index], taskArray[index+1:]...)
}

return taskArray
}

// Go through all task and remove the ones that have no new samples since the limit
func (record *NodeRecord) PruneTasks(l *zerolog.Logger) error {

// Maximum age of a task
maxAge := time.Duration(TaskTTLDays) * 24 * time.Hour

// Remove Numerical Tasks
var numTaskInterfaces []TaskInterface
for _, task := range record.NumericalTasks {
numTaskInterfaces = append(numTaskInterfaces, &task)
}
tasksPrunned := record.GetPrunedTasks(numTaskInterfaces, maxAge, l)
for i, task := range tasksPrunned {
record.NumericalTasks[i] = *(task.(*NumericalTaskRecord))
}

// Remove Signature Tasks
var signTaskInterfaces []TaskInterface
for _, task := range record.SignatureTasks {
signTaskInterfaces = append(signTaskInterfaces, &task)
}
tasksPrunned = record.GetPrunedTasks(signTaskInterfaces, maxAge, l)
for i, task := range tasksPrunned {
record.SignatureTasks[i] = *(task.(*SignatureTaskRecord))
}

return nil
}

func (record *NodeRecord) FindAndLoadNode(node types.NodeData, collection mongodb.CollectionAPI, l *zerolog.Logger) (bool, error) {
// Get nodes collection
nodesCollection := mongoDB.GetCollection(types.NodesCollection)

// Set filtering for this node-service pair data
node_filter := bson.D{{Key: "address", Value: node.Address}, {Key: "service", Value: node.Service}}
Expand All @@ -110,7 +44,7 @@ func (record *NodeRecord) FindAndLoadNode(node types.NodeData, collection mongod

// Retrieve this node entry
var found bool = true
cursor := collection.FindOne(ctxM, node_filter, opts)
cursor := nodesCollection.FindOne(ctxM, node_filter, opts)
err := cursor.Decode(record)
if err != nil {
if err == mongo.ErrNoDocuments {
Expand All @@ -127,89 +61,30 @@ func (record *NodeRecord) FindAndLoadNode(node types.NodeData, collection mongod
return found, nil
}

func (record *NodeRecord) AppendTask(framework string, task string, date time.Time, frameworkConfigMap map[string]types.FrameworkConfig, l *zerolog.Logger) TaskInterface {
func (record *NodeRecord) AppendTask(nodeID primitive.ObjectID, framework string, task string, date time.Time, frameworkConfigMap map[string]types.FrameworkConfig, mongoDB mongodb.MongoDb, l *zerolog.Logger) TaskInterface {

taskType, err := GetTaskType(framework, task, frameworkConfigMap, l)
if err != nil {
return nil
}

// Fill base task data
baseTaskData := BaseTaskRecord{
Framework: framework,
Task: task,
LastSeen: date,
}

if taskType == NumericalTaskTypeName {
// TODO: Get default values from framework-task
bufferLen := NumericalCircularBufferLength
timeArray := make([]time.Time, bufferLen)
for i := range timeArray {
timeArray[i] = date
}

newTask := NumericalTaskRecord{
TaskData: baseTaskData,
MeanScore: 0.0,
StdScore: 0.0,
ScoresSamples: make([]ScoresSample, bufferLen),
CircBuffer: types.CircularBuffer{
CircBufferLen: bufferLen,
NumSamples: 0,
Times: timeArray,
Indexes: types.CircularIndexes{
Start: 0,
End: 0,
},
},
}

record.NumericalTasks = append(record.NumericalTasks, newTask)

return &newTask

} else if taskType == SignatureTaskTypeName {

// TODO: Get default values from framework-task
bufferLen := SignatureCircularBufferLength
timeArray := make([]time.Time, bufferLen)
for i := range timeArray {
timeArray[i] = date
}

newTask := SignatureTaskRecord{
TaskData: baseTaskData,
LastSignature: "",
Signatures: make([]SignatureSample, bufferLen),
CircBuffer: types.CircularBuffer{
CircBufferLen: bufferLen,
NumSamples: 0,
Times: timeArray,
Indexes: types.CircularIndexes{
Start: 0,
End: 0,
},
},
}

record.SignatureTasks = append(record.SignatureTasks, newTask)

return &newTask
// Get the task, wich will create it if not found
taskRecord, found := GetTaskData(nodeID, taskType, framework, task, mongoDB, l)
if !found {
return nil
} else {
return taskRecord
}

return nil

}

func (record *NodeRecord) Init(params types.AnalyzeNodeParams, frameworkConfigMap map[string]types.FrameworkConfig, l *zerolog.Logger) error {
func (record *NodeRecord) Init(params types.AnalyzeNodeParams, frameworkConfigMap map[string]types.FrameworkConfig, mongoDB mongodb.MongoDb, l *zerolog.Logger) error {
// Initialize empty record

// Set node data
record.Address = params.Node.Address
record.Service = params.Node.Service

// Never tested
record.ID = primitive.NewObjectID()
record.LastSeenHeight = 0
defaultDate := time.Date(2018, 1, 1, 00, 00, 00, 100, time.Local)
record.LastSeenTime = defaultDate
Expand All @@ -222,15 +97,20 @@ func (record *NodeRecord) Init(params types.AnalyzeNodeParams, frameworkConfigMa

for _, task := range test.Tasks {
// Add all tasks with the current date as maker for creation
_ = record.AppendTask(test.Framework, task, time.Now(), frameworkConfigMap, l)
_ = record.AppendTask(record.ID, test.Framework, task, time.Now(), frameworkConfigMap, mongoDB, l)
}
}

return nil
_, err := record.UpdateNode(mongoDB, l)

return err

}

func (record *NodeRecord) UpdateNode(collection mongodb.CollectionAPI, l *zerolog.Logger) (bool, error) {
func (record *NodeRecord) UpdateNode(mongoDB mongodb.MongoDb, l *zerolog.Logger) (bool, error) {

// Get nodes collection
nodesCollection := mongoDB.GetCollection(types.NodesCollection)

opts := options.FindOneAndUpdate().SetUpsert(true)
node_filter := bson.D{{Key: "address", Value: record.Address}, {Key: "service", Value: record.Service}}
Expand All @@ -241,7 +121,7 @@ func (record *NodeRecord) UpdateNode(collection mongodb.CollectionAPI, l *zerolo
update := bson.D{{Key: "$set", Value: record}}
// Get collection and update
var found bool = true
err := collection.FindOneAndUpdate(ctxM, node_filter, update, opts).Decode(record)
err := nodesCollection.FindOneAndUpdate(ctxM, node_filter, update, opts).Decode(record)
if err != nil {
if err == mongo.ErrNoDocuments {
l.Warn().Str("address", record.Address).Str("service", record.Service).Msg("Node entry not found, a new one was created.")
Expand Down
Loading
Loading