Skip to content

Commit

Permalink
all elementes working withouth errors (on green path)
Browse files Browse the repository at this point in the history
  • Loading branch information
RawthiL committed Jun 17, 2024
1 parent a8613bf commit f070d04
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 41 deletions.
18 changes: 9 additions & 9 deletions apps/go/manager/records/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ const TaskTTLDays uint32 = 32

type TaskInterface interface {
ProcessData(l *zerolog.Logger) error
StepIndex(step int, marker string, l *zerolog.Logger) error
StepIndex(step uint32, marker string, positive_step bool, l *zerolog.Logger) error
CycleIndexes(l *zerolog.Logger) error
InsertSample(timeSample time.Time, data interface{}, l *zerolog.Logger) (err error)
GetNumSamples() uint32
Expand Down Expand Up @@ -106,7 +106,7 @@ func GetTaskData(nodeID primitive.ObjectID, taskType string, framework string, t
}
if !found {
// Initialize and save
record.NewTask(nodeID, framework, task, time.Now().UTC(), l)
record.NewTask(nodeID, framework, task, types.EpochStart.UTC(), l)
record.UpdateTask(nodeID, framework, task, mongoDB, l)
}
return &record, true
Expand All @@ -120,7 +120,7 @@ func GetTaskData(nodeID primitive.ObjectID, taskType string, framework string, t
}
if !found {
// Initialize and save
record.NewTask(nodeID, framework, task, time.Now().UTC(), l)
record.NewTask(nodeID, framework, task, types.EpochStart.UTC(), l)
record.UpdateTask(nodeID, framework, task, mongoDB, l)
}
return &record, true
Expand Down Expand Up @@ -545,8 +545,8 @@ func (record *NumericalTaskRecord) ProcessData(l *zerolog.Logger) (err error) {
}

// Gets the sample index given a step direction (positive: 1 or negative: -1) and for a given marker (start or end of buffer)
func (record *NumericalTaskRecord) StepIndex(step int, marker string, l *zerolog.Logger) error {
return record.CircBuffer.StepIndex(step, marker, l)
func (record *NumericalTaskRecord) StepIndex(step uint32, marker string, positive_step bool, l *zerolog.Logger) error {
return record.CircBuffer.StepIndex(step, marker, positive_step, l)
}

// Updates the indexes making them point to the initial and final samples in a given time window.
Expand All @@ -561,7 +561,7 @@ func (record *NumericalTaskRecord) InsertSample(timeSample time.Time, data inter
}

// Increment the end
err = record.StepIndex(1, "end", l)
err = record.StepIndex(1, "end", true, l)
// Save sample
record.ScoresSamples[record.CircBuffer.Indexes.End].Score = dataOk.Score
record.ScoresSamples[record.CircBuffer.Indexes.End].ID = dataOk.ID
Expand Down Expand Up @@ -733,8 +733,8 @@ func (record *SignatureTaskRecord) UpdateLastHeight(height int64) (err error) {
}

// Gets the sample index given a step direction (positive: 1 or negative: -1) and for a given marker (start or end of buffer)
func (record *SignatureTaskRecord) StepIndex(step int, marker string, l *zerolog.Logger) error {
return record.CircBuffer.StepIndex(step, marker, l)
func (record *SignatureTaskRecord) StepIndex(step uint32, marker string, positive_step bool, l *zerolog.Logger) error {
return record.CircBuffer.StepIndex(step, marker, positive_step, l)
}

// Updates the indexes making them point to the initial and final samples in a given time window.
Expand All @@ -758,7 +758,7 @@ func (record *SignatureTaskRecord) InsertSample(timeSample time.Time, data inter
l.Debug().Str("signature", dataOk.Signature).Int("ID", dataOk.ID).Msg("Inserting sample.")

// Increment the end
err = record.StepIndex(1, "end", l)
err = record.StepIndex(1, "end", true, l)
// Save sample
record.Signatures[record.CircBuffer.Indexes.End].Signature = dataOk.Signature
record.Signatures[record.CircBuffer.Indexes.End].ID = dataOk.ID
Expand Down
44 changes: 25 additions & 19 deletions apps/go/manager/types/circular_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package types

import (
"errors"
"math"
"time"

"github.com/rs/zerolog"
)

// A date used to mark a position in the buffer that was never used
var EpochStart = time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)

// Keep track of circular buffer start and end indexes
type CircularIndexes struct {
Start uint32 `bson:"cir_start"`
Expand All @@ -21,33 +25,33 @@ type CircularBuffer struct {
}

// Gets the sample index given a step direction (positive: 1 or negative: -1) and for a given marker (start or end of buffer)
func (buffer *CircularBuffer) StepIndex(step int, marker string, l *zerolog.Logger) error {
func (buffer *CircularBuffer) StepIndex(step uint32, marker string, positive_step bool, l *zerolog.Logger) error {

l.Debug().Int("buffer.Indexes.End", int(buffer.Indexes.Start)).Int("buffer.Indexes.End", int(buffer.Indexes.End)).Msg("Circular indexes.")

// Get values
var currValue uint32
var limitValue uint32
if marker == "start" {
currValue = buffer.Indexes.Start
limitValue = buffer.Indexes.End
} else if marker == "end" {
currValue = buffer.Indexes.End
limitValue = buffer.Indexes.Start
} else {
return errors.New("buffer: invalid marker designation")
}

// perform the step
nextVal := int(currValue) + step
l.Debug().Int("nextVal", nextVal).Msg("Circular next value.")
var nextVal uint32 = 0
if positive_step {
nextVal = currValue + step
} else {
nextVal = currValue - step
}

// Check limits and assign value
currValue, err := buffer.BufferLimitCheck(nextVal, limitValue, l)
currValue, err := buffer.BufferLimitCheck(nextVal, l)
if err != nil {
return err
}
l.Debug().Int("currValue", nextVal).Msg("Circular curr value.")

// Update values
if marker == "start" {
Expand All @@ -56,11 +60,10 @@ func (buffer *CircularBuffer) StepIndex(step int, marker string, l *zerolog.Logg
if (buffer.Indexes.Start == currValue) && (step > 0) {
// This means that the end of the buffer advanced into the start of
// the buffer, we must movethe buffer one position
buffer.StepIndex(1, "start", l)
buffer.StepIndex(1, "start", true, l)
}
buffer.Indexes.End = currValue
}
l.Debug().Int("buffer.Indexes.End", int(buffer.Indexes.Start)).Int("buffer.Indexes.End", int(buffer.Indexes.End)).Msg("Circular indexes.")

// Calculate number of valid samples
validIdx, err := buffer.GetBufferValidIndexes(l)
Expand All @@ -81,7 +84,7 @@ func (buffer *CircularBuffer) CycleIndexes(sampleTTLDays uint32, l *zerolog.Logg

for oldestAge >= maxAge {
// Increment the start
err := buffer.StepIndex(1, "start", l)
err := buffer.StepIndex(1, "start", true, l)
if err != nil {
return err
}
Expand All @@ -97,13 +100,13 @@ func (buffer *CircularBuffer) CycleIndexes(sampleTTLDays uint32, l *zerolog.Logg
return nil
}

func (buffer *CircularBuffer) BufferLimitCheck(nextVal int, limitValue uint32, l *zerolog.Logger) (uint32, error) {
func (buffer *CircularBuffer) BufferLimitCheck(nextVal uint32, l *zerolog.Logger) (uint32, error) {
// Check for overflow
if nextVal >= int(buffer.CircBufferLen) {
if nextVal >= buffer.CircBufferLen {
nextVal = 0
} else if nextVal < 0 {
} else if nextVal == math.MaxInt32 {
// Check for underflow
nextVal = int(buffer.CircBufferLen - 1)
nextVal = buffer.CircBufferLen - 1
}

return uint32(nextVal), nil
Expand All @@ -113,16 +116,19 @@ func (buffer *CircularBuffer) GetBufferValidIndexes(l *zerolog.Logger) (auxIdx [

idxNow := buffer.Indexes.Start
for true {
// Add sample to data array
auxIdx = append(auxIdx, idxNow)
// If the sample never written, we should ignore it
if buffer.Times[idxNow] != EpochStart {
// Add sample to data array
auxIdx = append(auxIdx, idxNow)
}
// run until we complete the circular buffer
if idxNow == buffer.Indexes.End {
break
}
// perform the step
nextVal := int(idxNow) + 1
nextVal := idxNow + 1
// Check limits and assign value
idxNow, err = buffer.BufferLimitCheck(nextVal, buffer.Indexes.End, l)
idxNow, err = buffer.BufferLimitCheck(nextVal, l)
if err != nil {
return auxIdx, err
}
Expand Down
2 changes: 1 addition & 1 deletion docker-compose/morse-poc/apps_configs/manager.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"max_backoff": 60,
"req_per_sec": 10
},
"log_level": "debug",
"log_level": "info",
"temporal": {
"host": "temporal",
"port": 7233,
Expand Down
2 changes: 1 addition & 1 deletion docker-compose/morse-poc/apps_configs/requester.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"req_per_sec": 10,
"session_tolerance": 1
},
"log_level": "debug",
"log_level": "info",
"temporal": {
"host": "temporal",
"port": 7233,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"log_level": "DEBUG",
"log_level": "INFO",
"tokenizer_path": "/tokenizer"
}
26 changes: 16 additions & 10 deletions packages/python/lmeh/utils/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from packages.python.lmeh.utils.mongodb import MongoOperator
from packages.python.lmeh.pocket_lm_eval.tasks import PocketNetworkTaskManager
from packages.python.protocol.protocol import PocketNetworkTaskRequest, PocketNetworkMongoDBTask, \
PocketNetworkMongoDBPrompt, NumericSample, PocketNetworkMongoDBResultNumerical
PocketNetworkMongoDBPrompt, NumericSample, PocketNetworkMongoDBResultNumerical, PocketNetworkMongoDBResultBase
from motor.motor_asyncio import AsyncIOMotorClient
from packages.python.common.mongodb import MongoClient
from bson import ObjectId
Expand Down Expand Up @@ -404,12 +404,15 @@ async def save_results(
if len(task.instances) == 0:
insert_mongo_results = []
eval_logger.debug("No instances/doc_id generated for task.", task_id=str(task_id))
num_result = PocketNetworkMongoDBResultNumerical(
task_id=task_id,
num_samples=0,
status=1,
base_result = PocketNetworkMongoDBResultBase(
task_id=task_id,
status=1,
num_samples=0,
result_height=task.result_height,
result_time=datetime.today().isoformat(),
)
num_result = PocketNetworkMongoDBResultNumerical(
result_data=base_result,
scores=[])
insert_mongo_results.append(num_result.model_dump(by_alias=True))
await save_results(
Expand Down Expand Up @@ -491,12 +494,15 @@ async def save_results(
numericSample = NumericSample(score=example[selected_metrics], id=doc_id)
scores.append(numericSample)

base_result = PocketNetworkMongoDBResultBase(
task_id=task_id,
status=0,
num_samples=len(result_num_samples),
result_height=task.result_height,
result_time=datetime.today().isoformat(),
)
num_result = PocketNetworkMongoDBResultNumerical(
task_id=task_id,
num_samples=len(result_num_samples),
status=0,
result_height=task.result_height,
result_time=datetime.today().isoformat(),
result_data=base_result,
scores=scores)
insert_mongo_results.append(num_result.model_dump(by_alias=True))
eval_logger.debug("Mongo Result:", mongo_result=insert_mongo_results)
Expand Down

0 comments on commit f070d04

Please sign in to comment.