71 manager evaluator add tracking of requests response times #100

Merged
2 changes: 1 addition & 1 deletion apps/go/manager/activities/process_node_result.go
@@ -192,7 +192,7 @@ func retrieveTaskData(taskID primitive.ObjectID,
cursor := tasksCollection.FindOne(ctxM, task_request_filter, opts)
var taskReq types.TaskRequestRecord
if err := cursor.Decode(&taskReq); err != nil {
l.Error().Msg("Could not decode task request data from MongoDB.")
l.Error().Str("taskID", taskID.String()).Msg("Could not decode task request data from MongoDB.")
return taskReq, err
}

5 changes: 3 additions & 2 deletions apps/go/manager/records/result.go
@@ -55,8 +55,9 @@ type ResultInterface interface {
// Record written by the evaluator.
// The NumericalResultRecord field indicates how many samples were actually calculated
type NumericalResultRecord struct {
ResultData BaseResultRecord `bson:"result_data"`
ScoresSamples []ScoresSample `bson:"scores"`
ResultData BaseResultRecord `bson:"result_data"`
ScoresSamples []ScoresSample `bson:"scores"`
ComputeTimesSamples []float32 `bson:"times"`
}

func (record *NumericalResultRecord) GetResultTime() time.Time {
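For reference, a hypothetical shape of the MongoDB document a `NumericalResultRecord` might serialize to after this change; the field names come from the bson tags above, but the values are made up:

```python
# Hypothetical NumericalResultRecord document after this change.
# Field names follow the bson tags above; all values are illustrative.
example_result = {
    "result_data": {},  # BaseResultRecord fields omitted here
    "scores": [
        {"score": 0.83, "id": 17, "run_time": 642.0},
        {"score": 0.79, "id": 18, "run_time": 710.5},
    ],
    "times": [642.0, 710.5],  # new ComputeTimesSamples buffer
}
```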
50 changes: 40 additions & 10 deletions apps/go/manager/records/task.go
@@ -359,15 +359,20 @@ type NumericalTaskRecord struct {
MeanScore float32 `bson:"mean_scores"`
MedianScore float32 `bson:"median_scores"`
StdScore float32 `bson:"std_scores"`
// Times
MeanProcessTime float32 `bson:"mean_times"`
MedianProcessTime float32 `bson:"median_times"`
StdProcessTime float32 `bson:"std_times"`
// buffer
ScoresSamples []ScoresSample `bson:"scores"`
// circular buffer control
CircBuffer types.CircularBuffer `bson:"circ_buffer_control"`
}

type ScoresSample struct {
Score float64 `bson:"score"`
ID int `bson:"id"`
Score float64 `bson:"score"`
ID int `bson:"id"`
RunTime float32 `bson:"run_time"`
}

func (record *NumericalTaskRecord) NewTask(nodeID primitive.ObjectID, framework string, task string, date time.Time, l *zerolog.Logger) {
@@ -384,8 +389,13 @@ func (record *NumericalTaskRecord) NewTask(nodeID primitive.ObjectID, framework
record.TaskData.LastSeen = date

record.MeanScore = 0.0
record.MedianScore = 0.0
record.StdScore = 0.0
record.MeanProcessTime = 0.0
record.MedianProcessTime = 0.0
record.StdProcessTime = 0.0
record.ScoresSamples = make([]ScoresSample, bufferLen)

record.CircBuffer = types.CircularBuffer{
CircBufferLen: bufferLen,
NumSamples: 0,
@@ -520,32 +530,51 @@ func (record *NumericalTaskRecord) ProcessData(l *zerolog.Logger) (err error) {
}

// Slice the buffer and cast
var auxData []float64
var auxDataScores []float64
var auxDataTimes []float64
for _, sampleId := range validIdx {
// Add sample to data array
auxData = append(auxData, float64(record.ScoresSamples[sampleId].Score))
auxDataScores = append(auxDataScores, float64(record.ScoresSamples[sampleId].Score))
auxDataTimes = append(auxDataTimes, float64(record.ScoresSamples[sampleId].RunTime))
}

length := len(auxData)
length := len(auxDataScores)
if length == 0 {
record.MeanScore = 0
record.StdScore = 0
record.MedianScore = 0
record.MeanProcessTime = 0
record.StdProcessTime = 0
record.MedianProcessTime = 0

} else if length == 1 {
record.MeanScore = float32(record.ScoresSamples[record.CircBuffer.Indexes.Start].Score)
record.StdScore = 0
record.MedianScore = float32(record.ScoresSamples[record.CircBuffer.Indexes.Start].Score)
record.MeanProcessTime = float32(record.ScoresSamples[record.CircBuffer.Indexes.Start].RunTime)
record.StdProcessTime = 0
record.MedianProcessTime = float32(record.ScoresSamples[record.CircBuffer.Indexes.Start].RunTime)
} else {
// Calculate the mean
record.MeanScore = float32(stat.Mean(auxData, nil))
record.MeanScore = float32(stat.Mean(auxDataScores, nil))
// Calculate the standard deviation
record.StdScore = float32(stat.StdDev(auxData, nil))
record.StdScore = float32(stat.StdDev(auxDataScores, nil))
// Calculate the median
sort.Float64s(auxData)
sort.Float64s(auxDataScores)
if length%2 == 0 {
record.MedianScore = float32((auxDataScores[length/2-1] + auxDataScores[length/2]) / 2)
} else {
record.MedianScore = float32(auxDataScores[length/2])
}

// Same for times
record.MeanProcessTime = float32(stat.Mean(auxDataTimes, nil))
record.StdProcessTime = float32(stat.StdDev(auxDataTimes, nil))
sort.Float64s(auxDataTimes)
if length%2 == 0 {
record.MedianScore = float32((auxData[length/2-1] + auxData[length/2]) / 2)
record.MedianProcessTime = float32((auxDataTimes[length/2-1] + auxDataTimes[length/2]) / 2)
} else {
record.MedianScore = float32(auxData[length/2])
record.MedianProcessTime = float32(auxDataTimes[length/2])
}
}
return err
@@ -572,6 +601,7 @@ func (record *NumericalTaskRecord) InsertSample(timeSample time.Time, data inter
// Save sample
record.ScoresSamples[record.CircBuffer.Indexes.End].Score = dataOk.Score
record.ScoresSamples[record.CircBuffer.Indexes.End].ID = dataOk.ID
record.ScoresSamples[record.CircBuffer.Indexes.End].RunTime = dataOk.RunTime
record.CircBuffer.Times[record.CircBuffer.Indexes.End] = timeSample

return nil
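A minimal Python sketch of the statistics `ProcessData` now tracks for run times alongside scores (the zero- and one-sample branches are handled separately in the Go code above; the sample values here are made up, and gonum's `stat.Mean`/`stat.StdDev` correspond to the mean and sample standard deviation below):

```python
import statistics

# Made-up valid samples from the circular buffer: (score, run_time)
samples = [(0.82, 640.0), (0.79, 712.5), (0.88, 590.2), (0.91, 605.8)]
scores = [s for s, _ in samples]
times = [t for _, t in samples]

# Mirrors MeanScore/StdScore/MedianScore and the new
# MeanProcessTime/StdProcessTime/MedianProcessTime fields.
mean_score, mean_time = statistics.mean(scores), statistics.mean(times)
std_score, std_time = statistics.stdev(scores), statistics.stdev(times)
median_score, median_time = statistics.median(scores), statistics.median(times)

print(mean_score, std_score, median_score)
print(mean_time, std_time, median_time)
```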
2 changes: 1 addition & 1 deletion apps/go/manager/types/node_task_request.go
@@ -24,7 +24,7 @@ type TaskRequestRecord struct {
Blacklist []int `bson:"blacklist"`
Qty int `bson:"qty"`
TotalInstances int `bson:"total_instances"`
RequestType string `bson:"string"`
RequestType string `bson:"request_type"`
Done bool `bson:"done"`
}

4 changes: 4 additions & 0 deletions apps/python/api/app/basemodels.py
@@ -31,8 +31,12 @@
"last_seen": "$task_data.last_seen",
"last_height": "$task_data.last_height",
"mean": "$mean_scores",
"median": "$median_scores",
"std": "$std_scores",
"num": "$circ_buffer_control.num_samples",
"mean_times": "$mean_times",
"median_times": "$median_times",
"std_times": "$std_times",
}
},
]
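With the projection above, a row returned by the aggregation could look like the following (a hypothetical example; all values are illustrative only):

```python
# Illustrative row shape produced by the aggregation projection above; values are made up.
row = {
    "last_seen": "2024-06-01T12:00:00Z",
    "last_height": 123456,
    "mean": 0.71,
    "median": 0.73,
    "std": 0.05,
    "num": 32,
    "mean_times": 905.2,    # new time aggregates exposed to the API
    "median_times": 884.0,
    "std_times": 60.3,
}
```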
45 changes: 39 additions & 6 deletions apps/python/api/app/leaderboard.py
@@ -118,25 +118,23 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
leaderboard_entry = dict()
leaderboard_entry["status"] = "OK"

# Add QoS
leaderboard_entry["qos"] = {
"error_rate": (np.random.random(1)[0] * 0.1),
"response_time": (np.random.random(1)[0] * 1000),
}

# Add Metrics
leaderboard_entry["metrics"] = dict()
running_mean_avg = 0
weight_avg = 0
std_err_avg = 0
incomplete = False
running_mean_time_avg = 0
std_err_time_avg = 0
for metric in LEADERBOARD_METRICS.keys():
metric_name = LEADERBOARD_METRICS[metric]

if metric == "mmlu":
running_mean_mmlu = 0
weight_mmlu = 0
std_err_mmlu = 0
running_mean_time_mmlu = 0
std_err_time_mmlu = 0
# This requiere more steps yay!
all_ok = True
partial = False
@@ -161,15 +159,26 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
if data_row["num"].values[0] <= 50:
# This is a partial metric
partial = True
metric_time_mean = (
data_row["mean_times"].values[0]
* data_row["num"].values[0]
)
metric_time_std_err = data_row["std_times"].values[
0
] / np.sqrt(data_row["num"].values[0])
else:
metric_mean = 0
metric_std_err = 0
metric_time_mean = 0
metric_time_std_err = 0

this_w = data_row["num"].values[0]
running_mean_mmlu += metric_mean
running_mean_time_mmlu += metric_time_mean
weight_mmlu += this_w
if this_w > 0:
std_err_mmlu += (metric_std_err / this_w) ** 2
std_err_time_mmlu += (metric_time_std_err / this_w) ** 2

if all_ok:
if weight_mmlu == 0:
@@ -182,11 +191,15 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
metric_mean = running_mean_mmlu
metric_std_err = std_err_mmlu
metric_weight = weight_mmlu / len(LEADERBOARD_MMLU_METRICS)
metric_time_mean = running_mean_time_mmlu
metric_time_std_err = std_err_time_mmlu
else:
# No data
metric_mean = np.nan
metric_std_err = np.nan
metric_weight = np.nan
metric_time_mean = np.nan
metric_time_std_err = np.nan

else:
data_row = scores_df.loc[
@@ -200,6 +213,8 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
metric_mean = np.nan
metric_std_err = np.nan
metric_weight = np.nan
metric_time_mean = np.nan
metric_time_std_err = np.nan

elif data_row["num"].values[0] > 0:
metric_mean = data_row["mean"].values[0]
@@ -209,10 +224,17 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
metric_weight = data_row["num"].values[0]
if data_row["num"].values[0] <= 50:
partial = True

metric_time_mean = data_row["mean_times"].values[0]
metric_time_std_err = data_row["std_times"].values[0] / np.sqrt(
data_row["num"].values[0]
)
else:
metric_mean = 0
metric_std_err = 0
metric_weight = 0
metric_time_mean = 0
metric_time_std_err = 0

if np.isnan(metric_mean) or metric_weight == 0:
leaderboard_entry["metrics"][metric_name] = {
@@ -230,11 +252,16 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
running_mean_avg += metric_mean * metric_weight
weight_avg += metric_weight
std_err_avg += metric_std_err**2
# track times
running_mean_time_avg += metric_time_mean * metric_weight
std_err_time_avg += metric_time_std_err**2

if weight_avg == 0:
running_mean_avg = 0
running_mean_time_avg = 0
else:
running_mean_avg = running_mean_avg / weight_avg
running_mean_time_avg = running_mean_time_avg / weight_avg
if std_err_avg != 0:
std_err_avg = np.sqrt(std_err_avg)

@@ -244,6 +271,12 @@ async def get_leaderboard_full(mongodb: PoktMongodb) -> Tuple[dict, bool]:
"status": "OK" if not incomplete else "INCOMPLETE",
}

# Add QoS
leaderboard_entry["qos"] = {
"error_rate": 0.0,
"response_time": running_mean_time_avg,
}

# Add Metadata
leaderboard_entry["metadata"] = {
"service": str(node_df["service"].values[0]),
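A simplified sketch (made-up numbers, not production data) of how the per-metric means, standard errors, and the new response-time aggregates are combined into the leaderboard averages above; the time standard error is assumed to follow the same square-root-of-summed-squares pattern as the score standard error:

```python
import numpy as np

# Made-up per-metric tuples: (mean, std_err, weight, mean_time, std_err_time)
per_metric = [
    (0.61, 0.020, 120, 850.0, 35.0),
    (0.74, 0.015, 200, 910.0, 28.0),
]

running_mean_avg = running_mean_time_avg = 0.0
weight_avg = 0
std_err_avg = std_err_time_avg = 0.0
for mean, std_err, weight, t_mean, t_std_err in per_metric:
    running_mean_avg += mean * weight
    running_mean_time_avg += t_mean * weight
    weight_avg += weight
    std_err_avg += std_err ** 2
    std_err_time_avg += t_std_err ** 2

if weight_avg == 0:
    running_mean_avg = running_mean_time_avg = 0.0
else:
    running_mean_avg /= weight_avg
    running_mean_time_avg /= weight_avg  # feeds qos["response_time"]
std_err_avg = np.sqrt(std_err_avg) if std_err_avg else 0.0
std_err_time_avg = np.sqrt(std_err_time_avg) if std_err_time_avg else 0.0

print(running_mean_avg, std_err_avg, running_mean_time_avg, std_err_time_avg)
```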
6 changes: 3 additions & 3 deletions apps/python/sampler/README.md
@@ -28,9 +28,9 @@ Files that follow the structure of `lm-eval-harness`. The intention, for instanc
* `PocketNetworkTaskManager`: A class based on `TaskManager`, that is used to inject `pocket_args` into the `task.config.metadata`.

**api**
* `PocketNetworkConfigurableTask`: A class based on `ConfigurableTask`, that retrieve samples from the sql database, based on `blacklist` id's & `uri` previously defined in `pocket_args`. In `PocketNetworkConfigurableTask.download` validations reagrding `training_split`, `validation_split`, `test_split` and `fewshot_split` are followed as pointed in the `lm-eval-harness- documentation.
* `PocketNetworkConfigurableTask`: A class based on `ConfigurableTask`, that retrieve samples from the sql database, based on `blacklist` id's & `uri` previously defined in `pocket_args`. In `PocketNetworkConfigurableTask.download` validations regarding `training_split`, `validation_split`, `test_split` and `fewshot_split` are followed as pointed in the `lm-eval-harness- documentation.

* `def build_all_requests` was modified in order to inyecet the postgres document id into the `Instance.doc_id`.
* `def build_all_requests` was modified in order to inject the Postgres document id into the `Instance.doc_id`.

**models**
* `PocketNetworkLM`: A class that mimic partially `OpenaiChatCompletionsLM` from `lm-eval-harness`. Instead on generate request and take respobnses, both `loglikelihood` and `generate_until` methods instantiate `CompletionRequest`. The last is a class used as a proxy to generate the `data` field of a RPC request that is saved in Mongo.
@@ -39,7 +39,7 @@ Files that follow the structure of `lm-eval-harness`. The intention, for instanc
* `get_configurable_task`: A function to return only `ConfigurableTask` based on the `task_manager`.
* If `task_manager` is `TaskManager`, then all samples from all splits are part of the dataset.
* If `task_manager` is `PocketNetworkTaskManager`, random samples are generated based on the configuration split and the blacklist provided in `pocket_args`.
* `genererate_requests`: A functions that hierarchically save in MongoDB the strucutre of `Task`->`Instances`->`Prompts`.
* `genererate_requests`: A functions that hierarchically save in MongoDB the structure of `Task`->`Instances`->`Prompts`.


### Accessing the DB with PG Admin
3 changes: 3 additions & 0 deletions packages/python/lmeh/pocket_lm_eval/models/pocket_network.py
@@ -664,5 +664,8 @@ def loglikelihood(

return self._loglikelihood_tokens(new_reqs, disable_tqdm=disable_tqdm)

def response_times(self, requests, disable_tqdm: bool = True) -> List[int]:
return [req.resp.response_time for req in requests]

def _encode_pair(self, context_enc, continuation_enc):
return context_enc, continuation_enc
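A tiny usage sketch of the new `response_times` helper; the request and response objects below are stand-ins, and only the attribute the helper actually reads (`req.resp.response_time`) is modelled:

```python
from types import SimpleNamespace

# Stand-in request objects: only req.resp.response_time is modelled.
requests = [
    SimpleNamespace(resp=SimpleNamespace(response_time=812)),
    SimpleNamespace(resp=SimpleNamespace(response_time=1043)),
]

def response_times(requests):
    # Same body as the method added above
    return [req.resp.response_time for req in requests]

print(response_times(requests))  # [812, 1043]
```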
17 changes: 13 additions & 4 deletions packages/python/lmeh/utils/generator.py
@@ -18,6 +18,8 @@
simple_parse_args_string,
)

import numpy as np

if TYPE_CHECKING:
from lm_eval.api.model import LM
from lm_eval.tasks import Task
@@ -510,13 +512,16 @@ async def save_results(
cloned_reqs = []
for req in reqs:
cloned_reqs.extend([req] * req.repeats)

req.times = []
# run requests through model
resps = getattr(lm, reqtype)(cloned_reqs)
# Get times POKT Network
times = getattr(lm, "response_times")(cloned_reqs)

# put responses from model into a list of length K for each request.
for x, req in zip(resps, cloned_reqs):
for x, t, req in zip(resps, times, cloned_reqs):
req.resps.append(x)
req.times.append(t)

RANK = lm.rank
WORLD_SIZE = lm.world_size
@@ -539,6 +544,7 @@ async def save_results(
instances.sort(key=lambda x: x.idx)
# iterate over different filters used
scores = []
times = []
result_num_samples = set()
for filter_key in task.instances[0].filtered_resps.keys():
if filter_key not in selected_filters:
@@ -555,6 +561,7 @@ async def save_results(
metrics = task.process_results(
doc, [req.filtered_resps[filter_key] for req in requests]
)
response_times = [np.mean(req.times).astype(float) for req in requests]
if log_samples:
target = task.doc_to_target(doc)
example = {
@@ -579,10 +586,12 @@ async def save_results(
}
example.update(metrics)
task_output.logged_samples.append(example)
for metric, value in metrics.items():
for (metric, value), ms in zip(metrics.items(), response_times):
task_output.sample_metrics[(metric, filter_key)].append(value)
if metric in selected_metrics:
numericSample = NumericSample(score=example[metric], id=doc_id)
numericSample = NumericSample(
score=example[metric], run_time=ms, id=doc_id
)
scores.append(numericSample)

base_result = PocketNetworkMongoDBResultBase(
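A small sketch of how `save_results` now turns the per-request response times into the `run_time` attached to each `NumericSample`; the timing data below is made up:

```python
import numpy as np

# Made-up times accumulated in req.times, one list per document's request
request_times = [[812.0, 790.5], [1024.3], [655.1, 702.9, 688.4]]

# Mirrors: response_times = [np.mean(req.times).astype(float) for req in requests]
response_times = [float(np.mean(times)) for times in request_times]

# Each per-document average is then passed as NumericSample(score=..., run_time=ms, id=doc_id)
for doc_id, ms in enumerate(response_times):
    print(doc_id, round(ms, 1))
```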