Fix client-side metrics processing (#200)
* Add more robust client-side metrics helper

* Improve client-side metrics processing
geoffxy authored Jul 19, 2023
1 parent 370fa21 commit 80e4bec
Showing 6 changed files with 115 additions and 33 deletions.
5 changes: 2 additions & 3 deletions src/brad/daemon/daemon.py
@@ -111,9 +111,8 @@ async def _read_server_messages(self) -> None:
 
             elif isinstance(message, MetricsReport):
                 logger.debug(
-                    "Received metrics report. Txn value: %d, Elapsed time: %.2f",
-                    message.txn_end_value,
-                    message.elapsed_time_s,
+                    "Received metrics report. txn_completions_per_s: %.2f",
+                    message.txn_completions_per_s,
                 )
                 self._monitor.handle_metric_report(message)
 
6 changes: 3 additions & 3 deletions src/brad/daemon/messages.py
@@ -15,9 +15,9 @@ class MetricsReport:
     Sent from the server to the daemon to report BRAD's client-side metrics.
     """
 
-    def __init__(self, txn_end_value: int, elapsed_time_s: float) -> None:
-        self.txn_end_value = txn_end_value
-        self.elapsed_time_s = elapsed_time_s
+    def __init__(self, txn_completions_per_s: float) -> None:
+        # We will need to include a "worker id" once we have multiple front ends.
+        self.txn_completions_per_s = txn_completions_per_s
 
 
 class ShutdownDaemon:
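As context for the message change above: the front end now converts its raw transaction count into a throughput sample before it ever reaches the daemon. Below is a minimal sketch of that flow using the commit's MetricsReport class; the in-memory queue is a stand-in for BRAD's actual daemon input queue, and the count and period lengths are made up for illustration.

import queue

from brad.daemon.messages import MetricsReport

# Hypothetical numbers for one reporting period.
txn_value = 1200        # transactions completed during the period (assumed)
elapsed_time_s = 30.0   # length of the period in seconds (assumed)

# Server side: send a rate, not a raw count, so the daemon no longer needs to
# track elapsed time itself.
report = MetricsReport(txn_value / elapsed_time_s)

daemon_input_queue: "queue.Queue[MetricsReport]" = queue.Queue()
daemon_input_queue.put_nowait(report)

# Daemon side: the monitor records the sample (see monitor.py below).
received = daemon_input_queue.get_nowait()
print(f"txn_completions_per_s = {received.txn_completions_per_s:.2f}")  # 40.00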
38 changes: 15 additions & 23 deletions src/brad/daemon/monitor.py
@@ -4,7 +4,6 @@
 import asyncio
 import numpy as np
 import pytz
-import time
 import logging
 from importlib.resources import files, as_file
 from typing import List, Dict
@@ -19,7 +18,7 @@
 from brad.forecasting.moving_average_forecaster import MovingAverageForecaster
 from brad.forecasting.linear_forecaster import LinearForecaster
 from brad.forecasting import Forecaster
-from brad.utils.counter import Counter
+from brad.utils.streaming_metric import StreamingMetric
 
 logger = logging.getLogger(__name__)
 
@@ -32,7 +31,7 @@ def get_metric_id(engine: str, metric_name: str, stat: str, role: str = ""):
     return metric_id
 
 
-FrontEndMetricDict = Dict[FrontEndMetric, int | float]
+FrontEndMetricDict = Dict[FrontEndMetric, StreamingMetric]
 
 
 # Monitor
@@ -75,9 +74,8 @@ def __init__(
 
         # These are BRAD-defined metrics that are collected by the front end
         # servers.
-        self._txn_end_counter = Counter()
         self._front_end_metrics: FrontEndMetricDict = {
-            FrontEndMetric.TxnEndPerSecond: 0.0,
+            FrontEndMetric.TxnEndPerSecond: StreamingMetric[float](),
         }
 
         # Forcibly read metrics. Use to avoid `run_forever()`.
@@ -103,31 +101,25 @@ def from_schema_name(cls, schema_name: str):
     async def run_forever(self) -> None:
         # Flesh out the monitor - maintain running averages of the underlying
         # engines' metrics.
-        period_start = time.time()
         while True:
-            elapsed_time_s = time.time() - period_start
-            self._update_front_end_metrics(elapsed_time_s)
+            self._update_front_end_metrics()
             self._add_metrics()
-            period_start = time.time()
             await asyncio.sleep(self._epoch_length.total_seconds())  # Read every epoch
 
     def handle_metric_report(self, report: MetricsReport) -> None:
-        self._txn_end_counter.bump(report.txn_end_value)
-
-    def _update_front_end_metrics(self, elapsed_time_s: float) -> None:
-        if elapsed_time_s > 0.0:
-            self._front_end_metrics[FrontEndMetric.TxnEndPerSecond] = (
-                self._txn_end_counter.value() / elapsed_time_s
-            )
-        else:
-            self._front_end_metrics[FrontEndMetric.TxnEndPerSecond] = 0.0
-
-        self._txn_end_counter.reset()
+        now = datetime.now(tz=timezone.utc)
+        self._front_end_metrics[FrontEndMetric.TxnEndPerSecond].add_sample(
+            report.txn_completions_per_s, now
+        )
 
-        # TODO: Switch these to debug after we set up client metrics ingestion.
-        logger.info("Updated front end metrics:")
+    def _update_front_end_metrics(self) -> None:
+        # TODO: This is just logged for debug purposes. We probably do not need
+        # this method.
+        now = datetime.now(tz=timezone.utc)
+        window_start = now - self._epoch_length
+        logger.info("Current front end metrics:")
         for metric, value in self._front_end_metrics.items():
-            logger.info("%s: %.2f", metric, value)
+            logger.info("%s: %.2f", metric, value.average_since(window_start))
 
 ############
 # The following functions, prefixed by `read_`, provide different ways to query the monitor for
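To make the monitor change concrete, here is a small standalone sketch of the windowed averaging that _update_front_end_metrics now performs. The five-second epoch and the sample values are assumptions for illustration; note that average_since also counts the most recent sample at or before the window start, per the helper's own comment.

from datetime import datetime, timedelta, timezone

from brad.utils.streaming_metric import StreamingMetric

epoch_length = timedelta(seconds=5)  # assumed epoch length, for illustration only
thpt = StreamingMetric[float]()

now = datetime.now(tz=timezone.utc)
# Throughput samples reported by the front end (values are arbitrary).
thpt.add_sample(30.0, now - timedelta(seconds=20))
thpt.add_sample(40.0, now - timedelta(seconds=4))
thpt.add_sample(44.0, now - timedelta(seconds=1))

window_start = now - epoch_length
# Averages 44.0, 40.0, and the boundary sample 30.0 -> 38.00.
print(f"TxnEndPerSecond: {thpt.average_since(window_start):.2f}")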
7 changes: 3 additions & 4 deletions src/brad/server/server.py
@@ -394,11 +394,10 @@ async def _report_metrics_to_daemon(self) -> None:
             elapsed_time_s = period_end - period_start
 
             # If the input queue is full, we just drop this message.
-            metrics_report = MetricsReport(txn_value, elapsed_time_s)
+            sampled_thpt = txn_value / elapsed_time_s
+            metrics_report = MetricsReport(sampled_thpt)
             logger.debug(
-                "Sending metrics report: txn_end_count: %d, elapsed_time_s: %.2f",
-                txn_value,
-                elapsed_time_s,
+                "Sending metrics report: txn_completions_per_s: %.2f", sampled_thpt
             )
             assert self._daemon_input_queue is not None
             self._daemon_input_queue.put_nowait(metrics_report)
46 changes: 46 additions & 0 deletions src/brad/utils/streaming_metric.py
@@ -0,0 +1,46 @@
+from datetime import datetime
+from collections import deque
+from typing import Deque, Tuple, TypeVar, Generic
+
+T = TypeVar("T", int, float)
+
+
+class StreamingMetric(Generic[T]):
+    """
+    A simple wrapper around a time-series metric. The assumption is that this
+    metric is updated at some regular frequency.
+    """
+
+    def __init__(self, window_size: int = 10) -> None:
+        self._metric_data: Deque[Tuple[T, datetime]] = deque()
+        self._window_size = window_size
+
+    def add_sample(self, value: T, timestamp: datetime) -> None:
+        self._metric_data.append((value, timestamp))
+        self._trim_metric_data()
+
+    def average_since(self, timestamp: datetime) -> float:
+        if len(self._metric_data) == 0:
+            return 0.0
+
+        # Assumption is that `metric_data` is sorted in ascending timestamp order.
+        total = None
+        num_samples = 0
+        for value, val_timestamp in reversed(self._metric_data):
+            if total is None:
+                total = value
+            else:
+                total += value
+            num_samples += 1
+
+            if val_timestamp <= timestamp:
+                # We want to add the first value with a timestamp less than or
+                # equal to the given timestamp.
+                break
+
+        assert total is not None
+        return total / num_samples
+
+    def _trim_metric_data(self) -> None:
+        while len(self._metric_data) > self._window_size:
+            self._metric_data.popleft()
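The helper also caps how much history it retains. A brief illustrative sketch of the window_size trimming, which the tests below do not exercise; the sample values and timestamps here are arbitrary.

from datetime import datetime, timedelta

from brad.utils.streaming_metric import StreamingMetric

start = datetime(year=2023, month=7, day=19)
sm = StreamingMetric[int](window_size=3)

# Add five samples; only the three most recent survive trimming.
for i in range(5):
    sm.add_sample(i + 1, start + timedelta(seconds=30 * i))

# A cutoff far in the past covers everything still retained: 3, 4, and 5.
print(sm.average_since(start - timedelta(days=1)))  # 4.0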
46 changes: 46 additions & 0 deletions tests/test_streaming_metric.py
@@ -0,0 +1,46 @@
+from datetime import datetime, timedelta
+from typing import List, Tuple
+
+from brad.utils.streaming_metric import StreamingMetric
+
+
+def get_value_stream(start: datetime) -> List[Tuple[float, datetime]]:
+    return [
+        (3.0, start),
+        (10.0, start + timedelta(seconds=30)),
+        (20.0, start + timedelta(seconds=60)),
+    ]
+
+
+def test_empty():
+    start = datetime(year=2023, month=7, day=19)
+    sm = StreamingMetric[float]()
+    val = sm.average_since(start)
+    assert val == 0.0
+
+
+def test_multiple():
+    start = datetime(year=2023, month=7, day=19)
+    sm = StreamingMetric[float]()
+    for val, ts in get_value_stream(start):
+        sm.add_sample(val, ts)
+    val = sm.average_since(start + timedelta(seconds=45))
+    # Average of 10.0 and 20.0
+    assert val == 15.0
+
+
+def test_all():
+    start = datetime(year=2023, month=7, day=19)
+    sm = StreamingMetric[float]()
+    for val, ts in get_value_stream(start):
+        sm.add_sample(val, ts)
+
+    val = sm.average_since(start)
+    # Average of 3.0, 10.0, and 20.0
+    assert val == 11.0
+
+    val = sm.average_since(start - timedelta(days=1))
+    assert val == 11.0
+
+    val = sm.average_since(start + timedelta(seconds=10))
+    assert val == 11.0
