From f680adcc5433fd12bcedfce3a2030ea065636f0b Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Thu, 24 Oct 2024 20:54:34 -0700 Subject: [PATCH 1/7] Make OwnedName fields public They are required to build snapshots on the IPA side --- ipa-metrics/src/key.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ipa-metrics/src/key.rs b/ipa-metrics/src/key.rs index 8f01ea2f4..02229a11f 100644 --- a/ipa-metrics/src/key.rs +++ b/ipa-metrics/src/key.rs @@ -114,8 +114,8 @@ impl<'lv, const LABELS: usize> Name<'lv, LABELS> { /// This is the key inside metric stores which are simple hashmaps. #[derive(Debug, Clone, Eq)] pub struct OwnedName { - key: &'static str, - labels: [Option; 5], + pub key: &'static str, + pub labels: [Option; 5], } impl OwnedName { From ffbba6ade30de380d1b820bddbaca24df3f99397 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Thu, 24 Oct 2024 20:55:33 -0700 Subject: [PATCH 2/7] Provide a default hasher for hashing labels We don't need to import fast hasher everywhere with this change. Labels don't need collision resistance, so implementors of `LabelValue` trait can use it --- ipa-metrics/src/label.rs | 11 +++++++++++ ipa-metrics/src/lib.rs | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/ipa-metrics/src/label.rs b/ipa-metrics/src/label.rs index 27da2b116..86a1b11d1 100644 --- a/ipa-metrics/src/label.rs +++ b/ipa-metrics/src/label.rs @@ -3,8 +3,19 @@ use std::{ hash::{Hash, Hasher}, }; +use rustc_hash::FxHasher; + pub const MAX_LABELS: usize = 5; +/// Provides a fast, non-collision resistant implementation of [`Hasher`] +/// for label values. T +/// +/// [`Hasher`]: std::hash::Hasher +#[must_use] +pub fn label_hasher() -> impl Hasher { + FxHasher::default() +} + /// Dimension value (or label value) must be sendable to another thread /// and there must be a way to show it pub trait LabelValue: Display + Send { diff --git a/ipa-metrics/src/lib.rs b/ipa-metrics/src/lib.rs index f84f8dc1c..a637518c5 100644 --- a/ipa-metrics/src/lib.rs +++ b/ipa-metrics/src/lib.rs @@ -22,7 +22,7 @@ pub use controller::{ Status as ControllerStatus, }; pub use key::{MetricName, OwnedName, UniqueElements}; -pub use label::{Label, LabelValue}; +pub use label::{label_hasher, Label, LabelValue}; #[cfg(feature = "partitions")] pub use partitioned::{ CurrentThreadContext as CurrentThreadPartitionContext, Partition as MetricPartition, From d2bbbff263986099bc4d9bd3d00df313dbc81d1e Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Thu, 24 Oct 2024 20:56:14 -0700 Subject: [PATCH 3/7] Provide a method to get all counters from the store --- ipa-metrics/src/store.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/ipa-metrics/src/store.rs b/ipa-metrics/src/store.rs index 501b875a2..bd792ae6e 100644 --- a/ipa-metrics/src/store.rs +++ b/ipa-metrics/src/store.rs @@ -94,6 +94,13 @@ impl Store { pub fn is_empty(&self) -> bool { self.len() == 0 } + + /// Returns an iterator over the counters in the store. + /// + /// The iterator item is a tuple of the metric name and the counter value. + pub fn counters(&self) -> impl Iterator { + self.counters.iter().map(|(key, value)| (key, *value)) + } } pub struct CounterHandle<'a, const LABELS: usize> { From 2cbf4ed1fc359e619897dfd12b6ff9a21b269408 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Thu, 24 Oct 2024 20:56:46 -0700 Subject: [PATCH 4/7] Provide a method that shows whether the current thread is connected to the collector thread --- ipa-metrics/src/context.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ipa-metrics/src/context.rs b/ipa-metrics/src/context.rs index 938d4560b..7d19f2eb8 100644 --- a/ipa-metrics/src/context.rs +++ b/ipa-metrics/src/context.rs @@ -39,6 +39,11 @@ impl CurrentThreadContext { pub fn store_mut T, T>(f: F) -> T { METRICS_CTX.with_borrow_mut(|ctx| f(ctx.store_mut())) } + + #[must_use] + pub fn is_connected() -> bool { + METRICS_CTX.with_borrow(|ctx| ctx.tx.is_some()) + } } /// This context is used inside thread-local storage, From f3608f666c2698f7ac9241457911ea3ca5df8e76 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Thu, 24 Oct 2024 20:58:53 -0700 Subject: [PATCH 5/7] Allow customizing channel size between collector and producer thread I noticed in unit tests it gets very hard to get a correct snapshot after test finishes. Often test thread sends the data and moves on, but the collector thread hasn't processed the data yet. So any assertion on the metric fails occasionally. One way to deal with it is to block the sender until receiver processes its data. Obviously performance-wise it is not great, but it is ok for unit tests --- ipa-metrics/src/collector.rs | 27 +++++++++++++++++++++--- ipa-metrics/src/controller.rs | 12 +++++------ ipa-metrics/src/lib.rs | 39 ++++++++++++++++++++++++++++++----- 3 files changed, 64 insertions(+), 14 deletions(-) diff --git a/ipa-metrics/src/collector.rs b/ipa-metrics/src/collector.rs index 4d5995af3..94022e340 100644 --- a/ipa-metrics/src/collector.rs +++ b/ipa-metrics/src/collector.rs @@ -111,7 +111,10 @@ mod tests { thread::{Scope, ScopedJoinHandle}, }; - use crate::{controller::Status, counter, install, install_new_thread, producer::Producer}; + use crate::{ + controller::Status, counter, install, install_new_thread, producer::Producer, + MetricChannelType, + }; struct MeteredScope<'scope, 'env: 'scope>(&'scope Scope<'scope, 'env>, Producer); @@ -145,7 +148,7 @@ mod tests { #[test] fn start_stop() { - let (collector, producer, controller) = install(); + let (collector, producer, controller) = install(MetricChannelType::Unbounded); let handle = thread::spawn(|| { let store = collector.install().block_until_shutdown(); store.counter_val(counter!("foo")) @@ -165,7 +168,8 @@ mod tests { #[test] fn with_thread() { - let (producer, controller, handle) = install_new_thread().unwrap(); + let (producer, controller, handle) = + install_new_thread(MetricChannelType::Unbounded).unwrap(); thread::scope(move |s| { let s = s.metered(producer); s.spawn(|| counter!("baz", 4)); @@ -179,4 +183,21 @@ mod tests { handle.join().unwrap(); // Collector thread should be terminated by now } + + #[test] + fn with_thread_rendezvous() { + let (producer, controller, _handle) = + install_new_thread(MetricChannelType::Rendezvous).unwrap(); + let counter = thread::scope(move |s| { + let s = s.metered(producer); + s.spawn(|| counter!("foo", 3)).join().unwrap(); + s.spawn(|| counter!("foo", 5)).join().unwrap(); + // we don't need to check the status because producer threads are now + // blocked until the collector receives their stores. This means that + // the snapshot must be up to date by now. + controller.snapshot().unwrap().counter_val(counter!("foo")) + }); + + assert_eq!(8, counter); + } } diff --git a/ipa-metrics/src/controller.rs b/ipa-metrics/src/controller.rs index 265dacf45..52deed853 100644 --- a/ipa-metrics/src/controller.rs +++ b/ipa-metrics/src/controller.rs @@ -34,9 +34,9 @@ impl Controller { /// /// ## Example /// ```rust - /// use ipa_metrics::{install_new_thread, MetricsStore}; + /// use ipa_metrics::{install_new_thread, MetricChannelType, MetricsStore}; /// - /// let (_, controller, _handle) = install_new_thread().unwrap(); + /// let (_, controller, _handle) = install_new_thread(MetricChannelType::Unbounded).unwrap(); /// let snapshot = controller.snapshot().unwrap(); /// println!("Current metrics: {snapshot:?}"); /// ``` @@ -60,9 +60,9 @@ impl Controller { /// /// ## Example /// ```rust - /// use ipa_metrics::{install_new_thread, MetricsStore}; + /// use ipa_metrics::{install_new_thread, MetricChannelType, MetricsStore}; /// - /// let (_, controller, _handle) = install_new_thread().unwrap(); + /// let (_, controller, _handle) = install_new_thread(MetricChannelType::Unbounded).unwrap(); /// controller.stop().unwrap(); /// ``` pub fn stop(self) -> Result<(), String> { @@ -81,9 +81,9 @@ impl Controller { /// /// ## Example /// ```rust - /// use ipa_metrics::{install_new_thread, ControllerStatus}; + /// use ipa_metrics::{install_new_thread, ControllerStatus, MetricChannelType}; /// - /// let (_, controller, _handle) = install_new_thread().unwrap(); + /// let (_, controller, _handle) = install_new_thread(MetricChannelType::Unbounded).unwrap(); /// let status = controller.status().unwrap(); /// println!("Collector status: {status:?}"); /// ``` diff --git a/ipa-metrics/src/lib.rs b/ipa-metrics/src/lib.rs index a637518c5..2449d41a3 100644 --- a/ipa-metrics/src/lib.rs +++ b/ipa-metrics/src/lib.rs @@ -32,6 +32,19 @@ pub use producer::Producer as MetricsProducer; #[cfg(not(feature = "partitions"))] pub use store::Store as MetricsStore; +/// Type of the communication channel between metric producers +/// and the collector. +#[derive(Copy, Clone)] +pub enum MetricChannelType { + /// Each send message must be paired with receive. Sends that + /// don't get a pair block the thread until collector processes + /// the request. This mode is suitable for unit tests where metric + /// consistency is important and gets more priority than availability. + Rendezvous, + /// Each channel between producer and collector gets unlimited capacity. + Unbounded, +} + /// Creates metric infrastructure that is ready to use /// in the application code. It consists a triple of /// [`MetricsCollector`], [`MetricsProducer`], and @@ -47,9 +60,19 @@ pub use store::Store as MetricsStore; /// A thread that owns the controller, can request current snapshot. /// For more information about API, see [`Command`]. /// -/// ## Example +/// The communication channel between producers and collector is configured +/// via `channel_type` parameter. See [`MetricChannelType`] for details +/// +/// ## Example 1 (Rendezvous channels) +/// ```rust +/// use ipa_metrics::MetricChannelType; +/// let (collector, producer, controller) = ipa_metrics::install(MetricChannelType::Rendezvous); +/// ``` +/// +/// ## Example 2 (unbounded) /// ```rust -/// let (collector, producer, controller) = ipa_metrics::install(); +/// use ipa_metrics::MetricChannelType; +/// let (collector, producer, controller) = ipa_metrics::install(MetricChannelType::Unbounded); /// ``` /// /// [`MetricsCollector`]: crate::MetricsCollector @@ -57,13 +80,18 @@ pub use store::Store as MetricsStore; /// [`MetricsCollectorController`]: crate::MetricsCollectorController /// [`Command`]: crate::ControllerCommand #[must_use] -pub fn install() -> ( +pub fn install( + channel_type: MetricChannelType, +) -> ( MetricsCollector, MetricsProducer, MetricsCollectorController, ) { let (command_tx, command_rx) = crossbeam_channel::unbounded(); - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = match channel_type { + MetricChannelType::Rendezvous => crossbeam_channel::bounded(0), + MetricChannelType::Unbounded => crossbeam_channel::unbounded(), + }; ( MetricsCollector { rx, @@ -80,8 +108,9 @@ pub fn install() -> ( /// ## Errors /// if thread cannot be started pub fn install_new_thread( + channel_type: MetricChannelType, ) -> io::Result<(MetricsProducer, MetricsCollectorController, JoinHandle<()>)> { - let (collector, producer, controller) = install(); + let (collector, producer, controller) = install(channel_type); let handle = std::thread::Builder::new() .name("metric-collector".to_string()) .spawn(|| { From 1bb477e76a8c76731ce1246e025f0edb449f8849 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Fri, 25 Oct 2024 10:49:32 -0700 Subject: [PATCH 6/7] Improve coverage --- ipa-metrics/src/context.rs | 15 ++++++++++++++- ipa-metrics/src/key.rs | 2 +- ipa-metrics/src/store.rs | 11 +++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/ipa-metrics/src/context.rs b/ipa-metrics/src/context.rs index 7d19f2eb8..f166d610b 100644 --- a/ipa-metrics/src/context.rs +++ b/ipa-metrics/src/context.rs @@ -127,7 +127,7 @@ impl Drop for MetricsContext { mod tests { use std::thread; - use crate::MetricsContext; + use crate::{context::CurrentThreadContext, MetricsContext}; /// Each thread has its local store by default, and it is exclusive to it #[test] @@ -170,4 +170,17 @@ mod tests { drop(ctx); handle.join().unwrap(); } + + #[test] + fn is_connected() { + assert!(!CurrentThreadContext::is_connected()); + let (tx, rx) = crossbeam_channel::unbounded(); + + CurrentThreadContext::init(tx); + CurrentThreadContext::store_mut(|store| store.counter(counter!("foo")).inc(1)); + CurrentThreadContext::flush(); + + assert!(CurrentThreadContext::is_connected()); + assert_eq!(1, rx.recv().unwrap().counter_val(counter!("foo"))); + } } diff --git a/ipa-metrics/src/key.rs b/ipa-metrics/src/key.rs index 02229a11f..36eb60a65 100644 --- a/ipa-metrics/src/key.rs +++ b/ipa-metrics/src/key.rs @@ -203,7 +203,7 @@ impl Hash for OwnedName { #[cfg(test)] pub fn compute_hash(value: V) -> u64 { - let mut hasher = std::hash::DefaultHasher::default(); + let mut hasher = crate::label_hasher(); value.hash(&mut hasher); hasher.finish() diff --git a/ipa-metrics/src/store.rs b/ipa-metrics/src/store.rs index bd792ae6e..e893ffd84 100644 --- a/ipa-metrics/src/store.rs +++ b/ipa-metrics/src/store.rs @@ -229,4 +229,15 @@ mod tests { store.counter(counter!("bar")).inc(1); assert_eq!(2, store.len()); } + + #[test] + fn counters() { + let mut store = Store::default(); + store.counter(counter!("foo")).inc(1); + store.counter(counter!("foo", "h1" => &1)).inc(1); + store.counter(counter!("foo", "h2" => &2)).inc(1); + store.counter(counter!("bar")).inc(1); + + assert_eq!((4, Some(4)), store.counters().size_hint()); + } } From fa98432d0828b6465d12c504b87135f4454eb646 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Fri, 25 Oct 2024 18:18:15 -0700 Subject: [PATCH 7/7] Feedback --- ipa-metrics/src/key.rs | 2 +- ipa-metrics/src/label.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ipa-metrics/src/key.rs b/ipa-metrics/src/key.rs index 36eb60a65..620e193e3 100644 --- a/ipa-metrics/src/key.rs +++ b/ipa-metrics/src/key.rs @@ -115,7 +115,7 @@ impl<'lv, const LABELS: usize> Name<'lv, LABELS> { #[derive(Debug, Clone, Eq)] pub struct OwnedName { pub key: &'static str, - pub labels: [Option; 5], + labels: [Option; 5], } impl OwnedName { diff --git a/ipa-metrics/src/label.rs b/ipa-metrics/src/label.rs index 86a1b11d1..dd822be86 100644 --- a/ipa-metrics/src/label.rs +++ b/ipa-metrics/src/label.rs @@ -8,7 +8,7 @@ use rustc_hash::FxHasher; pub const MAX_LABELS: usize = 5; /// Provides a fast, non-collision resistant implementation of [`Hasher`] -/// for label values. T +/// for label values. /// /// [`Hasher`]: std::hash::Hasher #[must_use]