From 240cdea922488d6b97b235545b5d19ee135b7b20 Mon Sep 17 00:00:00 2001 From: upupnoah Date: Thu, 11 Jul 2024 04:32:55 +0700 Subject: [PATCH] feat(amap): support atomic map for metrics --- README.md | 13 +++ examples/ametrics.rs | 71 ++++++++++++++++ examples/{metrics.rs => cmetrics.rs} | 8 +- src/metrics.rs | 122 +-------------------------- src/metrics/amap.rs | 66 +++++++++++++++ src/metrics/cmap.rs | 119 ++++++++++++++++++++++++++ 6 files changed, 277 insertions(+), 122 deletions(-) create mode 100644 examples/ametrics.rs rename examples/{metrics.rs => cmetrics.rs} (92%) create mode 100644 src/metrics/amap.rs create mode 100644 src/metrics/cmap.rs diff --git a/README.md b/README.md index a894c5a..d832b09 100644 --- a/README.md +++ b/README.md @@ -1 +1,14 @@ # rust-concurrency + +## matrix + +- 矩阵乘法 +- 并发矩阵乘法 + +## cmap metrics(指标监测) + +| 并发 map + +## amap metrics(指标监测) + +| atomic map diff --git a/examples/ametrics.rs b/examples/ametrics.rs new file mode 100644 index 0000000..33c3c42 --- /dev/null +++ b/examples/ametrics.rs @@ -0,0 +1,71 @@ +use anyhow::Result; +use concurrency::AmapMetrics; +use rand::Rng; +use std::{ + thread, + time::{Duration, Instant}, +}; + +const N: usize = 2; +const M: usize = 4; + +fn main() -> Result<()> { + let metrics = AmapMetrics::new(&[ + "call.thread.worker.0", + "call.thread.worker.1", + "req.page.1", + "req.page.2", + "req.page.3", + "req.page.4", + ]); + let start_time = Instant::now(); + + // start N workers and M requesters + + for idx in 0..N { + task_worker(idx, metrics.clone())?; // Metrics {data: Arc::clone(&metrics.data)} + } + + for _ in 0..M { + request_worker(metrics.clone())?; + } + + while start_time.elapsed() < Duration::from_secs(10) { + thread::sleep(Duration::from_secs(2)); + println!("{}", metrics); + } + + Ok(()) +} + +fn task_worker(idx: usize, metrics: AmapMetrics) -> Result<()> { + thread::spawn(move || { + loop { + // do long term stuff + let mut rng = rand::thread_rng(); + + thread::sleep(Duration::from_millis(rng.gen_range(100..5000))); + metrics.inc(format!("call.thread.worker.{}", idx))?; + } + #[allow(unreachable_code)] + Ok::<_, anyhow::Error>(()) + }); + Ok(()) +} + +fn request_worker(metrics: AmapMetrics) -> Result<()> { + thread::spawn(move || { + loop { + // process requests + let mut rng = rand::thread_rng(); + + thread::sleep(Duration::from_millis(rng.gen_range(50..800))); + let page = rng.gen_range(1..5); + metrics.inc(format!("req.page.{}", page))?; + } + #[allow(unreachable_code)] + Ok::<_, anyhow::Error>(()) + }); + + Ok(()) +} diff --git a/examples/metrics.rs b/examples/cmetrics.rs similarity index 92% rename from examples/metrics.rs rename to examples/cmetrics.rs index ff58374..595d1a7 100644 --- a/examples/metrics.rs +++ b/examples/cmetrics.rs @@ -4,7 +4,7 @@ use std::{ }; use anyhow::Result; -use concurrency::Metrics; +use concurrency::CmapMetrics; use rand::Rng; const N: usize = 2; @@ -52,7 +52,7 @@ const M: usize = 4; // region: --- DashMap version fn main() -> Result<()> { - let metrics = Metrics::new(); + let metrics = CmapMetrics::new(); let start_time = Instant::now(); println!("{}", metrics); @@ -70,7 +70,7 @@ fn main() -> Result<()> { } // endregion: --- DashMap version -fn task_worker(idx: usize, metrics: Metrics) -> Result<()> { +fn task_worker(idx: usize, metrics: CmapMetrics) -> Result<()> { thread::spawn(move || { loop { let mut rng = rand::thread_rng(); @@ -84,7 +84,7 @@ fn task_worker(idx: usize, metrics: Metrics) -> Result<()> { Ok(()) } -fn request_worker(metrics: Metrics) -> Result<()> { +fn request_worker(metrics: CmapMetrics) -> Result<()> { thread::spawn(move || { loop { let mut rng = rand::thread_rng(); diff --git a/src/metrics.rs b/src/metrics.rs index 0fe64b0..a563b17 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,119 +1,5 @@ -// metrics data structure -// basic functions: inc/dec/snapshot +mod cmap; +mod amap; -use std::{fmt::Display, sync::Arc}; - -use anyhow::Result; -use dashmap::DashMap; - -// region: --- HashMap Version -// // metrics table -// #[derive(Debug, Default, Clone)] -// pub struct Metrics { -// // data: HashMap, -// // data: Arc>>, -// data: Arc>>, -// } - -// // region: --- impls -// impl Metrics { -// pub fn new() -> Self { -// // Self { -// // data: HashMap::new(), -// // } -// Self::default() -// } -// // pub fn inc(&mut self, key: &str) { -// // let counter = self.data.entry(key.to_string()).or_insert(0); -// // *counter += 1; -// // } - -// // pub fn dec(&mut self, key: &str) { -// // let counter = self.data.entry(key.to_string()).or_insert(0); -// // *counter -= 1; -// // } - -// pub fn inc(&self, key: impl Into) -> Result<()> { -// // let counter = self.data.lock.entry(key.into()).or_insert(0); -// // *counter += 1; -// // let mut data = self.data.lock().unwrap(); -// // let mut data = self.data.lock().map_err(|e| anyhow!(e.to_string()))?; -// let mut data = self.data.write().map_err(|e| anyhow!(e.to_string()))?; -// let counter = data.entry(key.into()).or_insert(0); -// *counter += 1; -// Ok(()) -// } - -// pub fn dec(&self, key: impl Into) -> Result<()> { -// // let counter = self.data.entry(key.into()).or_insert(0); -// // *counter -= 1; -// // let mut data = self.data.lock().unwrap(); -// let mut data = self.data.write().map_err(|e| anyhow!(e.to_string()))?; -// let counter = data.entry(key.into()).or_insert(0); -// *counter -= 1; -// Ok(()) -// } - -// pub fn snapshot(&self) -> Result> { -// // self.data.clone() -// // Ok(self -// // .data -// // .lock() -// // .map_err(|e| anyhow!(e.to_string()))? -// // .clone()) -// Ok(self -// .data -// .read() -// .map_err(|e| anyhow!(e.to_string()))? -// .clone()) -// } -// } - -// impl Display for Metrics { -// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { -// let data = self.data.read().map_err(|_e| fmt::Error {})?; -// for (key, value) in data.iter() { -// writeln!(f, "{}: {}", key, value)?; -// } -// Ok(()) -// } -// } -// // endregion: --- impls -// endregion: --- HashMap Version - -// region: --- DashMap Version -#[derive(Default, Clone)] -pub struct Metrics { - // Arc>> => Arc> - data: Arc>, // 不需要加锁, 因为 DashMap 本身是线程安全的 -} - -// region: --- impls -impl Metrics { - pub fn new() -> Self { - Self::default() - } - - pub fn inc(&self, key: impl Into) -> Result<()> { - let mut counter = self.data.entry(key.into()).or_insert(0); - *counter += 1; - Ok(()) - } - - pub fn dec(&self, key: impl Into) -> Result<()> { - let mut counter = self.data.entry(key.into()).or_insert(0); - *counter -= 1; - Ok(()) - } -} - -impl Display for Metrics { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - for entry in self.data.iter() { - writeln!(f, "{}: {}", entry.key(), entry.value())?; - } - Ok(()) - } -} -// endregion: --- impls -// endregion: --- DashMap Version +pub use cmap::*; +pub use amap::*; \ No newline at end of file diff --git a/src/metrics/amap.rs b/src/metrics/amap.rs new file mode 100644 index 0000000..278e366 --- /dev/null +++ b/src/metrics/amap.rs @@ -0,0 +1,66 @@ +use anyhow::Result; +use std::{ + collections::HashMap, + fmt, + sync::{ + atomic::{AtomicI64, Ordering}, + Arc, + }, +}; + +// Rust 标准库提供了一些原子类型,可以在多线程环境下安全地共享和修改数据 +// 不需要使用锁,原子类型的操作是无锁的,因此性能更好 +#[derive(Debug, Clone)] +pub struct AmapMetrics { + data: Arc>, +} + +impl AmapMetrics { + pub fn new(metric_names: &[&'static str]) -> Self { + // 初始化 HashMap,每个 key 对应一个 AtomicI64 类型的值 + let map = metric_names + .iter() + .map(|&name| (name, AtomicI64::new(0))) + .collect(); + AmapMetrics { + data: Arc::new(map), + } + } + + pub fn inc(&self, key: impl AsRef) -> Result<()> { + let key = key.as_ref(); + let counter = self + .data + .get(key) + .ok_or_else(|| anyhow::anyhow!("key {} not found", key))?; + counter.fetch_add(1, Ordering::Relaxed); + Ok(()) + } + + pub fn dec(&self, key: impl AsRef) -> Result<()> { + let key = key.as_ref(); + let counter = self + .data + .get(key) + .ok_or_else(|| anyhow::anyhow!("key {} not found", key))?; + counter.fetch_sub(1, Ordering::Relaxed); + Ok(()) + } +} + +// impl Clone for AmapMetrics { +// fn clone(&self) -> Self { +// AmapMetrics { +// data: Arc::clone(&self.data), +// } +// } +// } + +impl fmt::Display for AmapMetrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + for (key, value) in self.data.iter() { + writeln!(f, "{}: {}", key, value.load(Ordering::Relaxed))?; + } + Ok(()) + } +} diff --git a/src/metrics/cmap.rs b/src/metrics/cmap.rs new file mode 100644 index 0000000..3d8bedc --- /dev/null +++ b/src/metrics/cmap.rs @@ -0,0 +1,119 @@ +// metrics data structure +// basic functions: inc/dec/snapshot + +use std::{fmt::Display, sync::Arc}; + +use anyhow::Result; +use dashmap::DashMap; + +// region: --- HashMap Version +// // metrics table +// #[derive(Debug, Default, Clone)] +// pub struct Metrics { +// // data: HashMap, +// // data: Arc>>, +// data: Arc>>, +// } + +// // region: --- impls +// impl Metrics { +// pub fn new() -> Self { +// // Self { +// // data: HashMap::new(), +// // } +// Self::default() +// } +// // pub fn inc(&mut self, key: &str) { +// // let counter = self.data.entry(key.to_string()).or_insert(0); +// // *counter += 1; +// // } + +// // pub fn dec(&mut self, key: &str) { +// // let counter = self.data.entry(key.to_string()).or_insert(0); +// // *counter -= 1; +// // } + +// pub fn inc(&self, key: impl Into) -> Result<()> { +// // let counter = self.data.lock.entry(key.into()).or_insert(0); +// // *counter += 1; +// // let mut data = self.data.lock().unwrap(); +// // let mut data = self.data.lock().map_err(|e| anyhow!(e.to_string()))?; +// let mut data = self.data.write().map_err(|e| anyhow!(e.to_string()))?; +// let counter = data.entry(key.into()).or_insert(0); +// *counter += 1; +// Ok(()) +// } + +// pub fn dec(&self, key: impl Into) -> Result<()> { +// // let counter = self.data.entry(key.into()).or_insert(0); +// // *counter -= 1; +// // let mut data = self.data.lock().unwrap(); +// let mut data = self.data.write().map_err(|e| anyhow!(e.to_string()))?; +// let counter = data.entry(key.into()).or_insert(0); +// *counter -= 1; +// Ok(()) +// } + +// pub fn snapshot(&self) -> Result> { +// // self.data.clone() +// // Ok(self +// // .data +// // .lock() +// // .map_err(|e| anyhow!(e.to_string()))? +// // .clone()) +// Ok(self +// .data +// .read() +// .map_err(|e| anyhow!(e.to_string()))? +// .clone()) +// } +// } + +// impl Display for Metrics { +// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +// let data = self.data.read().map_err(|_e| fmt::Error {})?; +// for (key, value) in data.iter() { +// writeln!(f, "{}: {}", key, value)?; +// } +// Ok(()) +// } +// } +// // endregion: --- impls +// endregion: --- HashMap Version + +// region: --- DashMap Version +#[derive(Default, Clone)] +pub struct CmapMetrics { + // Arc>> => Arc> + data: Arc>, // 不需要加锁, 因为 DashMap 本身是线程安全的 +} + +// region: --- impls +impl CmapMetrics { + pub fn new() -> Self { + Self::default() + } + + pub fn inc(&self, key: impl Into) -> Result<()> { + let mut counter = self.data.entry(key.into()).or_insert(0); + *counter += 1; + Ok(()) + } + + pub fn dec(&self, key: impl Into) -> Result<()> { + let mut counter = self.data.entry(key.into()).or_insert(0); + *counter -= 1; + Ok(()) + } +} + +impl Display for CmapMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for entry in self.data.iter() { + writeln!(f, "{}: {}", entry.key(), entry.value())?; + } + Ok(()) + } +} +// endregion: --- impls +// endregion: --- DashMap Version