From 09cc158fb366f9bd8328a4d24b02542239e18dec Mon Sep 17 00:00:00 2001 From: Ramiro Rodriguez Colmeiro Date: Wed, 11 Sep 2024 17:54:21 -0300 Subject: [PATCH 1/4] numerical samples now report completion times to results - NOT FUNCTIONAL --- apps/python/sampler/README.md | 6 +++--- .../pocket_lm_eval/models/pocket_network.py | 5 +++++ packages/python/lmeh/utils/generator.py | 18 +++++++++++++----- packages/python/lmeh/utils/mongodb.py | 3 +++ packages/python/protocol/protocol.py | 2 ++ 5 files changed, 26 insertions(+), 8 deletions(-) diff --git a/apps/python/sampler/README.md b/apps/python/sampler/README.md index f2003df..507325e 100644 --- a/apps/python/sampler/README.md +++ b/apps/python/sampler/README.md @@ -28,9 +28,9 @@ Files that follow the structure of `lm-eval-harness`. The intention, for instanc * `PocketNetworkTaskManager`: A class based on `TaskManager`, that is used to inject `pocket_args` into the `task.config.metadata`. **api** -* `PocketNetworkConfigurableTask`: A class based on `ConfigurableTask`, that retrieve samples from the sql database, based on `blacklist` id's & `uri` previously defined in `pocket_args`. In `PocketNetworkConfigurableTask.download` validations reagrding `training_split`, `validation_split`, `test_split` and `fewshot_split` are followed as pointed in the `lm-eval-harness- documentation. +* `PocketNetworkConfigurableTask`: A class based on `ConfigurableTask`, that retrieve samples from the sql database, based on `blacklist` id's & `uri` previously defined in `pocket_args`. In `PocketNetworkConfigurableTask.download` validations regarding `training_split`, `validation_split`, `test_split` and `fewshot_split` are followed as pointed in the `lm-eval-harness- documentation. - * `def build_all_requests` was modified in order to inyecet the postgres document id into the `Instance.doc_id`. + * `def build_all_requests` was modified in order to inject the Postgres document id into the `Instance.doc_id`. 
**models** * `PocketNetworkLM`: A class that mimic partially `OpenaiChatCompletionsLM` from `lm-eval-harness`. Instead on generate request and take respobnses, both `loglikelihood` and `generate_until` methods instantiate `CompletionRequest`. The last is a class used as a proxy to generate the `data` field of a RPC request that is saved in Mongo. @@ -39,7 +39,7 @@ Files that follow the structure of `lm-eval-harness`. The intention, for instanc * `get_configurable_task`: A function to return only `ConfigurableTask` based on the `task_manager`. * If `task_manager` is `TaskManager`, then all samples from all splits are part of the dataset. * If `task_manager` is `PocketNetworkTaskManager`, random samples are generated based on the configuration split and the blacklist provided in `pocket_args`. -* `genererate_requests`: A functions that hierarchically save in MongoDB the strucutre of `Task`->`Instances`->`Prompts`. +* `genererate_requests`: A functions that hierarchically save in MongoDB the structure of `Task`->`Instances`->`Prompts`. 
### Accessing the DB with PG Admin diff --git a/packages/python/lmeh/pocket_lm_eval/models/pocket_network.py b/packages/python/lmeh/pocket_lm_eval/models/pocket_network.py index b18b40a..de2aba8 100644 --- a/packages/python/lmeh/pocket_lm_eval/models/pocket_network.py +++ b/packages/python/lmeh/pocket_lm_eval/models/pocket_network.py @@ -663,6 +663,11 @@ def loglikelihood( ) return self._loglikelihood_tokens(new_reqs, disable_tqdm=disable_tqdm) + + def response_times( + self, requests, disable_tqdm: bool = True + ) -> List[int]: + return [req.resp.response_time for req in requests] def _encode_pair(self, context_enc, continuation_enc): return context_enc, continuation_enc diff --git a/packages/python/lmeh/utils/generator.py b/packages/python/lmeh/utils/generator.py index 1fd63bf..d63bb4b 100644 --- a/packages/python/lmeh/utils/generator.py +++ b/packages/python/lmeh/utils/generator.py @@ -18,6 +18,8 @@ simple_parse_args_string, ) +import numpy as np + if TYPE_CHECKING: from lm_eval.api.model import LM from lm_eval.tasks import Task @@ -491,7 +493,7 @@ async def save_results( result_time=datetime.today().isoformat(), ) num_result = PocketNetworkMongoDBResultNumerical( - result_data=base_result, scores=[] + result_data=base_result, scores=[], times=[] ) insert_mongo_results.append(num_result.model_dump(by_alias=True)) await save_results( @@ -510,13 +512,16 @@ async def save_results( cloned_reqs = [] for req in reqs: cloned_reqs.extend([req] * req.repeats) - + req.times = [] # run requests through model resps = getattr(lm, reqtype)(cloned_reqs) + # Get times POKT Network + times = getattr(lm, "response_times")(cloned_reqs) # put responses from model into a list of length K for each request. 
- for x, req in zip(resps, cloned_reqs): + for x, t, req in zip(resps, times, cloned_reqs): req.resps.append(x) + req.times.append(t) RANK = lm.rank WORLD_SIZE = lm.world_size @@ -539,6 +544,7 @@ async def save_results( instances.sort(key=lambda x: x.idx) # iterate over different filters used scores = [] + times = [] result_num_samples = set() for filter_key in task.instances[0].filtered_resps.keys(): if filter_key not in selected_filters: @@ -555,6 +561,7 @@ async def save_results( metrics = task.process_results( doc, [req.filtered_resps[filter_key] for req in requests] ) + response_times = [np.mean(req.times).astype(float) for req in requests] if log_samples: target = task.doc_to_target(doc) example = { @@ -579,11 +586,12 @@ async def save_results( } example.update(metrics) task_output.logged_samples.append(example) - for metric, value in metrics.items(): + for (metric, value), ms in zip(metrics.items(), response_times): task_output.sample_metrics[(metric, filter_key)].append(value) if metric in selected_metrics: numericSample = NumericSample(score=example[metric], id=doc_id) scores.append(numericSample) + times.append(ms) base_result = PocketNetworkMongoDBResultBase( task_id=task_id, @@ -593,7 +601,7 @@ async def save_results( result_time=datetime.today().isoformat(), ) num_result = PocketNetworkMongoDBResultNumerical( - result_data=base_result, scores=scores + result_data=base_result, scores=scores, times=times ) insert_mongo_results.append(num_result.model_dump(by_alias=True)) eval_logger.debug("Mongo Result:", mongo_result=insert_mongo_results) diff --git a/packages/python/lmeh/utils/mongodb.py b/packages/python/lmeh/utils/mongodb.py index b9e9e02..5cd9635 100644 --- a/packages/python/lmeh/utils/mongodb.py +++ b/packages/python/lmeh/utils/mongodb.py @@ -330,6 +330,7 @@ async def reconstruct_instances( try: # handle the exception to bring a light on production debugging if needed. 
r = json.loads(doc["response"]["response"]) + ms = int(doc["response"]["ms"]) except Exception as e: remove_doc_ids.add(i["doc_id"]) eval_logger.error( @@ -360,6 +361,7 @@ async def reconstruct_instances( instance.prompt.data = CompletionRequest(**request_data) try: + r['response_time'] = ms instance.resp = CompletionResponse(**r) except Exception as e: remove_doc_ids.add(i["doc_id"]) @@ -397,6 +399,7 @@ async def mark_task_to_drop(self, task_id: ObjectId): result_time=datetime.today().isoformat(), ), scores=[], + times=[] ).model_dump(by_alias=True) async with self.client.start_transaction() as session: diff --git a/packages/python/protocol/protocol.py b/packages/python/protocol/protocol.py index 6880678..9677fbc 100644 --- a/packages/python/protocol/protocol.py +++ b/packages/python/protocol/protocol.py @@ -242,6 +242,7 @@ class CompletionResponse(OpenAIBaseModel): model: str choices: List[CompletionResponseChoice] usage: UsageInfo + response_time: int # Total time to complete request (POKT Network) ########### @@ -284,6 +285,7 @@ class PocketNetworkMongoDBResultNumerical(BaseModel): id: PyObjectId = Field(default_factory=PyObjectId, alias="_id") result_data: PocketNetworkMongoDBResultBase scores: List[NumericSample] + times: List[float] class Config: arbitrary_types_allowed = True From 8c7098b9a1f2ed6ae7dcbbaa167f336227d0c1d4 Mon Sep 17 00:00:00 2001 From: Ramiro Rodriguez Colmeiro Date: Thu, 12 Sep 2024 11:38:39 -0300 Subject: [PATCH 2/4] updated GO process --- apps/go/manager/records/result.go | 5 ++-- apps/go/manager/records/task.go | 42 ++++++++++++++++++++++++------- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/apps/go/manager/records/result.go b/apps/go/manager/records/result.go index 4e1b1d8..8ed40ce 100644 --- a/apps/go/manager/records/result.go +++ b/apps/go/manager/records/result.go @@ -55,8 +55,9 @@ type ResultInterface interface { // Record written by the evaluator. 
// The NumericalResultRecord field indicates how many samples were actually calculated type NumericalResultRecord struct { - ResultData BaseResultRecord `bson:"result_data"` - ScoresSamples []ScoresSample `bson:"scores"` + ResultData BaseResultRecord `bson:"result_data"` + ScoresSamples []ScoresSample `bson:"scores"` + ComputeTimesSamples []float32 `bson:"times"` } func (record *NumericalResultRecord) GetResultTime() time.Time { diff --git a/apps/go/manager/records/task.go b/apps/go/manager/records/task.go index 2ff2c77..b002d1e 100644 --- a/apps/go/manager/records/task.go +++ b/apps/go/manager/records/task.go @@ -359,8 +359,13 @@ type NumericalTaskRecord struct { MeanScore float32 `bson:"mean_scores"` MedianScore float32 `bson:"median_scores"` StdScore float32 `bson:"std_scores"` + // Times + MeanProcessTime float32 `bson:"mean_times"` + MedianProcessTime float32 `bson:"median_times"` + StdProcessTime float32 `bson:"std_times"` // buffer - ScoresSamples []ScoresSample `bson:"scores"` + ScoresSamples []ScoresSample `bson:"scores"` + ProcessTimeSamples []float32 `bson:"times"` // circular buffer control CircBuffer types.CircularBuffer `bson:"circ_buffer_control"` } @@ -520,32 +525,51 @@ func (record *NumericalTaskRecord) ProcessData(l *zerolog.Logger) (err error) { } // Slice the buffer and cast - var auxData []float64 + var auxDataScores []float64 + var auxDataTimes []float64 for _, sampleId := range validIdx { // Add sample to data array - auxData = append(auxData, float64(record.ScoresSamples[sampleId].Score)) + auxDataScores = append(auxDataScores, float64(record.ScoresSamples[sampleId].Score)) + auxDataTimes = append(auxDataTimes, float64(record.ProcessTimeSamples[sampleId])) } - length := len(auxData) + length := len(auxDataScores) if length == 0 { record.MeanScore = 0 record.StdScore = 0 record.MedianScore = 0 + record.MeanProcessTime = 0 + record.StdProcessTime = 0 + record.MedianProcessTime = 0 + } else if length == 1 { record.MeanScore = 
float32(record.ScoresSamples[record.CircBuffer.Indexes.Start].Score) record.StdScore = 0 record.MedianScore = float32(record.ScoresSamples[record.CircBuffer.Indexes.Start].Score) + record.MeanProcessTime = float32(record.ProcessTimeSamples[record.CircBuffer.Indexes.Start]) + record.StdProcessTime = 0 + record.MedianProcessTime = float32(record.ProcessTimeSamples[record.CircBuffer.Indexes.Start]) } else { // Calculate the mean - record.MeanScore = float32(stat.Mean(auxData, nil)) + record.MeanScore = float32(stat.Mean(auxDataScores, nil)) // Calculate the standard deviation - record.StdScore = float32(stat.StdDev(auxData, nil)) + record.StdScore = float32(stat.StdDev(auxDataScores, nil)) // Calculate the median - sort.Float64s(auxData) + sort.Float64s(auxDataScores) + if length%2 == 0 { + record.MedianScore = float32((auxDataScores[length/2-1] + auxDataScores[length/2]) / 2) + } else { + record.MedianScore = float32(auxDataScores[length/2]) + } + + // Same for times + record.MeanProcessTime = float32(stat.Mean(auxDataTimes, nil)) + record.StdProcessTime = float32(stat.StdDev(auxDataTimes, nil)) + sort.Float64s(auxDataTimes) if length%2 == 0 { - record.MedianScore = float32((auxData[length/2-1] + auxData[length/2]) / 2) + record.MedianProcessTime = float32((auxDataTimes[length/2-1] + auxDataTimes[length/2]) / 2) } else { - record.MedianScore = float32(auxData[length/2]) + record.MedianProcessTime = float32(auxDataTimes[length/2]) } } return err From 6ae3c8b41511a2f4e0cb3115f0c122e920d8783f Mon Sep 17 00:00:00 2001 From: Ramiro Rodriguez Colmeiro Date: Thu, 12 Sep 2024 16:55:29 -0300 Subject: [PATCH 3/4] time tracking working and presented in web --- .../manager/activities/process_node_result.go | 2 +- apps/go/manager/records/task.go | 20 ++++++--- apps/go/manager/types/node_task_request.go | 2 +- apps/python/api/app/basemodels.py | 4 ++ apps/python/api/app/leaderboard.py | 45 ++++++++++++++++--- packages/python/lmeh/utils/generator.py | 7 ++- 
packages/python/lmeh/utils/mongodb.py | 41 ++++++++++++----- packages/python/protocol/protocol.py | 2 +- 8 files changed, 93 insertions(+), 30 deletions(-) diff --git a/apps/go/manager/activities/process_node_result.go b/apps/go/manager/activities/process_node_result.go index 6f50682..156f0c4 100644 --- a/apps/go/manager/activities/process_node_result.go +++ b/apps/go/manager/activities/process_node_result.go @@ -192,7 +192,7 @@ func retrieveTaskData(taskID primitive.ObjectID, cursor := tasksCollection.FindOne(ctxM, task_request_filter, opts) var taskReq types.TaskRequestRecord if err := cursor.Decode(&taskReq); err != nil { - l.Error().Msg("Could not decode task request data from MongoDB.") + l.Error().Str("taskID", taskID.String()).Msg("Could not decode task request data from MongoDB.") return taskReq, err } diff --git a/apps/go/manager/records/task.go b/apps/go/manager/records/task.go index b002d1e..bf2bdc2 100644 --- a/apps/go/manager/records/task.go +++ b/apps/go/manager/records/task.go @@ -364,15 +364,15 @@ type NumericalTaskRecord struct { MedianProcessTime float32 `bson:"median_times"` StdProcessTime float32 `bson:"std_times"` // buffer - ScoresSamples []ScoresSample `bson:"scores"` - ProcessTimeSamples []float32 `bson:"times"` + ScoresSamples []ScoresSample `bson:"scores"` // circular buffer control CircBuffer types.CircularBuffer `bson:"circ_buffer_control"` } type ScoresSample struct { - Score float64 `bson:"score"` - ID int `bson:"id"` + Score float64 `bson:"score"` + ID int `bson:"id"` + RunTime float32 `bson:"run_time"` } func (record *NumericalTaskRecord) NewTask(nodeID primitive.ObjectID, framework string, task string, date time.Time, l *zerolog.Logger) { @@ -389,8 +389,13 @@ func (record *NumericalTaskRecord) NewTask(nodeID primitive.ObjectID, framework record.TaskData.LastSeen = date record.MeanScore = 0.0 + record.MedianScore = 0.0 record.StdScore = 0.0 + record.MeanProcessTime = 0.0 + record.MedianProcessTime = 0.0 + record.StdProcessTime = 0.0 
record.ScoresSamples = make([]ScoresSample, bufferLen) + record.CircBuffer = types.CircularBuffer{ CircBufferLen: bufferLen, NumSamples: 0, @@ -530,7 +535,7 @@ func (record *NumericalTaskRecord) ProcessData(l *zerolog.Logger) (err error) { for _, sampleId := range validIdx { // Add sample to data array auxDataScores = append(auxDataScores, float64(record.ScoresSamples[sampleId].Score)) - auxDataTimes = append(auxDataTimes, float64(record.ProcessTimeSamples[sampleId])) + auxDataTimes = append(auxDataTimes, float64(record.ScoresSamples[sampleId].RunTime)) } length := len(auxDataScores) @@ -546,9 +551,9 @@ func (record *NumericalTaskRecord) ProcessData(l *zerolog.Logger) (err error) { record.MeanScore = float32(record.ScoresSamples[record.CircBuffer.Indexes.Start].Score) record.StdScore = 0 record.MedianScore = float32(record.ScoresSamples[record.CircBuffer.Indexes.Start].Score) - record.MeanProcessTime = float32(record.ProcessTimeSamples[record.CircBuffer.Indexes.Start]) + record.MeanProcessTime = float32(record.ScoresSamples[record.CircBuffer.Indexes.Start].RunTime) record.StdProcessTime = 0 - record.MedianProcessTime = float32(record.ProcessTimeSamples[record.CircBuffer.Indexes.Start]) + record.MedianProcessTime = float32(record.ScoresSamples[record.CircBuffer.Indexes.Start].RunTime) } else { // Calculate the mean record.MeanScore = float32(stat.Mean(auxDataScores, nil)) @@ -596,6 +601,7 @@ func (record *NumericalTaskRecord) InsertSample(timeSample time.Time, data inter // Save sample record.ScoresSamples[record.CircBuffer.Indexes.End].Score = dataOk.Score record.ScoresSamples[record.CircBuffer.Indexes.End].ID = dataOk.ID + record.ScoresSamples[record.CircBuffer.Indexes.End].RunTime = dataOk.RunTime record.CircBuffer.Times[record.CircBuffer.Indexes.End] = timeSample return nil diff --git a/apps/go/manager/types/node_task_request.go b/apps/go/manager/types/node_task_request.go index 840eea5..a2df52e 100644 --- a/apps/go/manager/types/node_task_request.go +++ 
b/apps/go/manager/types/node_task_request.go @@ -24,7 +24,7 @@ type TaskRequestRecord struct { Blacklist []int `bson:"blacklist"` Qty int `bson:"qty"` TotalInstances int `bson:"total_instances"` - RequestType string `bson:"string"` + RequestType string `bson:"request_type"` Done bool `bson:"done"` } diff --git a/apps/python/api/app/basemodels.py b/apps/python/api/app/basemodels.py index f522950..a2b2793 100644 --- a/apps/python/api/app/basemodels.py +++ b/apps/python/api/app/basemodels.py @@ -31,8 +31,12 @@ "last_seen": "$task_data.last_seen", "last_height": "$task_data.last_height", "mean": "$mean_scores", + "median": "$median_scores", "std": "$std_scores", "num": "$circ_buffer_control.num_samples", + "mean_times": "$mean_times", + "median_times": "$median_times", + "std_times": "$std_times", } }, ] diff --git a/apps/python/api/app/leaderboard.py b/apps/python/api/app/leaderboard.py index 3588dfc..6d29960 100644 --- a/apps/python/api/app/leaderboard.py +++ b/apps/python/api/app/leaderboard.py @@ -118,18 +118,14 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]: leaderboard_entry = dict() leaderboard_entry["status"] = "OK" - # Add QoS - leaderboard_entry["qos"] = { - "error_rate": (np.random.random(1)[0] * 0.1), - "response_time": (np.random.random(1)[0] * 1000), - } - # Add Metrics leaderboard_entry["metrics"] = dict() running_mean_avg = 0 weight_avg = 0 std_err_avg = 0 incomplete = False + running_mean_time_avg = 0 + std_err_time_avg = 0 for metric in LEADERBOARD_METRICS.keys(): metric_name = LEADERBOARD_METRICS[metric] @@ -137,6 +133,8 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]: running_mean_mmlu = 0 weight_mmlu = 0 std_err_mmlu = 0 + running_mean_time_mmlu = 0 + std_err_time_mmlu = 0 # This requiere more steps yay! 
all_ok = True partial = False @@ -161,15 +159,26 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]: if data_row["num"].values[0] <= 50: # This is a partial metric partial = True + metric_time_mean = data_row["mean_times"].values[0] * data_row["num"].values[0] + metric_time_std_err = data_row["std_times"].values[0] / np.sqrt( + data_row["num"].values[0] + ) else: metric_mean = 0 metric_std_err = 0 + metric_time_mean = 0 + metric_time_std_err = 0 this_w = data_row["num"].values[0] running_mean_mmlu += metric_mean + running_mean_time_mmlu += metric_time_mean weight_mmlu += this_w if this_w > 0: std_err_mmlu += (metric_std_err / this_w) ** 2 + std_err_time_mmlu += (metric_time_std_err / this_w) ** 2 + + + if all_ok: if weight_mmlu == 0: @@ -182,11 +191,15 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]: metric_mean = running_mean_mmlu metric_std_err = std_err_mmlu metric_weight = weight_mmlu / len(LEADERBOARD_MMLU_METRICS) + metric_time_mean = running_mean_time_mmlu + metric_time_std_err = std_err_time_mmlu else: # No data metric_mean = np.nan metric_std_err = np.nan metric_weight = np.nan + metric_time_mean = np.nan + metric_time_std_err = np.nan else: data_row = scores_df.loc[ @@ -200,6 +213,8 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]: metric_mean = np.nan metric_std_err = np.nan metric_weight = np.nan + metric_time_mean = np.nan + metric_time_std_err = np.nan elif data_row["num"].values[0] > 0: metric_mean = data_row["mean"].values[0] @@ -209,10 +224,17 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]: metric_weight = data_row["num"].values[0] if data_row["num"].values[0] <= 50: partial = True + + metric_time_mean = data_row["mean_times"].values[0] + metric_time_std_err = data_row["std_times"].values[0] / np.sqrt( + data_row["num"].values[0] + ) else: metric_mean = 0 metric_std_err = 0 metric_weight = 0 + metric_time_mean = 0 + metric_time_std_err = 
0 if np.isnan(metric_mean) or metric_weight == 0: leaderboard_entry["metrics"][metric_name] = { @@ -230,11 +252,16 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]: running_mean_avg += metric_mean * metric_weight weight_avg += metric_weight std_err_avg += metric_std_err**2 + # track times + running_mean_time_avg += metric_time_mean * metric_weight + std_err_time_avg += metric_time_std_err**2 if weight_avg == 0: running_mean_avg = 0 + running_mean_time_avg = 0 else: running_mean_avg = running_mean_avg / weight_avg + running_mean_time_avg = running_mean_time_avg / weight_avg if std_err_avg != 0: std_err_avg = np.sqrt(std_err_avg) @@ -244,6 +271,12 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]: "status": "OK" if not incomplete else "INCOMPLETE", } + # Add QoS + leaderboard_entry["qos"] = { + "error_rate": 0.0, + "response_time": running_mean_time_avg, + } + # Add Metadata leaderboard_entry["metadata"] = { "service": str(node_df["service"].values[0]), diff --git a/packages/python/lmeh/utils/generator.py b/packages/python/lmeh/utils/generator.py index d63bb4b..05daaef 100644 --- a/packages/python/lmeh/utils/generator.py +++ b/packages/python/lmeh/utils/generator.py @@ -493,7 +493,7 @@ async def save_results( result_time=datetime.today().isoformat(), ) num_result = PocketNetworkMongoDBResultNumerical( - result_data=base_result, scores=[], times=[] + result_data=base_result, scores=[] ) insert_mongo_results.append(num_result.model_dump(by_alias=True)) await save_results( @@ -589,9 +589,8 @@ async def save_results( for (metric, value), ms in zip(metrics.items(), response_times): task_output.sample_metrics[(metric, filter_key)].append(value) if metric in selected_metrics: - numericSample = NumericSample(score=example[metric], id=doc_id) + numericSample = NumericSample(score=example[metric], run_time=ms, id=doc_id) scores.append(numericSample) - times.append(ms) base_result = PocketNetworkMongoDBResultBase( 
task_id=task_id, @@ -601,7 +600,7 @@ async def save_results( result_time=datetime.today().isoformat(), ) num_result = PocketNetworkMongoDBResultNumerical( - result_data=base_result, scores=scores, times=times + result_data=base_result, scores=scores ) insert_mongo_results.append(num_result.model_dump(by_alias=True)) eval_logger.debug("Mongo Result:", mongo_result=insert_mongo_results) diff --git a/packages/python/lmeh/utils/mongodb.py b/packages/python/lmeh/utils/mongodb.py index 5cd9635..55338b1 100644 --- a/packages/python/lmeh/utils/mongodb.py +++ b/packages/python/lmeh/utils/mongodb.py @@ -87,8 +87,8 @@ def instance_to_dict(instance: Instance, task_id: ObjectId) -> dict: instance_mongo["_id"] = ObjectId() instance_mongo["done"] = False return instance_mongo - - async def get_tokenizer_hash(self, address: str, service: str) -> str: + + async def get_node_id(self, address: str, service: str) -> str: node = await self.client.db[self.nodes_collection].find_one( {"address": address, "service": service} ) @@ -109,28 +109,32 @@ async def get_tokenizer_hash(self, address: str, service: str) -> str: raise ApplicationError( f"Node address {address}, has no _id, cannot load tokenizer hash." ) + + return node["_id"] + + async def get_signature_hash(self, address: str, node_id: str, signature_name: str) -> str: # Get the corresponding signature buffer buffer = await self.client.db[self.buffers_signatures_collection].find_one( { - "task_data.node_id": node["_id"], + "task_data.node_id": node_id, "task_data.framework": "signatures", - "task_data.task": "tokenizer", + "task_data.task": signature_name, } ) if buffer is None: eval_logger.error( - "Buffer for tokenizer signature not found.", adress=address + f"Buffer for {signature_name} signature not found.", adress=address ) raise ApplicationError( - f"Node address {address} does not have a tokenizer signature buffer associated." + f"Node address {address} does not have a {signature_name} signature buffer associated." 
) - eval_logger.debug("Tokennizer signature buffer found.", buffer=buffer) + eval_logger.debug(f"{signature_name} signature buffer found.", buffer=buffer) - tokenizer_hash = buffer.get("last_signature", None) - if tokenizer_hash is None: + this_hash = buffer.get("last_signature", None) + if this_hash is None: eval_logger.error( "Buffer has no last signature field, entry is malformed cannot procede.", adress=address, @@ -138,8 +142,25 @@ async def get_tokenizer_hash(self, address: str, service: str) -> str: raise ApplicationError( f"Node address {address} buffer has no last signature field, entry is malformed cannot procede." ) + + return this_hash + + async def get_tokenizer_hash(self, address: str, service: str) -> str: + + # Get node ID + node_id = await self.get_node_id(address, service) + # Get tokenizer signature hash + tokenizer_hash = await self.get_signature_hash(address, node_id, "tokenizer") return tokenizer_hash + + async def get_config_hash(self, address: str, service: str) -> str: + # Get node ID + node_id = await self.get_node_id(address, service) + # Get config signature hash + config_hash = await self.get_signature_hash(address, node_id, "config") + + return config_hash async def get_tokenizer_entry(self, tokenizer_hash: str): return await self.client.db[self.tokenizers_collection].find_one( @@ -186,7 +207,7 @@ async def get_tokenizer_objects(self, address: str, service: str) -> dict: async def get_config_objects(self, address: str, service: str) -> dict: # TODO # add get_config_hash method to - config_hash = await self.get_tokenizer_hash(address, service) + config_hash = await self.get_config_hash(address, service) if config_hash == "": eval_logger.error( diff --git a/packages/python/protocol/protocol.py b/packages/python/protocol/protocol.py index 9677fbc..6d2fa43 100644 --- a/packages/python/protocol/protocol.py +++ b/packages/python/protocol/protocol.py @@ -279,13 +279,13 @@ class Config: class NumericSample(BaseModel): score: float id: int + 
run_time: float class PocketNetworkMongoDBResultNumerical(BaseModel): id: PyObjectId = Field(default_factory=PyObjectId, alias="_id") result_data: PocketNetworkMongoDBResultBase scores: List[NumericSample] - times: List[float] class Config: arbitrary_types_allowed = True From 34d87f161d4a8782c2bab909af8e65903edd5280 Mon Sep 17 00:00:00 2001 From: Ramiro Rodriguez Colmeiro Date: Thu, 12 Sep 2024 16:56:32 -0300 Subject: [PATCH 4/4] linted --- apps/python/api/app/leaderboard.py | 12 ++++++------ .../pocket_lm_eval/models/pocket_network.py | 6 ++---- packages/python/lmeh/utils/generator.py | 4 +++- packages/python/lmeh/utils/mongodb.py | 18 +++++++++--------- packages/python/protocol/protocol.py | 2 +- 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/apps/python/api/app/leaderboard.py b/apps/python/api/app/leaderboard.py index 6d29960..871b2b8 100644 --- a/apps/python/api/app/leaderboard.py +++ b/apps/python/api/app/leaderboard.py @@ -159,10 +159,13 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]: if data_row["num"].values[0] <= 50: # This is a partial metric partial = True - metric_time_mean = data_row["mean_times"].values[0] * data_row["num"].values[0] - metric_time_std_err = data_row["std_times"].values[0] / np.sqrt( - data_row["num"].values[0] + metric_time_mean = ( + data_row["mean_times"].values[0] + * data_row["num"].values[0] ) + metric_time_std_err = data_row["std_times"].values[ + 0 + ] / np.sqrt(data_row["num"].values[0]) else: metric_mean = 0 metric_std_err = 0 @@ -177,9 +180,6 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]: std_err_mmlu += (metric_std_err / this_w) ** 2 std_err_time_mmlu += (metric_time_std_err / this_w) ** 2 - - - if all_ok: if weight_mmlu == 0: running_mean_mmlu = 0 diff --git a/packages/python/lmeh/pocket_lm_eval/models/pocket_network.py b/packages/python/lmeh/pocket_lm_eval/models/pocket_network.py index de2aba8..81bcc2d 100644 --- 
a/packages/python/lmeh/pocket_lm_eval/models/pocket_network.py +++ b/packages/python/lmeh/pocket_lm_eval/models/pocket_network.py @@ -663,10 +663,8 @@ def loglikelihood( ) return self._loglikelihood_tokens(new_reqs, disable_tqdm=disable_tqdm) - - def response_times( - self, requests, disable_tqdm: bool = True - ) -> List[int]: + + def response_times(self, requests, disable_tqdm: bool = True) -> List[int]: return [req.resp.response_time for req in requests] def _encode_pair(self, context_enc, continuation_enc): diff --git a/packages/python/lmeh/utils/generator.py b/packages/python/lmeh/utils/generator.py index 05daaef..fd9d728 100644 --- a/packages/python/lmeh/utils/generator.py +++ b/packages/python/lmeh/utils/generator.py @@ -589,7 +589,9 @@ async def save_results( for (metric, value), ms in zip(metrics.items(), response_times): task_output.sample_metrics[(metric, filter_key)].append(value) if metric in selected_metrics: - numericSample = NumericSample(score=example[metric], run_time=ms, id=doc_id) + numericSample = NumericSample( + score=example[metric], run_time=ms, id=doc_id + ) scores.append(numericSample) base_result = PocketNetworkMongoDBResultBase( diff --git a/packages/python/lmeh/utils/mongodb.py b/packages/python/lmeh/utils/mongodb.py index 55338b1..2f853b3 100644 --- a/packages/python/lmeh/utils/mongodb.py +++ b/packages/python/lmeh/utils/mongodb.py @@ -87,7 +87,7 @@ def instance_to_dict(instance: Instance, task_id: ObjectId) -> dict: instance_mongo["_id"] = ObjectId() instance_mongo["done"] = False return instance_mongo - + async def get_node_id(self, address: str, service: str) -> str: node = await self.client.db[self.nodes_collection].find_one( {"address": address, "service": service} @@ -109,11 +109,12 @@ async def get_node_id(self, address: str, service: str) -> str: raise ApplicationError( f"Node address {address}, has no _id, cannot load tokenizer hash." 
) - + return node["_id"] - - async def get_signature_hash(self, address: str, node_id: str, signature_name: str) -> str: + async def get_signature_hash( + self, address: str, node_id: str, signature_name: str + ) -> str: # Get the corresponding signature buffer buffer = await self.client.db[self.buffers_signatures_collection].find_one( { @@ -142,18 +143,17 @@ async def get_signature_hash(self, address: str, node_id: str, signature_name: s raise ApplicationError( f"Node address {address} buffer has no last signature field, entry is malformed cannot procede." ) - + return this_hash async def get_tokenizer_hash(self, address: str, service: str) -> str: - # Get node ID node_id = await self.get_node_id(address, service) # Get tokenizer signature hash tokenizer_hash = await self.get_signature_hash(address, node_id, "tokenizer") return tokenizer_hash - + async def get_config_hash(self, address: str, service: str) -> str: # Get node ID node_id = await self.get_node_id(address, service) @@ -382,7 +382,7 @@ async def reconstruct_instances( instance.prompt.data = CompletionRequest(**request_data) try: - r['response_time'] = ms + r["response_time"] = ms instance.resp = CompletionResponse(**r) except Exception as e: remove_doc_ids.add(i["doc_id"]) @@ -420,7 +420,7 @@ async def mark_task_to_drop(self, task_id: ObjectId): result_time=datetime.today().isoformat(), ), scores=[], - times=[] + times=[], ).model_dump(by_alias=True) async with self.client.start_transaction() as session: diff --git a/packages/python/protocol/protocol.py b/packages/python/protocol/protocol.py index 6d2fa43..4f08f29 100644 --- a/packages/python/protocol/protocol.py +++ b/packages/python/protocol/protocol.py @@ -242,7 +242,7 @@ class CompletionResponse(OpenAIBaseModel): model: str choices: List[CompletionResponseChoice] usage: UsageInfo - response_time: int # Total time to complete request (POKT Network) + response_time: int # Total time to complete request (POKT Network) ###########