diff --git a/Cargo.lock b/Cargo.lock index 9054d10..0684b14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -127,6 +127,19 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-compression" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a" +dependencies = [ + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-executor" version = "1.5.0" @@ -332,11 +345,13 @@ dependencies = [ name = "bitswap-monitoring-client" version = "0.6.0" dependencies = [ + "async-compression", "celes", "chrono", "clap", "failure", "flexi_logger", + "futures", "futures-util", "ipfs-resolver-common", "ipfs_monitoring_plugin_client", @@ -347,8 +362,10 @@ dependencies = [ "prometheus", "prometheus_exporter", "serde", + "serde_json", "serde_yaml", "tokio", + "tokio-util", ] [[package]] @@ -933,9 +950,9 @@ checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" [[package]] name = "futures" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", @@ -948,9 +965,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -958,15 +975,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" dependencies = [ "futures-core", "futures-task", @@ -975,9 +992,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" [[package]] name = "futures-lite" @@ -996,9 +1013,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", @@ -1007,21 +1024,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.29" +version = "0.3.30" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-channel", "futures-core", diff --git a/Dockerfile.bitswap-monitoring-client b/Dockerfile.bitswap-monitoring-client index a077677..6edebf5 100644 --- a/Dockerfile.bitswap-monitoring-client +++ b/Dockerfile.bitswap-monitoring-client @@ -1,27 +1,58 @@ # Implements an image to run the bitswap-monitoring-client tool. # This will expose port 8088 for prometheus. -# The executable is placed in /, the config in /config/. +# The executable is placed in /ipfs-tools, the config in /ipfs-tools/config/. # The config is copied from the builder stage (and thus verbose from the sources). -# You can probably overwrite it by mounting your own config directory, I guess. +# You can override it by mounting your own. + +# First build su-exec +FROM ubuntu:jammy AS builder + +RUN apt-get update && apt-get install -y \ + curl \ + build-essential \ + git \ + wget + +# Get su-exec, a very minimal tool for dropping privileges. 
+ENV SUEXEC_VERSION=v0.2 +RUN set -eux; \ + dpkgArch="$(dpkg --print-architecture)"; \ + case "${dpkgArch##*-}" in \ + "amd64" | "armhf" | "arm64") tiniArch="tini-static-$dpkgArch" ;;\ + *) echo >&2 "unsupported architecture: ${dpkgArch}"; exit 1 ;; \ + esac; \ + cd /tmp \ + && git clone https://github.com/ncopa/su-exec.git \ + && cd su-exec \ + && git checkout -q $SUEXEC_VERSION \ + && make su-exec-static + +# Get yq (its release assets name 32-bit ARM "arm", whereas dpkg reports "armhf") +ENV YQ_VERSION=v4.44.3 +RUN set -eux; \ + dpkgArch="$(dpkg --print-architecture)"; \ + case "${dpkgArch##*-}" in \ + "amd64" | "arm" | "arm64") yqArch="$dpkgArch" ;; "armhf") yqArch="arm" ;;\ + *) echo >&2 "unsupported architecture: ${dpkgArch}"; exit 1 ;; \ + esac; \ + wget https://github.com/mikefarah/yq/releases/download/${YQ_VERSION}/yq_linux_${yqArch} -O /usr/bin/yq &&\ + chmod +x /usr/bin/yq # Get some small base image to run things on. FROM ubuntu:jammy AS runtime -# Create a system user to drop into. -# This will get some small (<1000) UID and GID, which is fine since we don't write to any files on the host. -RUN groupadd -r ipfs \ - && useradd --no-log-init -r -g ipfs ipfs \ - && mkdir -p ipfs - # Enter our working directory. WORKDIR ipfs-tools # Copy compiled binaries from builder. COPY --from=ipfs-tools-builder /ipfs-tools/target/release/bitswap-monitoring-client . COPY --from=ipfs-tools-builder /ipfs-tools/bitswap-monitoring-client/config.yaml ./config/bitswap-monitoring-client-config.yaml +COPY --from=ipfs-tools-builder /ipfs-tools/bitswap-monitoring-client/docker-entrypoint.sh . +COPY --from=0 /tmp/su-exec/su-exec-static /sbin/su-exec +COPY --from=0 /usr/bin/yq /usr/bin/yq -# Set ownership. -RUN chown -R ipfs:ipfs ./bitswap-monitoring-client +# Make sure our entrypoint is executable. +RUN chmod 755 ./docker-entrypoint.sh # Set log level. ENV RUST_LOG=info @@ -29,8 +60,6 @@ ENV RUST_LOG=info # Expose Prometheus endpoint. EXPOSE 8088 -# Drop root. -USER ipfs - -# Run the binary. 
-ENTRYPOINT ["./bitswap-monitoring-client","--config","./config/bitswap-monitoring-client-config.yaml"] \ No newline at end of file +# Run the script. +# This will fix permissions on the temporary file storage directory, drop root, and then run the binary. +ENTRYPOINT ["./docker-entrypoint.sh"] diff --git a/bitswap-monitoring-client/Cargo.toml b/bitswap-monitoring-client/Cargo.toml index 96a7c77..d7ca3df 100644 --- a/bitswap-monitoring-client/Cargo.toml +++ b/bitswap-monitoring-client/Cargo.toml @@ -7,11 +7,13 @@ edition = "2021" [dependencies] ipfs-resolver-common = { path = "../common" } ipfs_monitoring_plugin_client = { path = "../ipfs-monitoring-plugin-client" } -tokio = { version = "1", features = ["rt", "net", "sync", "rt-multi-thread", "time", "macros", "signal", "fs"] } +tokio = { version = "^1.21", features = ["rt", "net", "sync", "rt-multi-thread", "time", "macros", "signal", "fs", "io-util"] } log = "0.4.21" flexi_logger = "0.28.5" failure = "0.1.8" futures-util = "0.3.29" +tokio-util = "0.7.10" +futures = "0.3.30" chrono = { version = "0.4.31", features = ["serde"] } prometheus_exporter = "0.8.4" # This needs to be matching the version prometheus_exporter uses! @@ -29,3 +31,7 @@ multiaddr = "0.17" # ISO 3166-1 countries. celes = "2.4.0" + +# Logging to file +async-compression = { version = "0.3.15" , default-features = false, features=["tokio","gzip"]} +serde_json = "1.0.96" diff --git a/bitswap-monitoring-client/README.md b/bitswap-monitoring-client/README.md index 4c395a3..087af43 100644 --- a/bitswap-monitoring-client/README.md +++ b/bitswap-monitoring-client/README.md @@ -16,7 +16,7 @@ This is an example config file, see also the [file](./config.yaml) and the [impl # This is a config file for the bitswap-monitoring-client tool. # Address to listen and serve prometheus metrics on. -prometheus_address: "0.0.0.0:8080" +prometheus_address: "0.0.0.0:8088" # Specifies the path to the MaxMind GeoLite databases. 
# Defaults to /usr/local/share/GeoIP if unspecified. @@ -28,6 +28,11 @@ prometheus_address: "0.0.0.0:8080" # Defaults to empty, i.e., no tagging of gateway traffic. #gateway_file_path: "/usr/local/share/gateways.txt" +# Specifies a path to a directory to write JSON logs. +# A subdirectory per monitor will be created. +# If not provided, logging to disk will be disabled. +#disk_logging_directory: "traces" + # List of AMQP data sources to connect to. amqp_servers: # Address of the AMQP server, using amqp or amqps (TLS transport) scheme. @@ -40,6 +45,20 @@ amqp_servers: The `prometheus_address` specifies the local endpoint to listen and serve Prometheus metrics on. For each (`amqp_server`, `monitor_name`) combination, a connection to the AMQP server will be opened. +### Docker + +When running in docker via [../Dockerfile.bitswap-monitoring-client](../Dockerfile.bitswap-monitoring-client), +the client is started via the [docker-entrypoint.sh](docker-entrypoint.sh) script. +The script reads the environment variables `PUID` and `PGID` (each defaults to 1000 if unset), `chown`s the logging directory if that user cannot write to it, and drops root for the +given UID and GID. + +## To-Disk Logging + +If enabled via `disk_logging_directory`, the client writes logs as gzipped JSON files into the configured directory. +A subdirectory per monitor will be created. +Log files are rotated hourly. +The client listens for `SIGINT` and `SIGTERM` to shut down, and finalizes the currently-opened file. + ## Metrics Metrics are provided via a Prometheus HTTP endpoint. diff --git a/bitswap-monitoring-client/config.yaml b/bitswap-monitoring-client/config.yaml index fd36bea..47a6ad1 100644 --- a/bitswap-monitoring-client/config.yaml +++ b/bitswap-monitoring-client/config.yaml @@ -13,10 +13,15 @@ prometheus_address: "0.0.0.0:8088" # Defaults to empty, i.e., no tagging of gateway traffic. #gateway_file_path: "/usr/local/share/gateways.txt" +# Specifies a path to a directory to write JSON logs. +# A subdirectory per monitor will be created. 
+# If not provided, logging to disk will be disabled. +#disk_logging_directory: "traces" + # List of AMQP data sources to connect to. amqp_servers: # Address of the AMQP server, using amqp or amqps (TLS transport) scheme. - amqp_server_address: "amqp://localhost:5672/%2f" # A list of monitors to subscribe to via this data source. monitor_names: - - "local" + - "local" \ No newline at end of file diff --git a/bitswap-monitoring-client/docker-entrypoint.sh b/bitswap-monitoring-client/docker-entrypoint.sh new file mode 100644 index 0000000..1b8a23b --- /dev/null +++ b/bitswap-monitoring-client/docker-entrypoint.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +set -ex + +user_id=$PUID +user_gid=$PGID +if [ -z "$PUID" ]; then + echo "PUID unset, using default value of 1000" + user_id=1000 +fi +if [ -z "$PGID" ]; then + echo "PGID unset, using default value of 1000" + user_gid=1000 +fi + +traces_dir=$(yq '.disk_logging_directory' ./config/bitswap-monitoring-client-config.yaml) + +if [ "$(id -u)" -eq 0 ]; then + echo "Changing user to $user_id" + if [ ! "$traces_dir" == "null" ]; then + echo "Fixing permissions on logging directory $traces_dir..." + # ensure traces directory is writable + su-exec "$user_id" test -w "$traces_dir" || chown -R -- "$user_id:$user_gid" "$traces_dir" + fi + # restart script with new privileges + exec su-exec "$user_id:$user_gid" "$0" "$@" +fi + +# 2nd invocation with regular user +exec ./bitswap-monitoring-client "$@" \ No newline at end of file diff --git a/bitswap-monitoring-client/src/config.rs b/bitswap-monitoring-client/src/config.rs index 7d884b4..8f3716e 100644 --- a/bitswap-monitoring-client/src/config.rs +++ b/bitswap-monitoring-client/src/config.rs @@ -23,6 +23,11 @@ pub(crate) struct Config { /// Each line in the file should contain one peer ID. /// If not provided, all traffic will be logged as non-gateway traffic. pub(crate) gateway_file_path: Option, + + /// Specifies a path to a directory to write bitswap traces to. 
+ /// A subdirectory per monitor will be created. + /// If not provided, logging to disk will be disabled. + pub(crate) disk_logging_directory: Option, } /// Configuration for a single data source. diff --git a/bitswap-monitoring-client/src/disklog.rs b/bitswap-monitoring-client/src/disklog.rs new file mode 100644 index 0000000..2be6888 --- /dev/null +++ b/bitswap-monitoring-client/src/disklog.rs @@ -0,0 +1,245 @@ +use crate::Result; +use async_compression::tokio::write::GzipEncoder; +use failure::{Error, Fail, ResultExt}; +use ipfs_monitoring_plugin_client::monitoring::PushedEvent; +use std::fs; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; +use tokio::fs::File; +use tokio::io::{AsyncWriteExt, BufWriter}; +use tokio::select; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::{mpsc, Mutex}; +use tokio::task::JoinHandle; + +/// An async to-disk logger for Bitswap messages. +/// Messages are processed concurrently, pipelined. +/// Output is written to a separate subdirectory per monitor, to gzipped JSON files. +/// The output file is rotated regularly. +#[derive(Debug)] +pub(crate) struct ToDiskLogger { + output: Sender, + last_error: Arc>>, + writer_handle: JoinHandle<()>, +} + +impl ToDiskLogger { + /// Creates a new to-disk logger for Bitswap messages. + /// A subdirectory with the monitor name will be created under the provided path, + /// if it does not exist already. + pub(crate) async fn new_for_monitor(base_directory: &str, monitor_name: &str) -> Result { + // Create subdirectory for monitor, if not exists. 
+ let base_dir = PathBuf::from_str(base_directory).context("invalid path")?; + let target_dir = base_dir.join(monitor_name); + fs::create_dir_all(&target_dir).context("unable to create logging directory")?; + + // Create initial output file + let out_file = Self::rotate_file(None, &target_dir) + .await + .context("unable to create output file")?; + + // Set up some plumbing + let (send_msg, recv_msg) = mpsc::channel(1); + let (send_json, recv_json) = mpsc::channel(1); + let last_error = Arc::new(Mutex::new(None)); + + // Spawn tasks + tokio::spawn(Self::json_encode_task( + monitor_name.to_string(), + recv_msg, + send_json, + last_error.clone(), + )); + let writer_handle = tokio::spawn(Self::write_to_file( + monitor_name.to_string(), + target_dir.clone(), + last_error.clone(), + recv_json, + out_file, + )); + + let logger = ToDiskLogger { + output: send_msg, + last_error, + writer_handle, + }; + + Ok(logger) + } + + /// Logs the given message to file. + /// An error is returned if logging of previous messages failed. + /// After an error has been returned, subsequent calls will panic. + pub(crate) async fn log_message(&self, msg: PushedEvent) -> Result<()> { + if let Err(_) = self.output.send(msg).await { + // Receiver closed, this is an error. + if let Some(err) = self.last_error.lock().await.take() { + return Err(err); + } + unreachable!() + } + + Ok(()) + } + + async fn json_encode_task( + monitor_name: String, + mut input: Receiver, + output: Sender>, + error_storage: Arc>>, + ) { + while let Some(msg) = input.recv().await { + let serialized = match serde_json::to_vec(&msg) { + Ok(s) => s, + Err(e) => { + let mut last_err = error_storage.lock().await; + *last_err = Some(e.context("unable to serialize to JSON").into()); + break; + } + }; + if let Err(_) = output.send(serialized).await { + // Receiver closed, this is an error. + // However, the receiver is responsible for storing that error, so we just quit. 
+ debug!( + "{} json encode: unable to send, receiver closed", + monitor_name + ); + break; + } + } + + debug!("{} json encode: exiting", monitor_name); + } + + async fn finalize_file(mut f: BufWriter>) -> Result<()> { + f.flush().await.context("unable to flush buffer")?; + f.shutdown().await.context("unable to write trailer")?; + Ok(()) + } + + pub(crate) async fn close(self) -> Result<()> { + let ToDiskLogger { + output, + last_error, + writer_handle, + } = self; + + // Shut down by dropping the sender, everything else should follow. + drop(output); + + // Wait for the writer to finish + writer_handle + .await + .context("write did not shut down cleanly")?; + + // Check if we maybe had an error somewhere + if let Some(err) = last_error.lock().await.take() { + return Err(err); + } + + Ok(()) + } + + async fn rotate_file( + old_file: Option>>, + target_dir: &PathBuf, + ) -> Result>> { + if let Some(f) = old_file { + Self::finalize_file(f) + .await + .context("unable to finalize previous file")?; + } + + let fp = loop { + let ts = format!("{}", chrono::Utc::now().format("%Y-%m-%d_%H-%M-%S_%Z")); + let fp = target_dir.join(format!("{}.json.gz", ts)); + debug!("checking if new log file {:?} exists...", fp); + if !tokio::fs::try_exists(&fp) + .await + .context("unable to check if file exists")? + { + debug!("log file {:?} does not exist", fp); + break fp; + } + debug!("log file {:?} exists already, sleeping one second...", fp); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + }; + + debug!("creating new log file at {:?}", fp); + let out_file = File::create(fp) + .await + .context("unable to create bitswap logging file")?; + + Ok(BufWriter::new(GzipEncoder::new(out_file))) + } + + async fn write_to_file( + monitor_name: String, + target_dir: PathBuf, + error_storage: Arc>>, + mut input: Receiver>, + file: BufWriter>, + ) { + let mut current_file = file; + let newline = "\n".as_bytes(); + + // Create a ticker to rotate the output file every hour. 
+ let mut rotation_ticker = tokio::time::interval(Duration::from_secs(60 * 60)); + // First tick comes for free. + rotation_ticker.tick().await; + + loop { + select! { + msg = input.recv() => { + match msg { + None => { + // Sender closed, we're shutting down (or something went wrong). + debug!("{} write: sender closed, exiting",monitor_name); + // Don't forget to flush and finalize. + if let Err(e) = Self::finalize_file(current_file).await { + error!("{} write: unable to finalize output file: {:?}",monitor_name,e); + let mut last_err = error_storage.lock().await; + *last_err = Some(e.context("unable to finalize output file").into()); + } + break + } + Some(msg) => { + // Write to file + let res = current_file.write_all(&msg).await; + if let Err(e) = res { + error!("{} write: unable to write: {:?}",monitor_name,e); + let mut last_err = error_storage.lock().await; + *last_err = Some(e.context("unable to write").into()); + break + } + let res = current_file.write_all(newline).await; + if let Err(e) = res { + error!("{} write: unable to write: {:?}",monitor_name,e); + let mut last_err = error_storage.lock().await; + *last_err = Some(e.context("unable to write").into()); + break + } + } + } + }, + _ = rotation_ticker.tick() => { + // Rotate the file + debug!("{} write: rotating output file",monitor_name); + current_file = match Self::rotate_file(Some(current_file),&target_dir).await { + Ok(f) => f, + Err(e) => { + error!("{} write: unable to rotate bitswap log file: {:?}",monitor_name,e); + let mut last_err = error_storage.lock().await; + *last_err = Some(e.context("unable to rotate output file").into()); + break + } + } + } + } + } + + debug!("{} write: exiting", monitor_name); + } +} diff --git a/bitswap-monitoring-client/src/main.rs b/bitswap-monitoring-client/src/main.rs index dc2c53a..5e769ab 100644 --- a/bitswap-monitoring-client/src/main.rs +++ b/bitswap-monitoring-client/src/main.rs @@ -6,23 +6,28 @@ extern crate lazy_static; extern crate prometheus; use 
crate::config::Config; -use crate::prom::{MetricsKey, PublicGatewayStatus}; +use crate::disklog::ToDiskLogger; +use crate::prom::{MetricsKey, MetricsMap, PublicGatewayStatus}; use clap::{App, Arg}; use failure::{err_msg, ResultExt}; use futures_util::StreamExt; use ipfs_monitoring_plugin_client::monitoring::{ - BlockPresenceType, EventType, MonitoringClient, RoutingKeyInformation, + BlockPresenceType, EventType, MonitoringClient, PushedEvent, RoutingKeyInformation, }; use ipfs_resolver_common::wantlist::JSONWantType; use ipfs_resolver_common::{logging, Result}; +use maxminddb::Reader; use prom::{Geolocation, Metrics}; use std::collections::HashSet; use std::env; use std::sync::Arc; use std::time::Duration; +use tokio::select; use tokio::sync::RwLock; +use tokio::task::JoinSet; mod config; +mod disklog; mod gateways; mod geolocation; mod prom; @@ -69,6 +74,10 @@ async fn run_with_config(cfg: Config) -> Result<()> { let country_db = Arc::new(country_db); info!("successfully read MaxMind database"); + if let Some(disk_logging_directory) = &cfg.disk_logging_directory { + info!("will log to disk at {}", disk_logging_directory) + } + // Read list of public gateway IDs. 
let known_gateways = Arc::new(RwLock::new(HashSet::new())); match cfg.gateway_file_path { @@ -100,21 +109,27 @@ async fn run_with_config(cfg: Config) -> Result<()> { prom::run_prometheus(prometheus_address)?; info!("started prometheus server"); + // Set up shutdown channel + let cancellation_token = tokio_util::sync::CancellationToken::new(); + // Connect to monitors info!("starting infinite connection loop, try Ctrl+C to exit"); - let handles = cfg + let mut set: JoinSet> = JoinSet::new(); + cfg .amqp_servers .into_iter() - .map(|c| { + .for_each(|c| { c.monitor_names .into_iter() - .map(|name| { + .for_each(|name| { let name = name.clone(); let country_db = country_db.clone(); let known_gateways = known_gateways.clone(); let amqp_server_address = c.amqp_server_address.clone(); + let disk_logging_dir = cfg.disk_logging_directory.clone(); + let cancellation_token = cancellation_token.clone(); - tokio::spawn(async move { + set.spawn(async move { // Create metrics for a few popular countries ahead of time. 
let mut metrics_by_country = Metrics::create_basic_set(&name); let routing_keys = vec![ @@ -128,248 +143,318 @@ async fn run_with_config(cfg: Config) -> Result<()> { loop { let country_db = country_db.clone(); - let res = connect_and_receive( + + debug!( + "connecting to AMQP server {} at {} and subscribing to events for monitor {}...", + name, amqp_server_address, name + ); + let client = MonitoringClient::new(&amqp_server_address, &routing_keys).await?; + info!( + "connected for monitor {} at {}", + name, amqp_server_address + ); + + // Create disk logger + let disk_logger = if let Some(dir) = disk_logging_dir.clone() { + Some( + ToDiskLogger::new_for_monitor(&dir, &name) + .await + .context("unable to set up disk logging")?, + ) + } else { + None + }; + + let res = receive_from_monitor( &mut metrics_by_country, &name, - &amqp_server_address, - &routing_keys, + client, country_db, &known_gateways, + &disk_logger, + &cancellation_token, ) .await; - info!( "server {}, monitor {}: result: {:?}", amqp_server_address, name, res ); + if let Some(logger) = disk_logger { + info!("server {}, monitor {}: finalizing disk logs...",amqp_server_address, name); + if let Err(e) = logger.close().await { + error!("server {}, monitor {}: unable to finalize disk logs: {:?}",amqp_server_address, name,e) + } else { + debug!("server {}, monitor {}: successfully finalized disk logs",amqp_server_address, name); + } + } + + if cancellation_token.is_cancelled() { + info!( + "server {}, monitor {}: exiting", + amqp_server_address, name + ); + return Ok(()) + } + info!( "server {}, monitor {}: sleeping for one second", amqp_server_address, name ); tokio::time::sleep(Duration::from_secs(1)).await; } - }) + + }); }) - .collect::>() - }) - .flatten() - .collect::>(); + }); // Sleep forever (probably) - for handle in handles { - handle.await.context("connection loop failed")?; + let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .context("unable to set 
up signal handling")?; + tokio::select! { + res = set.join_next() => { + error!("listeners failed, shutting down: {:?}", res) + }, + _ = tokio::signal::ctrl_c() => { + info!("received shutdown signal, shutting down...") + }, + _ = sigterm.recv() => { + info!("received SIGTERM, shutting down...") + } } + // Cancel anything still running + cancellation_token.cancel(); + + // Wait for anything still running + set.join_all().await; + Ok(()) } -async fn connect_and_receive( +async fn receive_from_monitor( metrics_by_country: &mut prom::MetricsMap, monitor_name: &str, - amqp_server_address: &str, - routing_keys: &[RoutingKeyInformation], + mut client: MonitoringClient, country_db: Arc>>, known_gateways: &Arc>>, + disk_logger: &Option, + cancellation_token: &tokio_util::sync::CancellationToken, ) -> Result<()> { - debug!( - "connecting to AMQP server {} at {} and subscribing to events for monitor {}...", - monitor_name, amqp_server_address, monitor_name - ); - let mut client = MonitoringClient::new(amqp_server_address, &routing_keys).await?; - info!( - "connected for monitor {} at {}", - monitor_name, amqp_server_address - ); - let mut first = true; - while let Some(events) = client.next().await { - match events { - Err(err) => { - error!("unable to receive events: {}", err); + loop { + select! 
{ + _ = cancellation_token.cancelled() => { + info!("monitor {}: shutdown received", monitor_name); break; } - Ok((_, events)) => { - if first { - first = false; - info!("receiving messages for monitor {}...", monitor_name) + received = client.next() => { + if let Some(events) = received { + let (_, events) = events.context("unable to receive events")?; + if first { + first = false; + info!("receiving messages for monitor {}...", monitor_name) + } + + handle_received_events( + metrics_by_country, + monitor_name, + &country_db, + known_gateways, + disk_logger, + events, + ) + .await?; + } else { + break; } + } + } + } - for event in events { - let geolocation = geolocation::geolocate_event(&country_db, &event); - debug!( - "{}: determined origin of event {:?} to be {:?}", - monitor_name, event, geolocation - ); + info!("monitor {}: exiting...", monitor_name); - let origin_type = if known_gateways.read().await.contains(&event.peer) { - PublicGatewayStatus::Gateway - } else { - PublicGatewayStatus::NonGateway - }; + Ok(()) +} - let metrics_key = MetricsKey { - geo_origin: geolocation, - overlay_origin: origin_type, - }; +async fn handle_received_events( + metrics_by_country: &mut MetricsMap, + monitor_name: &str, + country_db: &Arc>>, + known_gateways: &Arc>>, + disk_logger: &Option, + events: Vec, +) -> Result<()> { + for event in events { + let geolocation = geolocation::geolocate_event(&country_db, &event); + debug!( + "{}: determined origin of event {:?} to be {:?}", + monitor_name, event, geolocation + ); + + let origin_type = if known_gateways.read().await.contains(&event.peer) { + PublicGatewayStatus::Gateway + } else { + PublicGatewayStatus::NonGateway + }; + + let metrics_key = MetricsKey { + geo_origin: geolocation, + overlay_origin: origin_type, + }; + + let metrics = match metrics_by_country.get(&metrics_key) { + None => { + debug!( + "{}: metrics for {:?} missing, creating on the fly...", + monitor_name, metrics_key + ); + match 
Metrics::new_for_key(monitor_name, &metrics_key) { + Ok(new_metrics) => { + // We know that the metrics_key value is safe, since we were able to create metrics with it. + metrics_by_country.insert(metrics_key.clone(), new_metrics); + // We know this is safe since we just inserted it. + metrics_by_country.get(&metrics_key).unwrap() + } + Err(e) => { + error!( + "unable to create metrics for country {:?} on the fly: {:?}", + metrics_key, e + ); + // We use the Error country instead. + // We know this is safe since that country is always present in the map. + metrics_by_country + .get(&MetricsKey { + geo_origin: Geolocation::Error, + overlay_origin: metrics_key.overlay_origin, + }) + .unwrap() + } + } + } + Some(m) => m, + }; + + // Create a constant-width identifier for logging. + // This makes logging output nicely aligned :) + // We only use this in debug logging, so we only create it if debug logging is enabled. + let ident = if log_enabled!(log::Level::Debug) { + event.constant_width_identifier() + } else { + "".to_string() + }; + + match &event.inner { + EventType::ConnectionEvent(conn_event) => match conn_event.connection_event_type { + ipfs_monitoring_plugin_client::monitoring::ConnectionEventType::Connected => { + metrics.num_connected.inc(); + debug!("{} {:12}", ident, "CONNECTED") + } + ipfs_monitoring_plugin_client::monitoring::ConnectionEventType::Disconnected => { + metrics.num_disconnected.inc(); + debug!("{} {:12}", ident, "DISCONNECTED") + } + }, + EventType::BitswapMessage(msg) => { + metrics.num_messages.inc(); - let metrics = match metrics_by_country.get(&metrics_key) { - None => { - debug!( - "{}: metrics for {:?} missing, creating on the fly...", - monitor_name, metrics_key - ); - match Metrics::new_for_key(monitor_name, &metrics_key) { - Ok(new_metrics) => { - // We know that the metrics_key value is safe, since we were able to create metrics with it. 
- metrics_by_country.insert(metrics_key.clone(), new_metrics); - // We know this is safe since we just inserted it. - metrics_by_country.get(&metrics_key).unwrap() - } - Err(e) => { - error!( - "unable to create metrics for country {:?} on the fly: {:?}", - metrics_key, e - ); - // We use the Error country instead. - // We know this is safe since that country is always present in the map. - metrics_by_country - .get(&MetricsKey { - geo_origin: Geolocation::Error, - overlay_origin: metrics_key.overlay_origin, - }) - .unwrap() - } - } - } - Some(m) => m, - }; - - // Create a constant-width identifier for logging. - // This makes logging output nicely aligned :) - // We only use this in debug logging, so we only create it if debug logging is enabled. - let ident = if log_enabled!(log::Level::Debug) { - event.constant_width_identifier() + if !msg.wantlist_entries.is_empty() { + if msg.full_wantlist { + metrics.num_wantlists_full.inc(); } else { - "".to_string() - }; - - match &event.inner { - EventType::ConnectionEvent(conn_event) => { - match conn_event.connection_event_type { - ipfs_monitoring_plugin_client::monitoring::ConnectionEventType::Connected => { - metrics.num_connected.inc(); - debug!("{} {:12}", ident, "CONNECTED") - } - ipfs_monitoring_plugin_client::monitoring::ConnectionEventType::Disconnected => { - metrics.num_disconnected.inc(); - debug!("{} {:12}", ident, "DISCONNECTED") - } - } - } - EventType::BitswapMessage(msg) => { - metrics.num_messages.inc(); - - if !msg.wantlist_entries.is_empty() { - if msg.full_wantlist { - metrics.num_wantlists_full.inc(); - } else { - metrics.num_wantlists_incremental.inc(); - } + metrics.num_wantlists_incremental.inc(); + } - for entry in msg.wantlist_entries.iter() { - if entry.cancel { - metrics.num_entries_cancel.inc(); + for entry in msg.wantlist_entries.iter() { + if entry.cancel { + metrics.num_entries_cancel.inc(); + } else { + match entry.want_type { + JSONWantType::Block => { + if entry.send_dont_have { + 
metrics.num_entries_want_block_send_dont_have.inc(); } else { - match entry.want_type { - JSONWantType::Block => { - if entry.send_dont_have { - metrics - .num_entries_want_block_send_dont_have - .inc(); - } else { - metrics.num_entries_want_block.inc(); - } - } - JSONWantType::Have => { - if entry.send_dont_have { - metrics - .num_entries_want_have_send_dont_have - .inc(); - } else { - metrics.num_entries_want_have.inc(); - } - } - } + metrics.num_entries_want_block.inc(); } - - debug!( - "{} {:4} {:18} ({:10}) {}", - ident, - if msg.full_wantlist { "FULL" } else { "INC" }, - if entry.cancel { - "CANCEL".to_string() - } else { - match entry.want_type { - JSONWantType::Block => { - if entry.send_dont_have { - "WANT_BLOCK|SEND_DH".to_string() - } else { - "WANT_BLOCK".to_string() - } - } - JSONWantType::Have => { - if entry.send_dont_have { - "WANT_HAVE|SEND_DH".to_string() - } else { - "WANT_HAVE".to_string() - } - } - } - }, - entry.priority, - entry.cid.path - ) } - } - - if !msg.blocks.is_empty() { - for entry in msg.blocks.iter() { - metrics.num_blocks.inc(); - debug!("{} {:9} {}", ident, "BLOCK", entry.path) + JSONWantType::Have => { + if entry.send_dont_have { + metrics.num_entries_want_have_send_dont_have.inc(); + } else { + metrics.num_entries_want_have.inc(); + } } } + } - if !msg.block_presences.is_empty() { - for entry in msg.block_presences.iter() { - match entry.block_presence_type { - BlockPresenceType::Have => { - metrics.num_block_presence_have.inc() + debug!( + "{} {:4} {:18} ({:10}) {}", + ident, + if msg.full_wantlist { "FULL" } else { "INC" }, + if entry.cancel { + "CANCEL".to_string() + } else { + match entry.want_type { + JSONWantType::Block => { + if entry.send_dont_have { + "WANT_BLOCK|SEND_DH".to_string() + } else { + "WANT_BLOCK".to_string() } - BlockPresenceType::DontHave => { - metrics.num_block_presence_dont_have.inc() + } + JSONWantType::Have => { + if entry.send_dont_have { + "WANT_HAVE|SEND_DH".to_string() + } else { + 
"WANT_HAVE".to_string() } } - debug!( - "{} {:9} {}", - ident, - match entry.block_presence_type { - BlockPresenceType::Have => "HAVE".to_string(), - BlockPresenceType::DontHave => "DONT_HAVE".to_string(), - }, - entry.cid.path - ) } + }, + entry.priority, + entry.cid.path + ) + } + } + + if !msg.blocks.is_empty() { + for entry in msg.blocks.iter() { + metrics.num_blocks.inc(); + debug!("{} {:9} {}", ident, "BLOCK", entry.path) + } + } + + if !msg.block_presences.is_empty() { + for entry in msg.block_presences.iter() { + match entry.block_presence_type { + BlockPresenceType::Have => metrics.num_block_presence_have.inc(), + BlockPresenceType::DontHave => { + metrics.num_block_presence_dont_have.inc() } } + debug!( + "{} {:9} {}", + ident, + match entry.block_presence_type { + BlockPresenceType::Have => "HAVE".to_string(), + BlockPresenceType::DontHave => "DONT_HAVE".to_string(), + }, + entry.cid.path + ) } } } } - } - info!("monitor {}: disconnected?", monitor_name); + // Log to disk + if let Some(logger) = disk_logger { + logger + .log_message(event) + .await + .context("unable to log to disk")? + } + } - Ok(()) // I guess? + Ok(()) }