Skip to content

Commit

Permalink
feat(amap): support atomic map for metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
upupnoah committed Jul 10, 2024
1 parent 0d124cf commit 240cdea
Show file tree
Hide file tree
Showing 6 changed files with 277 additions and 122 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,14 @@
# rust-concurrency

## matrix

- 矩阵乘法
- 并发矩阵乘法

## cmap metrics(指标监测)

| 并发 map

## amap metrics(指标监测)

| atomic map
71 changes: 71 additions & 0 deletions examples/ametrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use anyhow::Result;
use concurrency::AmapMetrics;
use rand::Rng;
use std::{
thread,
time::{Duration, Instant},
};

const N: usize = 2;
const M: usize = 4;

fn main() -> Result<()> {
let metrics = AmapMetrics::new(&[
"call.thread.worker.0",
"call.thread.worker.1",
"req.page.1",
"req.page.2",
"req.page.3",
"req.page.4",
]);
let start_time = Instant::now();

// start N workers and M requesters

for idx in 0..N {
task_worker(idx, metrics.clone())?; // Metrics {data: Arc::clone(&metrics.data)}
}

for _ in 0..M {
request_worker(metrics.clone())?;
}

while start_time.elapsed() < Duration::from_secs(10) {
thread::sleep(Duration::from_secs(2));
println!("{}", metrics);
}

Ok(())
}

fn task_worker(idx: usize, metrics: AmapMetrics) -> Result<()> {
thread::spawn(move || {
loop {
// do long term stuff
let mut rng = rand::thread_rng();

thread::sleep(Duration::from_millis(rng.gen_range(100..5000)));
metrics.inc(format!("call.thread.worker.{}", idx))?;
}
#[allow(unreachable_code)]
Ok::<_, anyhow::Error>(())
});
Ok(())
}

fn request_worker(metrics: AmapMetrics) -> Result<()> {
thread::spawn(move || {
loop {
// process requests
let mut rng = rand::thread_rng();

thread::sleep(Duration::from_millis(rng.gen_range(50..800)));
let page = rng.gen_range(1..5);
metrics.inc(format!("req.page.{}", page))?;
}
#[allow(unreachable_code)]
Ok::<_, anyhow::Error>(())
});

Ok(())
}
8 changes: 4 additions & 4 deletions examples/metrics.rs → examples/cmetrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
};

use anyhow::Result;
use concurrency::Metrics;
use concurrency::CmapMetrics;
use rand::Rng;

const N: usize = 2;
Expand Down Expand Up @@ -52,7 +52,7 @@ const M: usize = 4;

// region: --- DashMap version
fn main() -> Result<()> {
let metrics = Metrics::new();
let metrics = CmapMetrics::new();
let start_time = Instant::now();
println!("{}", metrics);

Expand All @@ -70,7 +70,7 @@ fn main() -> Result<()> {
}
// endregion: --- DashMap version

fn task_worker(idx: usize, metrics: Metrics) -> Result<()> {
fn task_worker(idx: usize, metrics: CmapMetrics) -> Result<()> {
thread::spawn(move || {
loop {
let mut rng = rand::thread_rng();
Expand All @@ -84,7 +84,7 @@ fn task_worker(idx: usize, metrics: Metrics) -> Result<()> {
Ok(())
}

fn request_worker(metrics: Metrics) -> Result<()> {
fn request_worker(metrics: CmapMetrics) -> Result<()> {
thread::spawn(move || {
loop {
let mut rng = rand::thread_rng();
Expand Down
122 changes: 4 additions & 118 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,119 +1,5 @@
// metrics data structure
// basic functions: inc/dec/snapshot
mod cmap;
mod amap;

use std::{fmt::Display, sync::Arc};

use anyhow::Result;
use dashmap::DashMap;

// region: --- HashMap Version
// // metrics table
// #[derive(Debug, Default, Clone)]
// pub struct Metrics {
// // data: HashMap<String, i64>,
// // data: Arc<Mutex<HashMap<String, i64>>>,
// data: Arc<RwLock<HashMap<String, i64>>>,
// }

// // region: --- impls
// impl Metrics {
// pub fn new() -> Self {
// // Self {
// // data: HashMap::new(),
// // }
// Self::default()
// }
// // pub fn inc(&mut self, key: &str) {
// // let counter = self.data.entry(key.to_string()).or_insert(0);
// // *counter += 1;
// // }

// // pub fn dec(&mut self, key: &str) {
// // let counter = self.data.entry(key.to_string()).or_insert(0);
// // *counter -= 1;
// // }

// pub fn inc(&self, key: impl Into<String>) -> Result<()> {
// // let counter = self.data.lock.entry(key.into()).or_insert(0);
// // *counter += 1;
// // let mut data = self.data.lock().unwrap();
// // let mut data = self.data.lock().map_err(|e| anyhow!(e.to_string()))?;
// let mut data = self.data.write().map_err(|e| anyhow!(e.to_string()))?;
// let counter = data.entry(key.into()).or_insert(0);
// *counter += 1;
// Ok(())
// }

// pub fn dec(&self, key: impl Into<String>) -> Result<()> {
// // let counter = self.data.entry(key.into()).or_insert(0);
// // *counter -= 1;
// // let mut data = self.data.lock().unwrap();
// let mut data = self.data.write().map_err(|e| anyhow!(e.to_string()))?;
// let counter = data.entry(key.into()).or_insert(0);
// *counter -= 1;
// Ok(())
// }

// pub fn snapshot(&self) -> Result<HashMap<String, i64>> {
// // self.data.clone()
// // Ok(self
// // .data
// // .lock()
// // .map_err(|e| anyhow!(e.to_string()))?
// // .clone())
// Ok(self
// .data
// .read()
// .map_err(|e| anyhow!(e.to_string()))?
// .clone())
// }
// }

// impl Display for Metrics {
// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// let data = self.data.read().map_err(|_e| fmt::Error {})?;
// for (key, value) in data.iter() {
// writeln!(f, "{}: {}", key, value)?;
// }
// Ok(())
// }
// }
// // endregion: --- impls
// endregion: --- HashMap Version

// region: --- DashMap Version
#[derive(Default, Clone)]
pub struct Metrics {
// Arc<Mutex<HashMap<String, i64>>> => Arc<DashMap<String, i64>>
data: Arc<DashMap<String, i64>>, // 不需要加锁, 因为 DashMap 本身是线程安全的
}

// region: --- impls
impl Metrics {
pub fn new() -> Self {
Self::default()
}

pub fn inc(&self, key: impl Into<String>) -> Result<()> {
let mut counter = self.data.entry(key.into()).or_insert(0);
*counter += 1;
Ok(())
}

pub fn dec(&self, key: impl Into<String>) -> Result<()> {
let mut counter = self.data.entry(key.into()).or_insert(0);
*counter -= 1;
Ok(())
}
}

impl Display for Metrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for entry in self.data.iter() {
writeln!(f, "{}: {}", entry.key(), entry.value())?;
}
Ok(())
}
}
// endregion: --- impls
// endregion: --- DashMap Version
pub use cmap::*;
pub use amap::*;
66 changes: 66 additions & 0 deletions src/metrics/amap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use anyhow::Result;
use std::{
collections::HashMap,
fmt,
sync::{
atomic::{AtomicI64, Ordering},
Arc,
},
};

// Rust 标准库提供了一些原子类型,可以在多线程环境下安全地共享和修改数据
// 不需要使用锁,原子类型的操作是无锁的,因此性能更好
#[derive(Debug, Clone)]
pub struct AmapMetrics {
data: Arc<HashMap<&'static str, AtomicI64>>,
}

impl AmapMetrics {
pub fn new(metric_names: &[&'static str]) -> Self {
// 初始化 HashMap,每个 key 对应一个 AtomicI64 类型的值
let map = metric_names
.iter()
.map(|&name| (name, AtomicI64::new(0)))
.collect();
AmapMetrics {
data: Arc::new(map),
}
}

pub fn inc(&self, key: impl AsRef<str>) -> Result<()> {
let key = key.as_ref();
let counter = self
.data
.get(key)
.ok_or_else(|| anyhow::anyhow!("key {} not found", key))?;
counter.fetch_add(1, Ordering::Relaxed);
Ok(())
}

pub fn dec(&self, key: impl AsRef<str>) -> Result<()> {
let key = key.as_ref();
let counter = self
.data
.get(key)
.ok_or_else(|| anyhow::anyhow!("key {} not found", key))?;
counter.fetch_sub(1, Ordering::Relaxed);
Ok(())
}
}

// impl Clone for AmapMetrics {
// fn clone(&self) -> Self {
// AmapMetrics {
// data: Arc::clone(&self.data),
// }
// }
// }

impl fmt::Display for AmapMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for (key, value) in self.data.iter() {
writeln!(f, "{}: {}", key, value.load(Ordering::Relaxed))?;
}
Ok(())
}
}
Loading

0 comments on commit 240cdea

Please sign in to comment.