Skip to content

Commit

Permalink
monitoring-client: add to-disk logging of Bitswap messages
Browse files Browse the repository at this point in the history
  • Loading branch information
mrd0ll4r committed May 8, 2023
1 parent 372a5d9 commit a7114b6
Show file tree
Hide file tree
Showing 8 changed files with 276 additions and 7 deletions.
19 changes: 17 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Dockerfile.bitswap-monitoring-client
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ COPY --from=ipfs-tools-builder /ipfs-tools/target/release/bitswap-monitoring-cli
COPY --from=ipfs-tools-builder /ipfs-tools/bitswap-monitoring-client/config.yaml ./config/bitswap-monitoring-client-config.yaml

# Set ownership.
RUN chown -R ipfs:ipfs ./bitswap-monitoring-client
RUN chown -R ipfs:ipfs ./

# Set log level.
ENV RUST_LOG=info
Expand All @@ -30,7 +30,7 @@ ENV RUST_LOG=info
EXPOSE 8088

# Drop root.
USER ipfs
#USER ipfs

# Run the binary.
ENTRYPOINT ["./bitswap-monitoring-client","--config","./config/bitswap-monitoring-client-config.yaml"]
6 changes: 5 additions & 1 deletion bitswap-monitoring-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
ipfs-resolver-common = { path = "../common" }
ipfs_monitoring_plugin_client = { path = "../ipfs-monitoring-plugin-client" }
tokio = { version = "1", features = ["rt", "net", "sync", "rt-multi-thread", "time", "macros", "signal", "fs"] }
tokio = { version = "1", features = ["rt", "net", "sync", "rt-multi-thread", "time", "macros", "signal", "fs", "io-util"] }
log = "0.4.14"
flexi_logger = "0.25.3"
failure = "0.1.8"
Expand All @@ -29,3 +29,7 @@ multiaddr = "0.17"

# ISO 3166-1 countries.
celes = "2.4.0"

# Logging to file
async-compression = { version = "0.3.15" , default-features = false, features=["tokio","gzip"]}
serde_json = "1.0.96"
5 changes: 5 additions & 0 deletions bitswap-monitoring-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ prometheus_address: "0.0.0.0:8080"
# Defaults to empty, i.e., no tagging of gateway traffic.
#gateway_file_path: "/usr/local/share/gateways.txt"

# Specifies a path to a directory to write bitswap traces to.
# A subdirectory per monitor will be created.
# If not provided, logging to disk will be disabled.
#disk_logging_directory: "traces"

# List of AMQP data sources to connect to.
amqp_servers:
# Address of the AMQP server, using amqp or amqps (TLS transport) scheme.
Expand Down
5 changes: 5 additions & 0 deletions bitswap-monitoring-client/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ prometheus_address: "0.0.0.0:8088"
# Defaults to empty, i.e., no tagging of gateway traffic.
#gateway_file_path: "/usr/local/share/gateways.txt"

# Specifies a path to a directory to write bitswap traces to.
# A subdirectory per monitor will be created.
# If not provided, logging to disk will be disabled.
#disk_logging_directory: "traces"

# List of AMQP data sources to connect to.
amqp_servers:
# Address of the AMQP server, using amqp or amqps (TLS transport) scheme.
Expand Down
5 changes: 5 additions & 0 deletions bitswap-monitoring-client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ pub(crate) struct Config {
/// Each line in the file should contain one peer ID.
/// If not provided, all traffic will be logged as non-gateway traffic.
pub(crate) gateway_file_path: Option<String>,

/// Specifies a path to a directory to write bitswap traces to.
/// A subdirectory per monitor will be created.
/// If not provided, logging to disk will be disabled.
pub(crate) disk_logging_directory: Option<String>,
}

/// Configuration for a single data source.
Expand Down
206 changes: 206 additions & 0 deletions bitswap-monitoring-client/src/disklog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
use async_compression::tokio::write::GzipEncoder;
use failure::{Error, Fail, ResultExt};
use ipfs_monitoring_plugin_client::monitoring::PushedEvent;
use std::fs;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::select;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, Mutex};

use crate::Result;

/// An async to-disk logger for Bitswap messages.
///
/// Messages are processed by a two-stage pipeline of background tasks: one task
/// serializes each event to JSON, the next one writes the serialized lines to disk.
/// Output is written to a separate subdirectory per monitor, as gzipped files holding
/// one JSON document per line.
/// The output file is rotated regularly (once per hour).
///
/// The logger is cheap to clone: all clones feed the same pipeline and share the
/// same error slot.
#[derive(Clone, Debug)]
pub(crate) struct ToDiskLogger {
// Sending half of the channel that feeds events into the JSON-encoding task.
output: Sender<PushedEvent>,
// Slot shared with the background tasks; a task stores the error that made it
// stop here, so it can be handed to a caller of `log_message`.
last_error: Arc<Mutex<Option<Error>>>,
}

impl ToDiskLogger {
/// Creates a new to-disk logger for the Bitswap messages of one monitor.
///
/// A subdirectory named after the monitor is created under `base_directory` if it
/// does not exist already, and an initial output file is opened inside it.
/// Two background tasks are then spawned: one serializes incoming events to JSON,
/// the other writes the serialized lines to the output file and rotates it.
///
/// # Errors
///
/// Returns an error if the logging directory or the initial output file cannot be
/// created.
pub(crate) async fn new_for_monitor(base_directory: &str, monitor_name: &str) -> Result<Self> {
    // Create subdirectory for monitor, if not exists.
    let base_dir = PathBuf::from_str(base_directory).context("invalid path")?;
    let target_dir = base_dir.join(monitor_name);
    // NOTE(review): this is a blocking filesystem call inside an async fn. It runs
    // once per logger; consider an async alternative if that ever becomes a problem.
    fs::create_dir_all(&target_dir).context("unable to create logging directory")?;

    // Create initial output file.
    let out_file = Self::rotate_file(None, &target_dir)
        .await
        .context("unable to create output file")?;

    // Set up the plumbing between the stages. Both channels have a capacity of one,
    // so each stage buffers at most a single pending item.
    let (send_msg, recv_msg) = mpsc::channel(1);
    let (send_json, recv_json) = mpsc::channel(1);

    let logger = ToDiskLogger {
        output: send_msg,
        last_error: Default::default(),
    };

    // Spawn the pipeline: events -> JSON encoder -> file writer.
    tokio::spawn(Self::json_encode_task(
        monitor_name.to_string(),
        recv_msg,
        send_json,
        logger.last_error.clone(),
    ));
    tokio::spawn(Self::write_to_file(
        monitor_name.to_string(),
        // The directory path is not needed here anymore, so hand it over by value.
        target_dir,
        logger.last_error.clone(),
        recv_json,
        out_file,
    ));

    Ok(logger)
}

/// Logs the given message to file.
/// An error is returned if logging of previous messages failed.
/// After an error has been returned, subsequent calls will panic.
pub(crate) async fn log_message(&self, msg: PushedEvent) -> Result<()> {
if let Err(_) = self.output.send(msg).await {
// Receiver closed, this is an error.
if let Some(err) = self.last_error.lock().await.take() {
return Err(err);
}
unreachable!()
}

Ok(())
}

/// First pipeline stage: serializes every received event to JSON bytes and forwards
/// them to the writer stage.
///
/// The task ends when the input channel is closed, when serialization fails (the
/// error is recorded in `error_storage`), or when the writer stage is gone.
async fn json_encode_task(
    monitor_name: String,
    mut input: Receiver<PushedEvent>,
    output: Sender<Vec<u8>>,
    error_storage: Arc<Mutex<Option<Error>>>,
) {
    while let Some(event) = input.recv().await {
        match serde_json::to_vec(&event) {
            Err(cause) => {
                // Record the failure so it can be surfaced to callers, then stop.
                let mut slot = error_storage.lock().await;
                *slot = Some(cause.context("unable to serialize to JSON").into());
                break;
            }
            Ok(encoded) => {
                if output.send(encoded).await.is_err() {
                    // The writer has stopped. It is the writer's job to record why,
                    // so there is nothing to store here.
                    debug!(
                        "{} json encode: unable to send, receiver closed",
                        monitor_name
                    );
                    break;
                }
            }
        }
    }

    debug!("{} json encode: exiting", monitor_name);
}

/// Completes an output file: flushes the write buffer, then shuts the writer down.
///
/// Returns an error if either the flush or the shutdown fails.
async fn finalize_file(mut f: BufWriter<GzipEncoder<File>>) -> Result<()> {
    // Push out whatever is still sitting in the buffer.
    let flushed = f.flush().await;
    flushed.context("unable to flush buffer")?;

    // Shutting down is what finishes the compressed stream.
    let closed = f.shutdown().await;
    closed.context("unable to write trailer")?;

    Ok(())
}

/// Finalizes `old_file` (if any) and opens a fresh output file in `target_dir`.
///
/// The new file is named after the current UTC time, formatted as
/// `%Y-%m-%d_%H-%M-%S_%Z`, with a `.json.gz` extension. It is wrapped in a gzip
/// encoder and a write buffer.
///
/// NOTE(review): file names have one-second resolution, so two files created within
/// the same second would share a path.
///
/// # Errors
///
/// Returns an error if the previous file cannot be finalized or the new file cannot
/// be created. The previous file is consumed in either case.
async fn rotate_file(
    old_file: Option<BufWriter<GzipEncoder<File>>>,
    target_dir: &PathBuf,
) -> Result<BufWriter<GzipEncoder<File>>> {
    // Close the previous file first, so it is complete before a new one appears.
    if let Some(f) = old_file {
        Self::finalize_file(f)
            .await
            .context("unable to finalize previous file")?;
    }

    let file_name = format!(
        "{}.json.gz",
        chrono::Utc::now().format("%Y-%m-%d_%H-%M-%S_%Z")
    );
    let fp = target_dir.join(file_name);
    debug!("creating new log file at {:?}", fp);
    let out_file = File::create(fp)
        .await
        .context("unable to create bitswap logging file")?;

    Ok(BufWriter::new(GzipEncoder::new(out_file)))
}

/// Second pipeline stage: writes each received JSON document, followed by a newline,
/// to the current output file, and swaps in a new output file once per hour.
///
/// The task ends when the input channel is closed (the current file is finalized
/// first) or when writing or rotating fails. Every failure is logged and recorded
/// in `error_storage`.
async fn write_to_file(
    monitor_name: String,
    target_dir: PathBuf,
    error_storage: Arc<Mutex<Option<Error>>>,
    mut input: Receiver<Vec<u8>>,
    file: BufWriter<GzipEncoder<File>>,
) {
    let line_break: &[u8] = b"\n";
    let mut active = file;

    // Ticker driving the hourly rotation. An interval fires immediately on its
    // first tick, so consume that one up front.
    let rotation_period = Duration::from_secs(60 * 60);
    let mut rotation_ticker = tokio::time::interval(rotation_period);
    rotation_ticker.tick().await;

    loop {
        select! {
            received = input.recv() => {
                let line = match received {
                    Some(line) => line,
                    None => {
                        // The encoder stage is gone: either a regular shutdown or
                        // a failure upstream. Finish the file before leaving.
                        debug!("{} write: sender closed, exiting",monitor_name);
                        if let Err(e) = Self::finalize_file(active).await {
                            error!("{} write: unable to finalize output file: {:?}",monitor_name,e);
                            let mut slot = error_storage.lock().await;
                            *slot = Some(e.context("unable to finalize output file").into());
                        }
                        break
                    }
                };

                // One document per line: payload first, then the separator. The
                // separator is only attempted if the payload went through.
                let mut outcome = active.write_all(&line).await;
                if outcome.is_ok() {
                    outcome = active.write_all(line_break).await;
                }
                if let Err(e) = outcome {
                    error!("{} write: unable to write: {:?}",monitor_name,e);
                    let mut slot = error_storage.lock().await;
                    *slot = Some(e.context("unable to write").into());
                    break
                }
            },
            _ = rotation_ticker.tick() => {
                // Time is up: close the current file and continue in a new one.
                debug!("{} write: rotating output file",monitor_name);
                match Self::rotate_file(Some(active),&target_dir).await {
                    Ok(fresh) => active = fresh,
                    Err(e) => {
                        error!("{} write: unable to rotate bitswap log file: {:?}",monitor_name,e);
                        let mut slot = error_storage.lock().await;
                        *slot = Some(e.context("unable to rotate output file").into());
                        break
                    }
                }
            }
        }
    }

    debug!("{} write: exiting", monitor_name);
}
}
Loading

0 comments on commit a7114b6

Please sign in to comment.