Skip to content

Commit

Permalink
Merge pull request #5 from upupnoah/main
Browse files Browse the repository at this point in the history
feat: support cmap and amap
  • Loading branch information
upupnoah authored Jul 10, 2024
2 parents f2f0af9 + 240cdea commit acbdf44
Show file tree
Hide file tree
Showing 11 changed files with 643 additions and 0 deletions.
153 changes: 153 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ authors = ["Noah <[email protected]>"]

[dependencies]
anyhow = "^1.0"
dashmap = "6.0.1"
oneshot = "0.1.8"
rand = "^0.8.5"
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,14 @@
# rust-concurrency

## matrix

- 矩阵乘法
- 并发矩阵乘法

## cmap metrics(指标监测)

| 并发 map

## amap metrics(指标监测)

| atomic map
71 changes: 71 additions & 0 deletions examples/ametrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use anyhow::Result;
use concurrency::AmapMetrics;
use rand::Rng;
use std::{
thread,
time::{Duration, Instant},
};

const N: usize = 2;
const M: usize = 4;

fn main() -> Result<()> {
let metrics = AmapMetrics::new(&[
"call.thread.worker.0",
"call.thread.worker.1",
"req.page.1",
"req.page.2",
"req.page.3",
"req.page.4",
]);
let start_time = Instant::now();

// start N workers and M requesters

for idx in 0..N {
task_worker(idx, metrics.clone())?; // Metrics {data: Arc::clone(&metrics.data)}
}

for _ in 0..M {
request_worker(metrics.clone())?;
}

while start_time.elapsed() < Duration::from_secs(10) {
thread::sleep(Duration::from_secs(2));
println!("{}", metrics);
}

Ok(())
}

fn task_worker(idx: usize, metrics: AmapMetrics) -> Result<()> {
thread::spawn(move || {
loop {
// do long term stuff
let mut rng = rand::thread_rng();

thread::sleep(Duration::from_millis(rng.gen_range(100..5000)));
metrics.inc(format!("call.thread.worker.{}", idx))?;
}
#[allow(unreachable_code)]
Ok::<_, anyhow::Error>(())
});
Ok(())
}

fn request_worker(metrics: AmapMetrics) -> Result<()> {
thread::spawn(move || {
loop {
// process requests
let mut rng = rand::thread_rng();

thread::sleep(Duration::from_millis(rng.gen_range(50..800)));
let page = rng.gen_range(1..5);
metrics.inc(format!("req.page.{}", page))?;
}
#[allow(unreachable_code)]
Ok::<_, anyhow::Error>(())
});

Ok(())
}
99 changes: 99 additions & 0 deletions examples/cmetrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::{
thread,
time::{Duration, Instant},
};

use anyhow::Result;
use concurrency::CmapMetrics;
use rand::Rng;

const N: usize = 2;
const M: usize = 4;
// region: --- HashMap version
// fn main() -> Result<()> {
// let metrics = Metrics::new();
// let start_time = Instant::now();

// // region: --- 单线程可用 code

// // for i in 0..100 {
// // metrics.inc("req.page.1");
// // metrics.inc("req.page.2");
// // if i & 1 == 0 {
// // metrics.inc("req.page.3");
// // }
// // }

// // for _ in 0..27 {
// // metrics.inc("call.thread.worker.1");
// // }

// // endregion: --- 单线程可用 code

// println!("{:?}", metrics.snapshot()?);

// for idx in 0..N {
// task_worker(idx, metrics.clone())?; // Metrics {data: Arc::clone(&metrics.data)}
// }

// for _ in 0..M {
// request_worker(metrics.clone())?;
// }

// while start_time.elapsed() < Duration::from_secs(10) {
// thread::sleep(Duration::from_secs(2));
// // println!("{:?}", metrics.snapshot()?); // snapshot 使用 clone 的方式
// println!("{}", metrics); // 拿到读锁之后, 直接打印
// }

// Ok(())
// }
// endregion: --- HashMap version

// region: --- DashMap version
fn main() -> Result<()> {
let metrics = CmapMetrics::new();
let start_time = Instant::now();
println!("{}", metrics);

for idx in 0..N {
task_worker(idx, metrics.clone())?;
}
for _ in 0..M {
request_worker(metrics.clone())?;
}
while start_time.elapsed() < Duration::from_secs(10) {
thread::sleep(Duration::from_secs(2));
println!("{}", metrics);
}
Ok(())
}
// endregion: --- DashMap version

fn task_worker(idx: usize, metrics: CmapMetrics) -> Result<()> {
thread::spawn(move || {
loop {
let mut rng = rand::thread_rng();
thread::sleep(Duration::from_millis(rng.gen_range(100..5000)));
// metrics.inc(format!("call.thread.worker.{}", idx)).unwrap();
metrics.inc(format!("call.thread.worker.{}", idx))?;
}
#[allow(unreachable_code)]
Ok::<_, anyhow::Error>(())
});
Ok(())
}

fn request_worker(metrics: CmapMetrics) -> Result<()> {
thread::spawn(move || {
loop {
let mut rng = rand::thread_rng();
thread::sleep(Duration::from_millis(rng.gen_range(50..800)));
let page = rng.gen_range(1..=256);
metrics.inc(format!("req.page.{}", page))?;
}
#[allow(unreachable_code)]
Ok::<_, anyhow::Error>(())
});
Ok(())
}
Loading

0 comments on commit acbdf44

Please sign in to comment.