diff --git a/apps/go/manager/records/task.go b/apps/go/manager/records/task.go index 2491698..a7c2d4a 100644 --- a/apps/go/manager/records/task.go +++ b/apps/go/manager/records/task.go @@ -71,7 +71,7 @@ const TaskTTLDays uint32 = 32 type TaskInterface interface { ProcessData(l *zerolog.Logger) error - StepIndex(step int, marker string, l *zerolog.Logger) error + StepIndex(step uint32, marker string, positive_step bool, l *zerolog.Logger) error CycleIndexes(l *zerolog.Logger) error InsertSample(timeSample time.Time, data interface{}, l *zerolog.Logger) (err error) GetNumSamples() uint32 @@ -106,7 +106,7 @@ func GetTaskData(nodeID primitive.ObjectID, taskType string, framework string, t } if !found { // Initialize and save - record.NewTask(nodeID, framework, task, time.Now().UTC(), l) + record.NewTask(nodeID, framework, task, types.EpochStart.UTC(), l) record.UpdateTask(nodeID, framework, task, mongoDB, l) } return &record, true @@ -120,7 +120,7 @@ func GetTaskData(nodeID primitive.ObjectID, taskType string, framework string, t } if !found { // Initialize and save - record.NewTask(nodeID, framework, task, time.Now().UTC(), l) + record.NewTask(nodeID, framework, task, types.EpochStart.UTC(), l) record.UpdateTask(nodeID, framework, task, mongoDB, l) } return &record, true @@ -545,8 +545,8 @@ func (record *NumericalTaskRecord) ProcessData(l *zerolog.Logger) (err error) { } // Gets the sample index given a step direction (positive: 1 or negative: -1) and for a given marker (start or end of buffer) -func (record *NumericalTaskRecord) StepIndex(step int, marker string, l *zerolog.Logger) error { - return record.CircBuffer.StepIndex(step, marker, l) +func (record *NumericalTaskRecord) StepIndex(step uint32, marker string, positive_step bool, l *zerolog.Logger) error { + return record.CircBuffer.StepIndex(step, marker, positive_step, l) } // Updates the indexes making them point to the initial and final samples in a given time window. @@ -561,7 +561,7 @@ func (record *NumericalTaskRecord) InsertSample(timeSample time.Time, data inter } // Increment the end - err = record.StepIndex(1, "end", l) + err = record.StepIndex(1, "end", true, l) // Save sample record.ScoresSamples[record.CircBuffer.Indexes.End].Score = dataOk.Score record.ScoresSamples[record.CircBuffer.Indexes.End].ID = dataOk.ID @@ -733,8 +733,8 @@ func (record *SignatureTaskRecord) UpdateLastHeight(height int64) (err error) { } // Gets the sample index given a step direction (positive: 1 or negative: -1) and for a given marker (start or end of buffer) -func (record *SignatureTaskRecord) StepIndex(step int, marker string, l *zerolog.Logger) error { - return record.CircBuffer.StepIndex(step, marker, l) +func (record *SignatureTaskRecord) StepIndex(step uint32, marker string, positive_step bool, l *zerolog.Logger) error { + return record.CircBuffer.StepIndex(step, marker, positive_step, l) } // Updates the indexes making them point to the initial and final samples in a given time window. @@ -758,7 +758,7 @@ func (record *SignatureTaskRecord) InsertSample(timeSample time.Time, data inter l.Debug().Str("signature", dataOk.Signature).Int("ID", dataOk.ID).Msg("Inserting sample.") // Increment the end - err = record.StepIndex(1, "end", l) + err = record.StepIndex(1, "end", true, l) // Save sample record.Signatures[record.CircBuffer.Indexes.End].Signature = dataOk.Signature record.Signatures[record.CircBuffer.Indexes.End].ID = dataOk.ID diff --git a/apps/go/manager/types/circular_buffer.go b/apps/go/manager/types/circular_buffer.go index 03f97e9..8298c35 100644 --- a/apps/go/manager/types/circular_buffer.go +++ b/apps/go/manager/types/circular_buffer.go @@ -2,11 +2,15 @@ package types import ( "errors" + "math" "time" "github.com/rs/zerolog" ) +// A date used to mark a position in the buffer that was never used +var EpochStart = time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC) + // Keep track of circular buffer start and end indexes type CircularIndexes struct { Start uint32 `bson:"cir_start"` @@ -21,33 +25,33 @@ type CircularBuffer struct { } // Gets the sample index given a step direction (positive: 1 or negative: -1) and for a given marker (start or end of buffer) -func (buffer *CircularBuffer) StepIndex(step int, marker string, l *zerolog.Logger) error { +func (buffer *CircularBuffer) StepIndex(step uint32, marker string, positive_step bool, l *zerolog.Logger) error { l.Debug().Int("buffer.Indexes.End", int(buffer.Indexes.Start)).Int("buffer.Indexes.End", int(buffer.Indexes.End)).Msg("Circular indexes.") // Get values var currValue uint32 - var limitValue uint32 if marker == "start" { currValue = buffer.Indexes.Start - limitValue = buffer.Indexes.End } else if marker == "end" { currValue = buffer.Indexes.End - limitValue = buffer.Indexes.Start } else { return errors.New("buffer: invalid marker designation") } // perform the step - nextVal := int(currValue) + step - l.Debug().Int("nextVal", nextVal).Msg("Circular next value.") + var nextVal uint32 = 0 + if positive_step { + nextVal = currValue + step + } else { + nextVal = currValue - step + } // Check limits and assign value - currValue, err := buffer.BufferLimitCheck(nextVal, limitValue, l) + currValue, err := buffer.BufferLimitCheck(nextVal, l) if err != nil { return err } - l.Debug().Int("currValue", nextVal).Msg("Circular curr value.") // Update values if marker == "start" { @@ -56,11 +60,10 @@ func (buffer *CircularBuffer) StepIndex(step int, marker string, l *zerolog.Logg if (buffer.Indexes.Start == currValue) && (step > 0) { // This means that the end of the buffer advanced into the start of // the buffer, we must movethe buffer one position - buffer.StepIndex(1, "start", l) + buffer.StepIndex(1, "start", true, l) } buffer.Indexes.End = currValue } - l.Debug().Int("buffer.Indexes.End", int(buffer.Indexes.Start)).Int("buffer.Indexes.End", int(buffer.Indexes.End)).Msg("Circular indexes.") // Calculate number of valid samples validIdx, err := buffer.GetBufferValidIndexes(l) @@ -81,7 +84,7 @@ func (buffer *CircularBuffer) CycleIndexes(sampleTTLDays uint32, l *zerolog.Logg for oldestAge >= maxAge { // Increment the start - err := buffer.StepIndex(1, "start", l) + err := buffer.StepIndex(1, "start", true, l) if err != nil { return err } @@ -97,13 +100,13 @@ func (buffer *CircularBuffer) CycleIndexes(sampleTTLDays uint32, l *zerolog.Logg return nil } -func (buffer *CircularBuffer) BufferLimitCheck(nextVal int, limitValue uint32, l *zerolog.Logger) (uint32, error) { +func (buffer *CircularBuffer) BufferLimitCheck(nextVal uint32, l *zerolog.Logger) (uint32, error) { // Check for overflow - if nextVal >= int(buffer.CircBufferLen) { + if nextVal >= buffer.CircBufferLen { nextVal = 0 - } else if nextVal < 0 { + } else if nextVal == math.MaxInt32 { // Check for underflow - nextVal = int(buffer.CircBufferLen - 1) + nextVal = buffer.CircBufferLen - 1 } return uint32(nextVal), nil @@ -113,16 +116,19 @@ func (buffer *CircularBuffer) GetBufferValidIndexes(l *zerolog.Logger) (auxIdx [ idxNow := buffer.Indexes.Start for true { - // Add sample to data array - auxIdx = append(auxIdx, idxNow) + // If the sample never written, we should ignore it + if buffer.Times[idxNow] != EpochStart { + // Add sample to data array + auxIdx = append(auxIdx, idxNow) + } // run until we complete the circular buffer if idxNow == buffer.Indexes.End { break } // perform the step - nextVal := int(idxNow) + 1 + nextVal := idxNow + 1 // Check limits and assign value - idxNow, err = buffer.BufferLimitCheck(nextVal, buffer.Indexes.End, l) + idxNow, err = buffer.BufferLimitCheck(nextVal, l) if err != nil { return auxIdx, err } diff --git a/docker-compose/morse-poc/apps_configs/manager.json b/docker-compose/morse-poc/apps_configs/manager.json index d84f41c..02e4366 100644 --- a/docker-compose/morse-poc/apps_configs/manager.json +++ b/docker-compose/morse-poc/apps_configs/manager.json @@ -9,7 +9,7 @@ "max_backoff": 60, "req_per_sec": 10 }, - "log_level": "debug", + "log_level": "info", "temporal": { "host": "temporal", "port": 7233, diff --git a/docker-compose/morse-poc/apps_configs/requester.json b/docker-compose/morse-poc/apps_configs/requester.json index 254a805..cfdba18 100644 --- a/docker-compose/morse-poc/apps_configs/requester.json +++ b/docker-compose/morse-poc/apps_configs/requester.json @@ -13,7 +13,7 @@ "req_per_sec": 10, "session_tolerance": 1 }, - "log_level": "debug", + "log_level": "info", "temporal": { "host": "temporal", "port": 7233, diff --git a/docker-compose/morse-poc/dependencies_configs/sidecar/sidecar.json b/docker-compose/morse-poc/dependencies_configs/sidecar/sidecar.json index 7b9f4b2..4f7e674 100644 --- a/docker-compose/morse-poc/dependencies_configs/sidecar/sidecar.json +++ b/docker-compose/morse-poc/dependencies_configs/sidecar/sidecar.json @@ -1,4 +1,4 @@ { - "log_level": "DEBUG", + "log_level": "INFO", "tokenizer_path": "/tokenizer" } \ No newline at end of file diff --git a/packages/python/lmeh/utils/generator.py b/packages/python/lmeh/utils/generator.py index 7c8dd69..dc82833 100644 --- a/packages/python/lmeh/utils/generator.py +++ b/packages/python/lmeh/utils/generator.py @@ -23,7 +23,7 @@ from packages.python.lmeh.utils.mongodb import MongoOperator from packages.python.lmeh.pocket_lm_eval.tasks import PocketNetworkTaskManager from packages.python.protocol.protocol import PocketNetworkTaskRequest, PocketNetworkMongoDBTask, \ - PocketNetworkMongoDBPrompt, NumericSample, PocketNetworkMongoDBResultNumerical + PocketNetworkMongoDBPrompt, NumericSample, PocketNetworkMongoDBResultNumerical, PocketNetworkMongoDBResultBase from motor.motor_asyncio import AsyncIOMotorClient from packages.python.common.mongodb import MongoClient from bson import ObjectId @@ -404,12 +404,15 @@ async def save_results( if len(task.instances) == 0: insert_mongo_results = [] eval_logger.debug("No instances/doc_id generated for task.", task_id=str(task_id)) - num_result = PocketNetworkMongoDBResultNumerical( - task_id=task_id, - num_samples=0, - status=1, + base_result = PocketNetworkMongoDBResultBase( + task_id=task_id, + status=1, + num_samples=0, result_height=task.result_height, result_time=datetime.today().isoformat(), + ) + num_result = PocketNetworkMongoDBResultNumerical( + result_data=base_result, scores=[]) insert_mongo_results.append(num_result.model_dump(by_alias=True)) await save_results( @@ -491,12 +494,15 @@ async def save_results( numericSample = NumericSample(score=example[selected_metrics], id=doc_id) scores.append(numericSample) + base_result = PocketNetworkMongoDBResultBase( + task_id=task_id, + status=0, + num_samples=len(result_num_samples), + result_height=task.result_height, + result_time=datetime.today().isoformat(), + ) num_result = PocketNetworkMongoDBResultNumerical( - task_id=task_id, - num_samples=len(result_num_samples), - status=0, - result_height=task.result_height, - result_time=datetime.today().isoformat(), + result_data=base_result, scores=scores) insert_mongo_results.append(num_result.model_dump(by_alias=True)) eval_logger.debug("Mongo Result:", mongo_result=insert_mongo_results)