Skip to content

Commit

Permalink
Merge pull request private-attribution#1374 from akoshelev/metrics-bo…
Browse files Browse the repository at this point in the history
…unded-channel

Metrics ergonomic and unit test improvements
  • Loading branch information
akoshelev authored Oct 26, 2024
2 parents 6b81609 + fa98432 commit 57e2c63
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 18 deletions.
27 changes: 24 additions & 3 deletions ipa-metrics/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ mod tests {
thread::{Scope, ScopedJoinHandle},
};

use crate::{controller::Status, counter, install, install_new_thread, producer::Producer};
use crate::{
controller::Status, counter, install, install_new_thread, producer::Producer,
MetricChannelType,
};

struct MeteredScope<'scope, 'env: 'scope>(&'scope Scope<'scope, 'env>, Producer);

Expand Down Expand Up @@ -145,7 +148,7 @@ mod tests {

#[test]
fn start_stop() {
let (collector, producer, controller) = install();
let (collector, producer, controller) = install(MetricChannelType::Unbounded);
let handle = thread::spawn(|| {
let store = collector.install().block_until_shutdown();
store.counter_val(counter!("foo"))
Expand All @@ -165,7 +168,8 @@ mod tests {

#[test]
fn with_thread() {
let (producer, controller, handle) = install_new_thread().unwrap();
let (producer, controller, handle) =
install_new_thread(MetricChannelType::Unbounded).unwrap();
thread::scope(move |s| {
let s = s.metered(producer);
s.spawn(|| counter!("baz", 4));
Expand All @@ -179,4 +183,21 @@ mod tests {

handle.join().unwrap(); // Collector thread should be terminated by now
}

#[test]
fn with_thread_rendezvous() {
let (producer, controller, _handle) =
install_new_thread(MetricChannelType::Rendezvous).unwrap();
let counter = thread::scope(move |s| {
let s = s.metered(producer);
s.spawn(|| counter!("foo", 3)).join().unwrap();
s.spawn(|| counter!("foo", 5)).join().unwrap();
// we don't need to check the status because producer threads are now
// blocked until the collector receives their stores. This means that
// the snapshot must be up to date by now.
controller.snapshot().unwrap().counter_val(counter!("foo"))
});

assert_eq!(8, counter);
}
}
20 changes: 19 additions & 1 deletion ipa-metrics/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ impl CurrentThreadContext {
pub fn store_mut<F: FnOnce(&mut MetricsStore) -> T, T>(f: F) -> T {
METRICS_CTX.with_borrow_mut(|ctx| f(ctx.store_mut()))
}

#[must_use]
pub fn is_connected() -> bool {
METRICS_CTX.with_borrow(|ctx| ctx.tx.is_some())
}
}

/// This context is used inside thread-local storage,
Expand Down Expand Up @@ -122,7 +127,7 @@ impl Drop for MetricsContext {
mod tests {
use std::thread;

use crate::MetricsContext;
use crate::{context::CurrentThreadContext, MetricsContext};

/// Each thread has its local store by default, and it is exclusive to it
#[test]
Expand Down Expand Up @@ -165,4 +170,17 @@ mod tests {
drop(ctx);
handle.join().unwrap();
}

#[test]
fn is_connected() {
assert!(!CurrentThreadContext::is_connected());
let (tx, rx) = crossbeam_channel::unbounded();

CurrentThreadContext::init(tx);
CurrentThreadContext::store_mut(|store| store.counter(counter!("foo")).inc(1));
CurrentThreadContext::flush();

assert!(CurrentThreadContext::is_connected());
assert_eq!(1, rx.recv().unwrap().counter_val(counter!("foo")));
}
}
12 changes: 6 additions & 6 deletions ipa-metrics/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ impl Controller {
///
/// ## Example
/// ```rust
/// use ipa_metrics::{install_new_thread, MetricsStore};
/// use ipa_metrics::{install_new_thread, MetricChannelType, MetricsStore};
///
/// let (_, controller, _handle) = install_new_thread().unwrap();
/// let (_, controller, _handle) = install_new_thread(MetricChannelType::Unbounded).unwrap();
/// let snapshot = controller.snapshot().unwrap();
/// println!("Current metrics: {snapshot:?}");
/// ```
Expand All @@ -60,9 +60,9 @@ impl Controller {
///
/// ## Example
/// ```rust
/// use ipa_metrics::{install_new_thread, MetricsStore};
/// use ipa_metrics::{install_new_thread, MetricChannelType, MetricsStore};
///
/// let (_, controller, _handle) = install_new_thread().unwrap();
/// let (_, controller, _handle) = install_new_thread(MetricChannelType::Unbounded).unwrap();
/// controller.stop().unwrap();
/// ```
pub fn stop(self) -> Result<(), String> {
Expand All @@ -81,9 +81,9 @@ impl Controller {
///
/// ## Example
/// ```rust
/// use ipa_metrics::{install_new_thread, ControllerStatus};
/// use ipa_metrics::{install_new_thread, ControllerStatus, MetricChannelType};
///
/// let (_, controller, _handle) = install_new_thread().unwrap();
/// let (_, controller, _handle) = install_new_thread(MetricChannelType::Unbounded).unwrap();
/// let status = controller.status().unwrap();
/// println!("Collector status: {status:?}");
/// ```
Expand Down
4 changes: 2 additions & 2 deletions ipa-metrics/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl<'lv, const LABELS: usize> Name<'lv, LABELS> {
/// This is the key inside metric stores which are simple hashmaps.
#[derive(Debug, Clone, Eq)]
pub struct OwnedName {
key: &'static str,
pub key: &'static str,
labels: [Option<OwnedLabel>; 5],
}

Expand Down Expand Up @@ -203,7 +203,7 @@ impl Hash for OwnedName {

#[cfg(test)]
pub fn compute_hash<V: Hash>(value: V) -> u64 {
let mut hasher = std::hash::DefaultHasher::default();
let mut hasher = crate::label_hasher();
value.hash(&mut hasher);

hasher.finish()
Expand Down
11 changes: 11 additions & 0 deletions ipa-metrics/src/label.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,19 @@ use std::{
hash::{Hash, Hasher},
};

use rustc_hash::FxHasher;

pub const MAX_LABELS: usize = 5;

/// Provides a fast, non-collision resistant implementation of [`Hasher`]
/// for label values.
///
/// [`Hasher`]: std::hash::Hasher
#[must_use]
pub fn label_hasher() -> impl Hasher {
FxHasher::default()
}

/// Dimension value (or label value) must be sendable to another thread
/// and there must be a way to show it
pub trait LabelValue: Display + Send {
Expand Down
41 changes: 35 additions & 6 deletions ipa-metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub use controller::{
Status as ControllerStatus,
};
pub use key::{MetricName, OwnedName, UniqueElements};
pub use label::{Label, LabelValue};
pub use label::{label_hasher, Label, LabelValue};
#[cfg(feature = "partitions")]
pub use partitioned::{
CurrentThreadContext as CurrentThreadPartitionContext, Partition as MetricPartition,
Expand All @@ -32,6 +32,19 @@ pub use producer::Producer as MetricsProducer;
#[cfg(not(feature = "partitions"))]
pub use store::Store as MetricsStore;

/// Type of the communication channel between metric producers
/// and the collector.
#[derive(Copy, Clone)]
pub enum MetricChannelType {
/// Each send message must be paired with receive. Sends that
/// don't get a pair block the thread until collector processes
/// the request. This mode is suitable for unit tests where metric
/// consistency is important and gets more priority than availability.
Rendezvous,
/// Each channel between producer and collector gets unlimited capacity.
Unbounded,
}

/// Creates metric infrastructure that is ready to use
/// in the application code. It consists a triple of
/// [`MetricsCollector`], [`MetricsProducer`], and
Expand All @@ -47,23 +60,38 @@ pub use store::Store as MetricsStore;
/// A thread that owns the controller, can request current snapshot.
/// For more information about API, see [`Command`].
///
/// ## Example
/// The communication channel between producers and collector is configured
/// via `channel_type` parameter. See [`MetricChannelType`] for details
///
/// ## Example 1 (Rendezvous channels)
/// ```rust
/// use ipa_metrics::MetricChannelType;
/// let (collector, producer, controller) = ipa_metrics::install(MetricChannelType::Rendezvous);
/// ```
///
/// ## Example 2 (unbounded)
/// ```rust
/// let (collector, producer, controller) = ipa_metrics::install();
/// use ipa_metrics::MetricChannelType;
/// let (collector, producer, controller) = ipa_metrics::install(MetricChannelType::Unbounded);
/// ```
///
/// [`MetricsCollector`]: crate::MetricsCollector
/// [`MetricsProducer`]: crate::MetricsProducer
/// [`MetricsCollectorController`]: crate::MetricsCollectorController
/// [`Command`]: crate::ControllerCommand
#[must_use]
pub fn install() -> (
pub fn install(
channel_type: MetricChannelType,
) -> (
MetricsCollector,
MetricsProducer,
MetricsCollectorController,
) {
let (command_tx, command_rx) = crossbeam_channel::unbounded();
let (tx, rx) = crossbeam_channel::unbounded();
let (tx, rx) = match channel_type {
MetricChannelType::Rendezvous => crossbeam_channel::bounded(0),
MetricChannelType::Unbounded => crossbeam_channel::unbounded(),
};
(
MetricsCollector {
rx,
Expand All @@ -80,8 +108,9 @@ pub fn install() -> (
/// ## Errors
/// if thread cannot be started
pub fn install_new_thread(
channel_type: MetricChannelType,
) -> io::Result<(MetricsProducer, MetricsCollectorController, JoinHandle<()>)> {
let (collector, producer, controller) = install();
let (collector, producer, controller) = install(channel_type);
let handle = std::thread::Builder::new()
.name("metric-collector".to_string())
.spawn(|| {
Expand Down
18 changes: 18 additions & 0 deletions ipa-metrics/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ impl Store {
pub fn is_empty(&self) -> bool {
self.len() == 0
}

/// Returns an iterator over the counters in the store.
///
/// The iterator item is a tuple of the metric name and the counter value.
pub fn counters(&self) -> impl Iterator<Item = (&OwnedMetricName, CounterValue)> {
self.counters.iter().map(|(key, value)| (key, *value))
}
}

pub struct CounterHandle<'a, const LABELS: usize> {
Expand Down Expand Up @@ -222,4 +229,15 @@ mod tests {
store.counter(counter!("bar")).inc(1);
assert_eq!(2, store.len());
}

#[test]
fn counters() {
let mut store = Store::default();
store.counter(counter!("foo")).inc(1);
store.counter(counter!("foo", "h1" => &1)).inc(1);
store.counter(counter!("foo", "h2" => &2)).inc(1);
store.counter(counter!("bar")).inc(1);

assert_eq!((4, Some(4)), store.counters().size_hint());
}
}

0 comments on commit 57e2c63

Please sign in to comment.