Skip to content

Commit

Permalink
fixing Sampler request expected input data
Browse files Browse the repository at this point in the history
  • Loading branch information
RawthiL committed May 16, 2024
1 parent f672c01 commit 598d2be
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 8 deletions.
4 changes: 3 additions & 1 deletion apps/go/manager/activities/analyze_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams

var result types.AnalyzeNodeResults
result.Success = false
result.IsNew = false

// Get logger
l := aCtx.App.Logger
Expand All @@ -41,6 +42,7 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams
// Create entry in MongoDB
l.Debug().Bool("found", found).Msg("Creating empty node entry.")
thisNodeData.Init(params, l)
result.IsNew = true

} else {
// If the node entry exist we must cycle and check for pending results
Expand Down Expand Up @@ -103,7 +105,7 @@ func (aCtx *Ctx) AnalyzeNode(ctx context.Context, params types.AnalyzeNodeParams
}
defer cursor.Close(ctxM)
var inQueue uint32 = 0
var blackList []int
blackList := make([]int, 0)
for cursor.Next(ctxM) {
var taskReq types.TaskRequestRecord
if err := cursor.Decode(&taskReq); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions apps/go/manager/activities/trigger_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ func (aCtx *Ctx) TriggerSampler(ctx context.Context, params types.TriggerSampler
result.Success = false

samplerParams := types.SamplerWorkflowParams{
Evaluation: params.Trigger.Framework,
Task: params.Trigger.Task,
Framework: params.Trigger.Framework,
Task: params.Trigger.Task,
RequesterArgs: types.RequesterArgs{
Address: params.Trigger.Address,
Service: params.Trigger.Service,
Expand Down
1 change: 1 addition & 0 deletions apps/go/manager/types/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type AnalyzeNodeParams struct {

type AnalyzeNodeResults struct {
Success bool `json:"success"`
IsNew bool `json:"is_new"`
Triggers []TaskTrigger `json:"task_trigger"`
}

Expand Down
8 changes: 4 additions & 4 deletions apps/go/manager/types/node_task_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
// ------------------------------------------------------------------------------

type RequesterArgs struct {
Address string `bson:"address"`
Service string `bson:"service"`
Method string `bson:"method"`
Path string `bson:"path"`
Address string `json:"address"`
Service string `json:"service"`
Method string `json:"method"`
Path string `json:"path"`
}

// A pending request already processed by the Sampler
Expand Down
2 changes: 1 addition & 1 deletion apps/go/manager/types/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type NodeAnalysisChanResponse struct {
}

type SamplerWorkflowParams struct {
Evaluation string `json:"evaluation"`
Framework string `json:"framework"`
Task string `json:"tasks"`
RequesterArgs RequesterArgs `json:"requester_args"`
Blacklist []int `json:"blacklist"`
Expand Down
5 changes: 5 additions & 0 deletions apps/go/manager/workflows/node_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ func (wCtx *Ctx) NodeManager(ctx workflow.Context, params types.NodeManagerParam
response := <-nodeAnalysisResultsChan
// Append to triggers
allTriggers = append(allTriggers, response.Response.Triggers...)
// Keep count
// Update workflow result
if response.Response.IsNew {
result.NewNodes += 1
}
}

// -------------------------------------------------------------------------
Expand Down

0 comments on commit 598d2be

Please sign in to comment.