From f8e33f58104e6ed0d38e756beb0572e3bc263f10 Mon Sep 17 00:00:00 2001 From: Arun Koshy <97870774+arun-koshy@users.noreply.github.com> Date: Wed, 16 Oct 2024 00:58:39 -0700 Subject: [PATCH] [consensus] Migrate sui to Mysticeti connection monitor (#19814) ## Description Will follow up to ensure that known peers are set in sui via discovery so that metrics will be updated. Also will be adding TonicConnectionMonitor for Mysticeti to use. --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- Cargo.lock | 1 + consensus/core/src/lib.rs | 9 +- .../core/src/network/connection_monitor.rs | 73 ++-- consensus/core/src/network/metrics.rs | 210 +++++++--- consensus/core/src/network/mod.rs | 3 +- crates/sui-core/src/consensus_adapter.rs | 2 +- crates/sui-node/Cargo.toml | 1 + crates/sui-node/src/lib.rs | 51 +-- narwhal/network/src/connectivity.rs | 381 ------------------ narwhal/network/src/lib.rs | 1 - narwhal/primary/src/metrics.rs | 7 +- narwhal/primary/src/primary.rs | 15 +- narwhal/worker/src/tests/worker_tests.rs | 296 +------------- narwhal/worker/src/worker.rs | 10 +- 14 files changed, 244 insertions(+), 816 deletions(-) delete mode 100644 narwhal/network/src/connectivity.rs diff --git a/Cargo.lock b/Cargo.lock index c51c0ec2d040e..36d43fe998665 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14204,6 +14204,7 @@ dependencies = [ "bcs", "bin-version", "clap", + "consensus-core", "fastcrypto", "fastcrypto-zkp", "futures", diff --git a/consensus/core/src/lib.rs b/consensus/core/src/lib.rs index b7597853dbd2a..36b980311f5d7 100644 --- a/consensus/core/src/lib.rs +++ b/consensus/core/src/lib.rs @@ -46,9 +46,12 @@ mod test_dag_parser; /// Exported consensus API. pub use authority_node::ConsensusAuthority; pub use block::{BlockAPI, Round, TransactionIndex}; +/// Exported API for testing. +pub use block::{TestBlock, Transaction, VerifiedBlock}; pub use commit::{CommitDigest, CommitIndex, CommitRef, CommittedSubDag}; pub use commit_consumer::{CommitConsumer, CommitConsumerMonitor}; +pub use network::{ + connection_monitor::{AnemoConnectionMonitor, ConnectionMonitorHandle, ConnectionStatus}, + metrics::{MetricsMakeCallbackHandler, NetworkRouteMetrics, QuinnConnectionMetrics}, +}; pub use transaction::{ClientError, TransactionClient, TransactionVerifier, ValidationError}; - -/// Exported API for testing. -pub use block::{TestBlock, Transaction, VerifiedBlock}; diff --git a/consensus/core/src/network/connection_monitor.rs b/consensus/core/src/network/connection_monitor.rs index 928fc8b41a040..c07fd49996a43 100644 --- a/consensus/core/src/network/connection_monitor.rs +++ b/consensus/core/src/network/connection_monitor.rs @@ -17,11 +17,9 @@ use super::metrics::QuinnConnectionMetrics; const CONNECTION_STAT_COLLECTION_INTERVAL: Duration = Duration::from_secs(60); -pub(crate) struct ConnectionMonitorHandle { +pub struct ConnectionMonitorHandle { handle: JoinHandle<()>, stop: Sender<()>, - // TODO: Sui will use this component eventually instead of the NW version - #[allow(unused)] connection_statuses: Arc>, } @@ -30,6 +28,10 @@ impl ConnectionMonitorHandle { self.stop.send(()).ok(); self.handle.await.ok(); } + + pub fn connection_statuses(&self) -> Arc> { + self.connection_statuses.clone() + } } #[derive(Eq, PartialEq, Clone, Debug)] @@ -89,10 +91,10 @@ impl AnemoConnectionMonitor { // we report first all the known peers as disconnected - so we can see // their labels in the metrics reporting tool - for (peer_id, hostname) in &self.known_peers { + for (peer_id, peer_label) in &self.known_peers { self.connection_metrics .network_peer_connected - .with_label_values(&[&format!("{peer_id}"), hostname]) + .with_label_values(&[&format!("{peer_id}"), peer_label]) .set(0) } @@ -114,10 +116,10 @@ impl AnemoConnectionMonitor { self.connection_metrics.socket_send_buffer_size.set( network.socket_send_buf_size() as i64 ); - for (peer_id, hostname) in &self.known_peers { + for (peer_id, peer_label) in &self.known_peers { if let Some(connection) = network.peer(*peer_id) { let stats = connection.connection_stats(); - self.update_quinn_metrics_for_peer(&format!("{peer_id}"), hostname, &stats); + self.update_quinn_metrics_for_peer(&format!("{peer_id}"), peer_label, &stats); } } } else { @@ -153,17 +155,17 @@ impl AnemoConnectionMonitor { // Only report peer IDs for known peers to prevent unlimited cardinality. if self.known_peers.contains_key(&peer_id) { let peer_id_str = format!("{peer_id}"); - let hostname = self.known_peers.get(&peer_id).unwrap(); + let peer_label = self.known_peers.get(&peer_id).unwrap(); self.connection_metrics .network_peer_connected - .with_label_values(&[&peer_id_str, hostname]) + .with_label_values(&[&peer_id_str, peer_label]) .set(int_status); if let PeerEvent::LostPeer(_, reason) = peer_event { self.connection_metrics .network_peer_disconnects - .with_label_values(&[&peer_id_str, hostname, &format!("{reason:?}")]) + .with_label_values(&[&peer_id_str, peer_label, &format!("{reason:?}")]) .inc(); } } @@ -173,85 +175,85 @@ impl AnemoConnectionMonitor { fn update_quinn_metrics_for_peer( &self, peer_id: &str, - hostname: &str, + peer_label: &str, stats: &ConnectionStats, ) { // Update PathStats self.connection_metrics .network_peer_rtt - .with_label_values(&[peer_id, hostname]) + .with_label_values(&[peer_id, peer_label]) .set(stats.path.rtt.as_millis() as i64); self.connection_metrics .network_peer_lost_packets - .with_label_values(&[peer_id, hostname]) + .with_label_values(&[peer_id, peer_label]) .set(stats.path.lost_packets as i64); self.connection_metrics .network_peer_lost_bytes - .with_label_values(&[peer_id, hostname]) + .with_label_values(&[peer_id, peer_label]) .set(stats.path.lost_bytes as i64); self.connection_metrics .network_peer_sent_packets - .with_label_values(&[peer_id, hostname]) + .with_label_values(&[peer_id, peer_label]) .set(stats.path.sent_packets as i64); self.connection_metrics .network_peer_congestion_events - .with_label_values(&[peer_id, hostname]) + .with_label_values(&[peer_id, peer_label]) .set(stats.path.congestion_events as i64); self.connection_metrics .network_peer_congestion_window - .with_label_values(&[peer_id, hostname]) + .with_label_values(&[peer_id, peer_label]) .set(stats.path.cwnd as i64); // Update FrameStats self.connection_metrics .network_peer_max_data - .with_label_values(&[peer_id, hostname, "transmitted"]) + .with_label_values(&[peer_id, peer_label, "transmitted"]) .set(stats.frame_tx.max_data as i64); self.connection_metrics .network_peer_max_data - .with_label_values(&[peer_id, hostname, "received"]) + .with_label_values(&[peer_id, peer_label, "received"]) .set(stats.frame_rx.max_data as i64); self.connection_metrics .network_peer_closed_connections - .with_label_values(&[peer_id, hostname, "transmitted"]) + .with_label_values(&[peer_id, peer_label, "transmitted"]) .set(stats.frame_tx.connection_close as i64); self.connection_metrics .network_peer_closed_connections - .with_label_values(&[peer_id, hostname, "received"]) + .with_label_values(&[peer_id, peer_label, "received"]) .set(stats.frame_rx.connection_close as i64); self.connection_metrics .network_peer_data_blocked - .with_label_values(&[peer_id, hostname, "transmitted"]) + .with_label_values(&[peer_id, peer_label, "transmitted"]) .set(stats.frame_tx.data_blocked as i64); self.connection_metrics .network_peer_data_blocked - .with_label_values(&[peer_id, hostname, "received"]) + .with_label_values(&[peer_id, peer_label, "received"]) .set(stats.frame_rx.data_blocked as i64); // Update UDPStats self.connection_metrics .network_peer_udp_datagrams - .with_label_values(&[peer_id, hostname, "transmitted"]) + .with_label_values(&[peer_id, peer_label, "transmitted"]) .set(stats.udp_tx.datagrams as i64); self.connection_metrics .network_peer_udp_datagrams - .with_label_values(&[peer_id, hostname, "received"]) + .with_label_values(&[peer_id, peer_label, "received"]) .set(stats.udp_rx.datagrams as i64); self.connection_metrics .network_peer_udp_bytes - .with_label_values(&[peer_id, hostname, "transmitted"]) + .with_label_values(&[peer_id, peer_label, "transmitted"]) .set(stats.udp_tx.bytes as i64); self.connection_metrics .network_peer_udp_bytes - .with_label_values(&[peer_id, hostname, "received"]) + .with_label_values(&[peer_id, peer_label, "received"]) .set(stats.udp_rx.bytes as i64); self.connection_metrics .network_peer_udp_transmits - .with_label_values(&[peer_id, hostname, "transmitted"]) + .with_label_values(&[peer_id, peer_label, "transmitted"]) .set(stats.udp_tx.ios as i64); self.connection_metrics .network_peer_udp_transmits - .with_label_values(&[peer_id, hostname, "received"]) + .with_label_values(&[peer_id, peer_label, "received"]) .set(stats.udp_rx.ios as i64); } } @@ -276,7 +278,7 @@ mod tests { let network_3 = build_network().unwrap(); let registry = Registry::new(); - let metrics = Arc::new(QuinnConnectionMetrics::new(®istry)); + let metrics = Arc::new(QuinnConnectionMetrics::new("consensus", ®istry)); // AND we connect to peer 2 let peer_2 = network_1.connect(network_2.local_addr()).await.unwrap(); @@ -288,6 +290,7 @@ mod tests { // WHEN bring up the monitor let handle = AnemoConnectionMonitor::spawn(network_1.downgrade(), metrics.clone(), known_peers); + let connection_statuses = handle.connection_statuses(); // THEN peer 2 should be already connected assert_network_peers(&metrics, 1).await; @@ -296,7 +299,7 @@ mod tests { let mut labels = HashMap::new(); let peer_2_str = format!("{peer_2}"); labels.insert("peer_id", peer_2_str.as_str()); - labels.insert("hostname", "peer_2"); + labels.insert("peer_label", "peer_2"); assert_ne!( metrics .network_peer_rtt @@ -306,7 +309,7 @@ mod tests { 0 ); assert_eq!( - *handle.connection_statuses.get(&peer_2).unwrap().value(), + *connection_statuses.get(&peer_2).unwrap().value(), ConnectionStatus::Connected ); @@ -316,7 +319,7 @@ mod tests { // THEN assert_network_peers(&metrics, 2).await; assert_eq!( - *handle.connection_statuses.get(&peer_3).unwrap().value(), + *connection_statuses.get(&peer_3).unwrap().value(), ConnectionStatus::Connected ); @@ -326,7 +329,7 @@ mod tests { // THEN assert_network_peers(&metrics, 1).await; assert_eq!( - *handle.connection_statuses.get(&peer_2).unwrap().value(), + *connection_statuses.get(&peer_2).unwrap().value(), ConnectionStatus::Disconnected ); @@ -336,7 +339,7 @@ mod tests { // THEN assert_network_peers(&metrics, 0).await; assert_eq!( - *handle.connection_statuses.get(&peer_3).unwrap().value(), + *connection_statuses.get(&peer_3).unwrap().value(), ConnectionStatus::Disconnected ); } diff --git a/consensus/core/src/network/metrics.rs b/consensus/core/src/network/metrics.rs index 841eb3906f348..dd9c5e63171a2 100644 --- a/consensus/core/src/network/metrics.rs +++ b/consensus/core/src/network/metrics.rs @@ -3,11 +3,13 @@ use std::sync::Arc; +use anemo_tower::callback::{MakeCallbackHandler, ResponseHandler}; use prometheus::{ register_histogram_vec_with_registry, register_int_counter_vec_with_registry, - register_int_gauge_vec_with_registry, register_int_gauge_with_registry, HistogramVec, - IntCounterVec, IntGauge, IntGaugeVec, Registry, + register_int_gauge_vec_with_registry, register_int_gauge_with_registry, HistogramTimer, + HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry, }; +use tracing::warn; // Fields for network-agnostic metrics can be added here pub(crate) struct NetworkMetrics { @@ -29,10 +31,10 @@ impl NetworkMetrics { registry ) .unwrap(), - inbound: Arc::new(NetworkRouteMetrics::new("inbound", registry)), - outbound: Arc::new(NetworkRouteMetrics::new("outbound", registry)), + inbound: Arc::new(NetworkRouteMetrics::new("", "inbound", registry)), + outbound: Arc::new(NetworkRouteMetrics::new("", "outbound", registry)), tcp_connection_metrics: Arc::new(TcpConnectionMetrics::new(registry)), - quinn_connection_metrics: Arc::new(QuinnConnectionMetrics::new(registry)), + quinn_connection_metrics: Arc::new(QuinnConnectionMetrics::new("", registry)), } } } @@ -80,7 +82,7 @@ impl TcpConnectionMetrics { } } -pub(crate) struct QuinnConnectionMetrics { +pub struct QuinnConnectionMetrics { /// The connection status of known peers. 0 if not connected, 1 if connected. pub network_peer_connected: IntGaugeVec, /// The number of connected peers @@ -124,36 +126,36 @@ pub(crate) struct QuinnConnectionMetrics { } impl QuinnConnectionMetrics { - pub fn new(registry: &Registry) -> Self { + pub fn new(node: &'static str, registry: &Registry) -> Self { Self { network_peer_connected: register_int_gauge_vec_with_registry!( - "quinn_network_peer_connected", + format!("{node}_quinn_network_peer_connected"), "The connection status of a peer. 0 if not connected, 1 if connected", - &["peer_id", "hostname"], + &["peer_id", "peer_label"], registry ) .unwrap(), network_peers: register_int_gauge_with_registry!( - "quinn_network_peers", + format!("{node}_quinn_network_peers"), "The number of connected peers.", registry ) .unwrap(), network_peer_disconnects: register_int_counter_vec_with_registry!( - "quinn_network_peer_disconnects", + format!("{node}_quinn_network_peer_disconnects"), "Number of disconnect events per peer.", - &["peer_id", "hostname", "reason"], + &["peer_id", "peer_label", "reason"], registry ) .unwrap(), socket_receive_buffer_size: register_int_gauge_with_registry!( - "quinn_socket_receive_buffer_size", + format!("{node}_quinn_socket_receive_buffer_size"), "Receive buffer size of Anemo socket.", registry ) .unwrap(), socket_send_buffer_size: register_int_gauge_with_registry!( - "quinn_socket_send_buffer_size", + format!("{node}_quinn_socket_send_buffer_size"), "Send buffer size of Anemo socket.", registry ) @@ -161,90 +163,90 @@ impl QuinnConnectionMetrics { // PathStats network_peer_rtt: register_int_gauge_vec_with_registry!( - "quinn_network_peer_rtt", + format!("{node}_quinn_network_peer_rtt"), "The rtt for a peer connection in ms.", - &["peer_id", "hostname"], + &["peer_id", "peer_label"], registry ) .unwrap(), network_peer_lost_packets: register_int_gauge_vec_with_registry!( - "quinn_network_peer_lost_packets", + format!("{node}_quinn_network_peer_lost_packets"), "The total number of lost packets for a peer connection.", - &["peer_id", "hostname"], + &["peer_id", "peer_label"], registry ) .unwrap(), network_peer_lost_bytes: register_int_gauge_vec_with_registry!( - "quinn_network_peer_lost_bytes", + format!("{node}_quinn_network_peer_lost_bytes"), "The total number of lost bytes for a peer connection.", - &["peer_id", "hostname"], + &["peer_id", "peer_label"], registry ) .unwrap(), network_peer_sent_packets: register_int_gauge_vec_with_registry!( - "quinn_network_peer_sent_packets", + format!("{node}_quinn_network_peer_sent_packets"), "The total number of sent packets for a peer connection.", - &["peer_id", "hostname"], + &["peer_id", "peer_label"], registry ) .unwrap(), network_peer_congestion_events: register_int_gauge_vec_with_registry!( - "quinn_network_peer_congestion_events", + format!("{node}_quinn_network_peer_congestion_events"), "The total number of congestion events for a peer connection.", - &["peer_id", "hostname"], + &["peer_id", "peer_label"], registry ) .unwrap(), network_peer_congestion_window: register_int_gauge_vec_with_registry!( - "quinn_network_peer_congestion_window", + format!("{node}_quinn_network_peer_congestion_window"), "The congestion window for a peer connection.", - &["peer_id", "hostname"], + &["peer_id", "peer_label"], registry ) .unwrap(), // FrameStats network_peer_closed_connections: register_int_gauge_vec_with_registry!( - "quinn_network_peer_closed_connections", + format!("{node}_quinn_network_peer_closed_connections"), "The number of closed connections for a peer connection.", - &["peer_id", "hostname", "direction"], + &["peer_id", "peer_label", "direction"], registry ) .unwrap(), network_peer_max_data: register_int_gauge_vec_with_registry!( - "quinn_network_peer_max_data", + format!("{node}_quinn_network_peer_max_data"), "The number of max data frames for a peer connection.", - &["peer_id", "hostname", "direction"], + &["peer_id", "peer_label", "direction"], registry ) .unwrap(), network_peer_data_blocked: register_int_gauge_vec_with_registry!( - "quinn_network_peer_data_blocked", + format!("{node}_quinn_network_peer_data_blocked"), "The number of data blocked frames for a peer connection.", - &["peer_id", "hostname", "direction"], + &["peer_id", "peer_label", "direction"], registry ) .unwrap(), // UDPStats network_peer_udp_datagrams: register_int_gauge_vec_with_registry!( - "quinn_network_peer_udp_datagrams", + format!("{node}_quinn_network_peer_udp_datagrams"), "The total number datagrams observed by the UDP peer connection.", - &["peer_id", "hostname", "direction"], + &["peer_id", "peer_label", "direction"], registry ) .unwrap(), network_peer_udp_bytes: register_int_gauge_vec_with_registry!( - "quinn_network_peer_udp_bytes", + format!("{node}_quinn_network_peer_udp_bytes"), "The total number bytes observed by the UDP peer connection.", - &["peer_id", "hostname", "direction"], + &["peer_id", "peer_label", "direction"], registry ) .unwrap(), network_peer_udp_transmits: register_int_gauge_vec_with_registry!( - "quinn_network_peer_udp_transmits", + format!("{node}_quinn_network_peer_udp_transmits"), "The total number transmits observed by the UDP peer connection.", - &["peer_id", "hostname", "direction"], + &["peer_id", "peer_label", "direction"], registry ) .unwrap(), @@ -253,7 +255,7 @@ impl QuinnConnectionMetrics { } #[derive(Clone)] -pub(crate) struct NetworkRouteMetrics { +pub struct NetworkRouteMetrics { /// Counter of requests by route pub requests: IntCounterVec, /// Request latency by route @@ -287,9 +289,9 @@ const SIZE_BYTE_BUCKETS: &[f64] = &[ ]; impl NetworkRouteMetrics { - pub fn new(direction: &'static str, registry: &Registry) -> Self { + pub fn new(node: &'static str, direction: &'static str, registry: &Registry) -> Self { let requests = register_int_counter_vec_with_registry!( - format!("{direction}_requests"), + format!("{node}_{direction}_requests"), "The number of requests made on the network", &["route"], registry @@ -297,7 +299,7 @@ impl NetworkRouteMetrics { .unwrap(); let request_latency = register_histogram_vec_with_registry!( - format!("{direction}_request_latency"), + format!("{node}_{direction}_request_latency"), "Latency of a request by route", &["route"], LATENCY_SEC_BUCKETS.to_vec(), @@ -306,7 +308,7 @@ impl NetworkRouteMetrics { .unwrap(); let request_size = register_histogram_vec_with_registry!( - format!("{direction}_request_size"), + format!("{node}_{direction}_request_size"), "Size of a request by route", &["route"], SIZE_BYTE_BUCKETS.to_vec(), @@ -315,7 +317,7 @@ impl NetworkRouteMetrics { .unwrap(); let response_size = register_histogram_vec_with_registry!( - format!("{direction}_response_size"), + format!("{node}_{direction}_response_size"), "Size of a response by route", &["route"], SIZE_BYTE_BUCKETS.to_vec(), @@ -324,7 +326,7 @@ impl NetworkRouteMetrics { .unwrap(); let excessive_size_requests = register_int_counter_vec_with_registry!( - format!("{direction}_excessive_size_requests"), + format!("{node}_{direction}_excessive_size_requests"), "The number of excessively large request messages sent", &["route"], registry @@ -332,7 +334,7 @@ impl NetworkRouteMetrics { .unwrap(); let excessive_size_responses = register_int_counter_vec_with_registry!( - format!("{direction}_excessive_size_responses"), + format!("{node}_{direction}_excessive_size_responses"), "The number of excessively large response messages seen", &["route"], registry @@ -340,7 +342,7 @@ impl NetworkRouteMetrics { .unwrap(); let inflight_requests = register_int_gauge_vec_with_registry!( - format!("{direction}_inflight_requests"), + format!("{node}_{direction}_inflight_requests"), "The number of inflight network requests", &["route"], registry @@ -348,7 +350,7 @@ impl NetworkRouteMetrics { .unwrap(); let errors = register_int_counter_vec_with_registry!( - format!("{direction}_request_errors"), + format!("{node}_{direction}_request_errors"), "Number of errors by route", &["route", "status"], registry, @@ -367,3 +369,115 @@ impl NetworkRouteMetrics { } } } + +#[derive(Clone)] +pub struct MetricsMakeCallbackHandler { + metrics: Arc, + /// Size in bytes above which a request or response message is considered excessively large + excessive_message_size: usize, +} + +impl MetricsMakeCallbackHandler { + pub fn new(metrics: Arc, excessive_message_size: usize) -> Self { + Self { + metrics, + excessive_message_size, + } + } +} + +impl MakeCallbackHandler for MetricsMakeCallbackHandler { + type Handler = MetricsResponseHandler; + + fn make_handler(&self, request: &anemo::Request) -> Self::Handler { + let route = request.route().to_owned(); + + self.metrics.requests.with_label_values(&[&route]).inc(); + self.metrics + .inflight_requests + .with_label_values(&[&route]) + .inc(); + let body_len = request.body().len(); + self.metrics + .request_size + .with_label_values(&[&route]) + .observe(body_len as f64); + if body_len > self.excessive_message_size { + warn!( + "Saw excessively large request with size {body_len} for {route} with peer {:?}", + request.peer_id() + ); + self.metrics + .excessive_size_requests + .with_label_values(&[&route]) + .inc(); + } + + let timer = self + .metrics + .request_latency + .with_label_values(&[&route]) + .start_timer(); + + MetricsResponseHandler { + metrics: self.metrics.clone(), + timer, + route, + excessive_message_size: self.excessive_message_size, + } + } +} + +pub struct MetricsResponseHandler { + metrics: Arc, + // The timer is held on to and "observed" once dropped + #[allow(unused)] + timer: HistogramTimer, + route: String, + excessive_message_size: usize, +} + +impl ResponseHandler for MetricsResponseHandler { + fn on_response(self, response: &anemo::Response) { + let body_len = response.body().len(); + self.metrics + .response_size + .with_label_values(&[&self.route]) + .observe(body_len as f64); + if body_len > self.excessive_message_size { + warn!( + "Saw excessively large response with size {body_len} for {} with peer {:?}", + self.route, + response.peer_id() + ); + self.metrics + .excessive_size_responses + .with_label_values(&[&self.route]) + .inc(); + } + + if !response.status().is_success() { + let status = response.status().to_u16().to_string(); + self.metrics + .errors + .with_label_values(&[&self.route, &status]) + .inc(); + } + } + + fn on_error(self, _error: &E) { + self.metrics + .errors + .with_label_values(&[&self.route, "unknown"]) + .inc(); + } +} + +impl Drop for MetricsResponseHandler { + fn drop(&mut self) { + self.metrics + .inflight_requests + .with_label_values(&[&self.route]) + .dec(); + } +} diff --git a/consensus/core/src/network/mod.rs b/consensus/core/src/network/mod.rs index a533222353204..86477d972c22c 100644 --- a/consensus/core/src/network/mod.rs +++ b/consensus/core/src/network/mod.rs @@ -41,8 +41,9 @@ mod tonic_gen { include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusService.rs")); } +pub mod connection_monitor; + pub(crate) mod anemo_network; -pub(crate) mod connection_monitor; pub(crate) mod epoch_filter; pub(crate) mod metrics; mod metrics_layer; diff --git a/crates/sui-core/src/consensus_adapter.rs b/crates/sui-core/src/consensus_adapter.rs index 06f3241aedff3..22b94b87f1f9a 100644 --- a/crates/sui-core/src/consensus_adapter.rs +++ b/crates/sui-core/src/consensus_adapter.rs @@ -41,10 +41,10 @@ use crate::consensus_handler::{classify, SequencedConsensusTransactionKey}; use crate::consensus_throughput_calculator::{ConsensusThroughputProfiler, Level}; use crate::epoch::reconfiguration::{ReconfigState, ReconfigurationInitiator}; use crate::metrics::LatencyObserver; +use consensus_core::ConnectionStatus; use mysten_metrics::{spawn_monitored_task, GaugeGuard, GaugeGuardFutureExt}; use sui_protocol_config::ProtocolConfig; use sui_simulator::anemo::PeerId; -use sui_simulator::narwhal_network::connectivity::ConnectionStatus; use sui_types::base_types::AuthorityName; use sui_types::fp_ensure; use sui_types::messages_consensus::ConsensusTransactionKind; diff --git a/crates/sui-node/Cargo.toml b/crates/sui-node/Cargo.toml index a9342adbb5a7c..636761b27341b 100644 --- a/crates/sui-node/Cargo.toml +++ b/crates/sui-node/Cargo.toml @@ -18,6 +18,7 @@ anyhow.workspace = true base64.workspace = true bcs.workspace = true clap.workspace = true +consensus-core.workspace = true prometheus.workspace = true tokio = { workspace = true, features = ["full"] } tracing.workspace = true diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index efcf87b326afa..011b369ca9352 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -57,8 +57,6 @@ pub use handle::SuiNodeHandle; use mysten_metrics::{spawn_monitored_task, RegistryService}; use mysten_network::server::ServerBuilder; use mysten_service::server_timing::server_timing_middleware; -use narwhal_network::metrics::MetricsMakeCallbackHandler; -use narwhal_network::metrics::{NetworkConnectionMetrics, NetworkMetrics}; use sui_archival::reader::ArchiveReaderBalancer; use sui_archival::writer::ArchiveWriter; use sui_config::node::{DBCheckpointConfig, RunWithRange}; @@ -231,6 +229,7 @@ pub struct SuiNode { metrics: Arc, _discovery: discovery::Handle, + _connection_monitor_handle: consensus_core::ConnectionMonitorHandle, state_sync_handle: state_sync::Handle, randomness_handle: randomness::Handle, checkpoint_store: Arc, @@ -769,21 +768,22 @@ impl SuiNode { .epoch_start_state() .get_authority_names_to_peer_ids(); - let network_connection_metrics = - NetworkConnectionMetrics::new("sui", ®istry_service.default_registry()); + let network_connection_metrics = consensus_core::QuinnConnectionMetrics::new( + "sui", + ®istry_service.default_registry(), + ); let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids); - let (_connection_monitor_handle, connection_statuses) = - narwhal_network::connectivity::ConnectionMonitor::spawn( - p2p_network.downgrade(), - network_connection_metrics, - HashMap::new(), - None, - ); + let connection_monitor_handle = consensus_core::AnemoConnectionMonitor::spawn( + p2p_network.downgrade(), + Arc::new(network_connection_metrics), + // TODO: add known seed peers via discovery so metrics will update. + HashMap::new(), + ); let connection_monitor_status = ConnectionMonitorStatus { - connection_statuses, + connection_statuses: connection_monitor_handle.connection_statuses(), authority_names_to_peer_ids, }; @@ -826,6 +826,7 @@ impl SuiNode { metrics: sui_node_metrics, _discovery: discovery_handle, + _connection_monitor_handle: connection_monitor_handle, state_sync_handle, randomness_handle, checkpoint_store, @@ -1055,9 +1056,9 @@ impl SuiNode { let routes = routes.merge(randomness_router); let inbound_network_metrics = - NetworkMetrics::new("sui", "inbound", prometheus_registry); + consensus_core::NetworkRouteMetrics::new("sui", "inbound", prometheus_registry); let outbound_network_metrics = - NetworkMetrics::new("sui", "outbound", prometheus_registry); + consensus_core::NetworkRouteMetrics::new("sui", "outbound", prometheus_registry); let service = ServiceBuilder::new() .layer( @@ -1065,10 +1066,12 @@ impl SuiNode { .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO)) .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)), ) - .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new( - Arc::new(inbound_network_metrics), - config.p2p_config.excessive_message_size(), - ))) + .layer(CallbackLayer::new( + consensus_core::MetricsMakeCallbackHandler::new( + Arc::new(inbound_network_metrics), + config.p2p_config.excessive_message_size(), + ), + )) .service(routes); let outbound_layer = ServiceBuilder::new() @@ -1077,10 +1080,12 @@ impl SuiNode { .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO)) .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)), ) - .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new( - Arc::new(outbound_network_metrics), - config.p2p_config.excessive_message_size(), - ))) + .layer(CallbackLayer::new( + consensus_core::MetricsMakeCallbackHandler::new( + Arc::new(outbound_network_metrics), + config.p2p_config.excessive_message_size(), + ), + )) .into_inner(); let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default(); @@ -1695,7 +1700,7 @@ impl SuiNode { consensus_store_pruner.prune(next_epoch).await; if self.state.is_validator(&new_epoch_store) { - // Only restart Narwhal if this node is still a validator in the new epoch. + // Only restart consensus if this node is still a validator in the new epoch. Some( Self::start_epoch_specific_validator_components( &self.config, diff --git a/narwhal/network/src/connectivity.rs b/narwhal/network/src/connectivity.rs deleted file mode 100644 index dbd9e9526f8bb..0000000000000 --- a/narwhal/network/src/connectivity.rs +++ /dev/null @@ -1,381 +0,0 @@ -// Copyright (c) Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 - -use crate::metrics::NetworkConnectionMetrics; -use anemo::types::PeerEvent; -use anemo::PeerId; -use dashmap::DashMap; -use futures::future; -use mysten_metrics::spawn_logged_monitored_task; -use quinn_proto::ConnectionStats; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; -use tokio::task::JoinHandle; -use tokio::time; -use types::ConditionalBroadcastReceiver; - -const CONNECTION_STAT_COLLECTION_INTERVAL: Duration = Duration::from_secs(60); - -#[derive(Eq, PartialEq, Clone, Debug)] -pub enum ConnectionStatus { - Connected, - Disconnected, -} - -pub struct ConnectionMonitor { - network: anemo::NetworkRef, - connection_metrics: NetworkConnectionMetrics, - peer_id_types: HashMap, - connection_statuses: Arc>, - rx_shutdown: Option, -} - -impl ConnectionMonitor { - #[must_use] - pub fn spawn( - network: anemo::NetworkRef, - connection_metrics: NetworkConnectionMetrics, - peer_id_types: HashMap, - rx_shutdown: Option, - ) -> (JoinHandle<()>, Arc>) { - let connection_statuses_outer = Arc::new(DashMap::new()); - let connection_statuses = connection_statuses_outer.clone(); - ( - spawn_logged_monitored_task!( - Self { - network, - connection_metrics, - peer_id_types, - connection_statuses, - rx_shutdown - } - .run(), - "ConnectionMonitor" - ), - connection_statuses_outer, - ) - } - - async fn run(mut self) { - let (mut subscriber, connected_peers) = { - if let Some(network) = self.network.upgrade() { - let Ok((subscriber, active_peers)) = network.subscribe() else { - return; - }; - (subscriber, active_peers) - } else { - return; - } - }; - - // we report first all the known peers as disconnected - so we can see - // their labels in the metrics reporting tool - let mut known_peers = Vec::new(); - for (peer_id, ty) in &self.peer_id_types { - known_peers.push(*peer_id); - self.connection_metrics - .network_peer_connected - .with_label_values(&[&format!("{peer_id}"), ty]) - .set(0) - } - - // now report the connected peers - for peer_id in connected_peers.iter() { - self.handle_peer_event(PeerEvent::NewPeer(*peer_id)).await; - } - - let mut connection_stat_collection_interval = - time::interval(CONNECTION_STAT_COLLECTION_INTERVAL); - - async fn wait_for_shutdown( - rx_shutdown: &mut Option, - ) -> Result<(), tokio::sync::broadcast::error::RecvError> { - if let Some(rx) = rx_shutdown.as_mut() { - rx.receiver.recv().await - } else { - // If no shutdown receiver is provided, wait forever. - let future = future::pending(); - #[allow(clippy::let_unit_value)] - let () = future.await; - Ok(()) - } - } - - loop { - tokio::select! { - _ = connection_stat_collection_interval.tick() => { - if let Some(network) = self.network.upgrade() { - self.connection_metrics.socket_receive_buffer_size.set( - network.socket_receive_buf_size() as i64 - ); - self.connection_metrics.socket_send_buffer_size.set( - network.socket_send_buf_size() as i64 - ); - for peer_id in known_peers.iter() { - if let Some(connection) = network.peer(*peer_id) { - let stats = connection.connection_stats(); - self.update_quinn_metrics_for_peer(&format!("{peer_id}"), &stats); - } - } - } else { - continue; - } - } - Ok(event) = subscriber.recv() => { - self.handle_peer_event(event).await; - } - _ = wait_for_shutdown(&mut self.rx_shutdown) => { - return; - } - } - } - } - - async fn handle_peer_event(&self, peer_event: PeerEvent) { - if let Some(network) = self.network.upgrade() { - self.connection_metrics - .network_peers - .set(network.peers().len() as i64); - } else { - return; - } - - let (peer_id, status, int_status) = match peer_event { - PeerEvent::NewPeer(peer_id) => (peer_id, ConnectionStatus::Connected, 1), - PeerEvent::LostPeer(peer_id, _) => (peer_id, ConnectionStatus::Disconnected, 0), - }; - self.connection_statuses.insert(peer_id, status); - - // Only report peer IDs for known peers to prevent unlimited cardinality. - let peer_id_str = if self.peer_id_types.contains_key(&peer_id) { - format!("{peer_id}") - } else { - "other_peer".to_string() - }; - - if let Some(ty) = self.peer_id_types.get(&peer_id) { - self.connection_metrics - .network_peer_connected - .with_label_values(&[&peer_id_str, ty]) - .set(int_status); - } - - if let PeerEvent::LostPeer(_, reason) = peer_event { - self.connection_metrics - .network_peer_disconnects - .with_label_values(&[&peer_id_str, &format!("{reason:?}")]) - .inc(); - } - } - - // TODO: Replace this with ClosureMetric - fn update_quinn_metrics_for_peer(&self, peer_id: &str, stats: &ConnectionStats) { - // Update PathStats - self.connection_metrics - .network_peer_rtt - .with_label_values(&[peer_id]) - .set(stats.path.rtt.as_millis() as i64); - self.connection_metrics - .network_peer_lost_packets - .with_label_values(&[peer_id]) - .set(stats.path.lost_packets as i64); - self.connection_metrics - .network_peer_lost_bytes - .with_label_values(&[peer_id]) - .set(stats.path.lost_bytes as i64); - self.connection_metrics - .network_peer_sent_packets - .with_label_values(&[peer_id]) - .set(stats.path.sent_packets as i64); - self.connection_metrics - .network_peer_congestion_events - .with_label_values(&[peer_id]) - .set(stats.path.congestion_events as i64); - self.connection_metrics - .network_peer_congestion_window - .with_label_values(&[peer_id]) - .set(stats.path.cwnd as i64); - - // Update FrameStats - self.connection_metrics - .network_peer_max_data - .with_label_values(&[peer_id, "transmitted"]) - .set(stats.frame_tx.max_data as i64); - self.connection_metrics - .network_peer_max_data - .with_label_values(&[peer_id, "received"]) - .set(stats.frame_rx.max_data as i64); - self.connection_metrics - .network_peer_closed_connections - .with_label_values(&[peer_id, "transmitted"]) - .set(stats.frame_tx.connection_close as i64); - self.connection_metrics - .network_peer_closed_connections - .with_label_values(&[peer_id, "received"]) - .set(stats.frame_rx.connection_close as i64); - self.connection_metrics - .network_peer_data_blocked - .with_label_values(&[peer_id, "transmitted"]) - .set(stats.frame_tx.data_blocked as i64); - self.connection_metrics - .network_peer_data_blocked - .with_label_values(&[peer_id, "received"]) - .set(stats.frame_rx.data_blocked as i64); - - // Update UDPStats - self.connection_metrics - .network_peer_udp_datagrams - .with_label_values(&[peer_id, "transmitted"]) - .set(stats.udp_tx.datagrams as i64); - self.connection_metrics - .network_peer_udp_datagrams - .with_label_values(&[peer_id, "received"]) - .set(stats.udp_rx.datagrams as i64); - self.connection_metrics - .network_peer_udp_bytes - .with_label_values(&[peer_id, "transmitted"]) - .set(stats.udp_tx.bytes as i64); - self.connection_metrics - .network_peer_udp_bytes - .with_label_values(&[peer_id, "received"]) - .set(stats.udp_rx.bytes as i64); - self.connection_metrics - .network_peer_udp_transmits - .with_label_values(&[peer_id, "transmitted"]) - .set(stats.udp_tx.ios as i64); - self.connection_metrics - .network_peer_udp_transmits - .with_label_values(&[peer_id, "received"]) - .set(stats.udp_rx.ios as i64); - } -} - -#[cfg(test)] -mod tests { - use crate::connectivity::{ConnectionMonitor, ConnectionStatus}; - use crate::metrics::NetworkConnectionMetrics; - use anemo::{Network, Request, Response}; - use bytes::Bytes; - use prometheus::Registry; - use std::collections::HashMap; - use std::convert::Infallible; - use std::time::Duration; - use tokio::time::{sleep, timeout}; - use tower::util::BoxCloneService; - - #[tokio::test] - async fn test_connectivity() { - // GIVEN - let network_1 = build_network().unwrap(); - let network_2 = build_network().unwrap(); - let network_3 = build_network().unwrap(); - - let registry = Registry::new(); - let metrics = NetworkConnectionMetrics::new("primary", ®istry); - - // AND we connect to peer 2 - let peer_2 = network_1.connect(network_2.local_addr()).await.unwrap(); - - let mut peer_types = HashMap::new(); - peer_types.insert(network_2.peer_id(), "other_network".to_string()); - peer_types.insert(network_3.peer_id(), "other_network".to_string()); - - // WHEN bring up the monitor - let (_h, statuses) = - ConnectionMonitor::spawn(network_1.downgrade(), metrics.clone(), peer_types, None); - - // THEN peer 2 should be already connected - assert_network_peers(metrics.clone(), 1).await; - - // AND we should have collected connection stats - let mut labels = HashMap::new(); - let peer_2_str = format!("{peer_2}"); - labels.insert("peer_id", peer_2_str.as_str()); - assert_ne!( - metrics - .network_peer_rtt - .get_metric_with(&labels) - .unwrap() - .get(), - 0 - ); - assert_eq!( - *statuses.get(&peer_2).unwrap().value(), - ConnectionStatus::Connected - ); - - // WHEN connect to peer 3 - let peer_3 = network_1.connect(network_3.local_addr()).await.unwrap(); - - // THEN - assert_network_peers(metrics.clone(), 2).await; - assert_eq!( - *statuses.get(&peer_3).unwrap().value(), - ConnectionStatus::Connected - ); - - // AND disconnect peer 2 - network_1.disconnect(peer_2).unwrap(); - - // THEN - assert_network_peers(metrics.clone(), 1).await; - assert_eq!( - *statuses.get(&peer_2).unwrap().value(), - ConnectionStatus::Disconnected - ); - - // AND disconnect peer 3 - network_1.disconnect(peer_3).unwrap(); - - // THEN - assert_network_peers(metrics.clone(), 0).await; - assert_eq!( - *statuses.get(&peer_3).unwrap().value(), - ConnectionStatus::Disconnected - ); - } - - async fn assert_network_peers(metrics: NetworkConnectionMetrics, value: i64) { - let m = metrics.clone(); - timeout(Duration::from_secs(5), async move { - while m.network_peers.get() != value { - sleep(Duration::from_millis(500)).await; - } - }) - .await - .unwrap_or_else(|_| { - panic!( - "Timeout while waiting for connectivity results for value {}", - value - ) - }); - - assert_eq!(metrics.network_peers.get(), value); - } - - fn build_network() -> anyhow::Result { - let network = Network::bind("localhost:0") - .private_key(random_private_key()) - .server_name("test") - .start(echo_service())?; - Ok(network) - } - - fn echo_service() -> BoxCloneService, Response, Infallible> { - let handle = move |request: Request| async move { - let response = Response::new(request.into_body()); - Result::, Infallible>::Ok(response) - }; - - tower::ServiceExt::boxed_clone(tower::service_fn(handle)) - } - - fn random_private_key() -> [u8; 32] { - let mut rng = rand::thread_rng(); - let mut bytes = [0u8; 32]; - rand::RngCore::fill_bytes(&mut rng, &mut bytes[..]); - - bytes - } -} diff --git a/narwhal/network/src/lib.rs b/narwhal/network/src/lib.rs index 5e9add7e20b76..f781bf851a66a 100644 --- a/narwhal/network/src/lib.rs +++ b/narwhal/network/src/lib.rs @@ -11,7 +11,6 @@ pub mod admin; pub mod client; -pub mod connectivity; pub mod epoch_filter; pub mod failpoints; pub mod metrics; diff --git a/narwhal/primary/src/metrics.rs b/narwhal/primary/src/metrics.rs index d6340253e8f92..c32f0f0bea5fa 100644 --- a/narwhal/primary/src/metrics.rs +++ b/narwhal/primary/src/metrics.rs @@ -1,6 +1,6 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use network::metrics::{NetworkConnectionMetrics, NetworkMetrics}; +use network::metrics::NetworkMetrics; use prometheus::{ core::{AtomicI64, GenericGauge}, default_registry, linear_buckets, register_histogram_vec_with_registry, @@ -22,7 +22,6 @@ pub(crate) struct Metrics { pub(crate) outbound_network_metrics: Option, pub(crate) primary_channel_metrics: Option, pub(crate) node_metrics: Option, - pub(crate) network_connection_metrics: Option, } /// Initialises the metrics @@ -37,15 +36,11 @@ pub(crate) fn initialise_metrics(metrics_registry: &Registry) -> Metrics { // Essential/core metrics across the primary node let node_metrics = PrimaryMetrics::new(metrics_registry); - // Network metrics for the primary connection - let network_connection_metrics = NetworkConnectionMetrics::new("primary", metrics_registry); - Metrics { node_metrics: Some(node_metrics), primary_channel_metrics: Some(primary_channel_metrics), inbound_network_metrics: Some(inbound_network_metrics), outbound_network_metrics: Some(outbound_network_metrics), - network_connection_metrics: Some(network_connection_metrics), } } diff --git a/narwhal/primary/src/primary.rs b/narwhal/primary/src/primary.rs index 6b2a7f84cac1a..aa6145dafdc9b 100644 --- a/narwhal/primary/src/primary.rs +++ b/narwhal/primary/src/primary.rs @@ -126,7 +126,6 @@ impl Primary { let inbound_network_metrics = Arc::new(metrics.inbound_network_metrics.unwrap()); let outbound_network_metrics = Arc::new(metrics.outbound_network_metrics.unwrap()); let node_metrics = Arc::new(metrics.node_metrics.unwrap()); - let network_connection_metrics = metrics.network_connection_metrics.unwrap(); let (tx_our_digests, rx_our_digests) = channel_with_total( CHANNEL_CAPACITY, @@ -404,13 +403,6 @@ impl Primary { ); } - let (connection_monitor_handle, _) = network::connectivity::ConnectionMonitor::spawn( - network.downgrade(), - network_connection_metrics, - peer_types, - Some(tx_shutdown.subscribe()), - ); - info!( "Primary {} listening to network admin messages on 127.0.0.1:{}", authority.id(), @@ -477,12 +469,7 @@ impl Primary { leader_schedule.clone(), ); - let mut handles = vec![ - core_handle, - certificate_fetcher_handle, - proposer_handle, - connection_monitor_handle, - ]; + let mut handles = vec![core_handle, certificate_fetcher_handle, proposer_handle]; handles.extend(admin_handles); // Keeps track of the latest consensus round and allows other tasks to clean up their their internal state diff --git a/narwhal/worker/src/tests/worker_tests.rs b/narwhal/worker/src/tests/worker_tests.rs index 9d031756bf4d7..99421109124da 100644 --- a/narwhal/worker/src/tests/worker_tests.rs +++ b/narwhal/worker/src/tests/worker_tests.rs @@ -6,24 +6,17 @@ use crate::LocalNarwhalClient; use crate::{metrics::initialise_metrics, TrivialTransactionValidator}; use async_trait::async_trait; use bytes::Bytes; -use fastcrypto::{ - encoding::{Encoding, Hex}, - hash::Hash, -}; +use fastcrypto::hash::Hash; use futures::stream::FuturesOrdered; use futures::StreamExt; -use primary::consensus::{ConsensusRound, LeaderSchedule, LeaderSwapTable}; -use primary::{Primary, CHANNEL_CAPACITY, NUM_SHUTDOWN_RECEIVERS}; +use primary::{CHANNEL_CAPACITY, NUM_SHUTDOWN_RECEIVERS}; use prometheus::Registry; -use std::time::Duration; -use storage::NodeStorage; use store::rocks; use store::rocks::MetricConf; use store::rocks::ReadWriteOptions; use test_utils::{ batch, latest_protocol_version, temp_dir, test_network, transaction, CommitteeFixture, }; -use tokio::sync::watch; use types::{ BatchAPI, MockWorkerToPrimary, MockWorkerToWorker, PreSubscribedBroadcastSender, TransactionProto, TransactionsClient, WorkerBatchMessage, WorkerToWorkerClient, @@ -376,288 +369,3 @@ async fn handle_local_clients_transactions() { // Ensure sending ended. assert!(join_handle.await.is_ok()); } - -#[tokio::test] -async fn get_network_peers_from_admin_server() { - // telemetry_subscribers::init_for_testing(); - let primary_1_parameters = Parameters { - batch_size: 200, // Two transactions. - ..Parameters::default() - }; - let fixture = CommitteeFixture::builder().randomize_ports(true).build(); - let committee = fixture.committee(); - let worker_cache = fixture.worker_cache(); - let authority_1 = fixture.authorities().next().unwrap(); - let signer_1 = authority_1.keypair().copy(); - let client_1 = NetworkClient::new_from_keypair(&authority_1.network_keypair()); - - let worker_id = 0; - let worker_1_keypair = authority_1.worker(worker_id).keypair().copy(); - - // Make the data store. - let store = NodeStorage::reopen(temp_dir(), None); - - let (tx_new_certificates, _rx_new_certificates) = - test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); - let (tx_feedback, rx_feedback) = test_utils::test_channel!(CHANNEL_CAPACITY); - let (_tx_consensus_round_updates, rx_consensus_round_updates) = - watch::channel(ConsensusRound::default()); - let mut tx_shutdown = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS); - - // Spawn Primary 1 - Primary::spawn( - authority_1.authority().clone(), - signer_1, - authority_1.network_keypair().copy(), - committee.clone(), - worker_cache.clone(), - latest_protocol_version(), - primary_1_parameters.clone(), - client_1.clone(), - store.certificate_store.clone(), - store.proposer_store.clone(), - store.payload_store.clone(), - store.vote_digest_store.clone(), - tx_new_certificates, - rx_feedback, - rx_consensus_round_updates, - &mut tx_shutdown, - tx_feedback, - &Registry::new(), - LeaderSchedule::new(committee.clone(), LeaderSwapTable::default()), - ); - - // Wait for tasks to start - tokio::time::sleep(Duration::from_secs(1)).await; - - let registry_1 = Registry::new(); - let metrics_1 = initialise_metrics(®istry_1); - let mut tx_shutdown = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS); - - let worker_1_parameters = Parameters { - batch_size: 200, // Two transactions. - ..Parameters::default() - }; - - // Spawn a `Worker` instance for primary 1. - Worker::spawn( - authority_1.authority().clone(), - worker_1_keypair.copy(), - worker_id, - committee.clone(), - worker_cache.clone(), - latest_protocol_version(), - worker_1_parameters.clone(), - TrivialTransactionValidator, - client_1.clone(), - store.batch_store.clone(), - metrics_1.clone(), - &mut tx_shutdown, - ); - - let primary_1_peer_id = Hex::encode(authority_1.network_keypair().copy().public().0.as_bytes()); - let worker_1_peer_id = Hex::encode(worker_1_keypair.copy().public().0.as_bytes()); - - // Wait for tasks to start - tokio::time::sleep(Duration::from_secs(1)).await; - - // Test getting all known peers for worker 1 - let resp = reqwest::get(format!( - "http://127.0.0.1:{}/known_peers", - worker_1_parameters - .network_admin_server - .worker_network_admin_server_base_port - + worker_id as u16 - )) - .await - .unwrap() - .json::>() - .await - .unwrap(); - - // Assert we returned 3 peers (1 primary + 3 other workers) - assert_eq!(4, resp.len()); - - // Test getting all connected peers for worker 1 (worker at index 0 for primary 1) - let resp = reqwest::get(format!( - "http://127.0.0.1:{}/peers", - worker_1_parameters - .network_admin_server - .worker_network_admin_server_base_port - + worker_id as u16 - )) - .await - .unwrap() - .json::>() - .await - .unwrap(); - - // Assert we returned 1 peer (only worker's primary spawned) - assert_eq!(1, resp.len()); - - // Assert peer ids are correct - let expected_peer_ids = [&primary_1_peer_id]; - assert!(expected_peer_ids.iter().all(|e| resp.contains(e))); - - let authority_2 = fixture.authorities().nth(1).unwrap(); - let signer_2 = authority_2.keypair().copy(); - let client_2 = NetworkClient::new_from_keypair(&authority_2.network_keypair()); - - let worker_2_keypair = authority_2.worker(worker_id).keypair().copy(); - - let primary_2_parameters = Parameters { - batch_size: 200, // Two transactions. - ..Parameters::default() - }; - - let (tx_new_certificates_2, _rx_new_certificates_2) = - test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); - let (tx_feedback_2, rx_feedback_2) = test_utils::test_channel!(CHANNEL_CAPACITY); - let (_tx_consensus_round_updates, rx_consensus_round_updates) = - watch::channel(ConsensusRound::default()); - - let mut tx_shutdown_2 = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS); - - // Spawn Primary 2 - Primary::spawn( - authority_2.authority().clone(), - signer_2, - authority_2.network_keypair().copy(), - committee.clone(), - worker_cache.clone(), - latest_protocol_version(), - primary_2_parameters.clone(), - client_2.clone(), - store.certificate_store.clone(), - store.proposer_store.clone(), - store.payload_store.clone(), - store.vote_digest_store.clone(), - tx_new_certificates_2, - rx_feedback_2, - rx_consensus_round_updates, - &mut tx_shutdown_2, - tx_feedback_2, - &Registry::new(), - LeaderSchedule::new(committee.clone(), LeaderSwapTable::default()), - ); - - // Wait for tasks to start - tokio::time::sleep(Duration::from_secs(1)).await; - - let registry_2 = Registry::new(); - let metrics_2 = initialise_metrics(®istry_2); - - let worker_2_parameters = Parameters { - batch_size: 200, // Two transactions. - ..Parameters::default() - }; - - let mut tx_shutdown_worker = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS); - - // Spawn a `Worker` instance for primary 2. - Worker::spawn( - authority_2.authority().clone(), - worker_2_keypair.copy(), - worker_id, - committee.clone(), - worker_cache.clone(), - latest_protocol_version(), - worker_2_parameters.clone(), - TrivialTransactionValidator, - client_2, - store.batch_store, - metrics_2.clone(), - &mut tx_shutdown_worker, - ); - - // Wait for tasks to start. Sleeping longer here to ensure all primaries and workers - // have a chance to connect to each other. - tokio::time::sleep(Duration::from_secs(5)).await; - - let primary_2_peer_id = Hex::encode(authority_2.network_keypair().copy().public().0.as_bytes()); - let worker_2_peer_id = Hex::encode(worker_2_keypair.copy().public().0.as_bytes()); - - // Test getting all known peers for worker 2 (worker at index 0 for primary 2) - let resp = reqwest::get(format!( - "http://127.0.0.1:{}/known_peers", - worker_2_parameters - .network_admin_server - .worker_network_admin_server_base_port - + worker_id as u16 - )) - .await - .unwrap() - .json::>() - .await - .unwrap(); - - // Assert we returned 4 peers (1 primary + 3 other workers) - assert_eq!(4, resp.len()); - - // Test getting all connected peers for worker 1 (worker at index 0 for primary 1) - let resp = reqwest::get(format!( - "http://127.0.0.1:{}/peers", - worker_1_parameters - .network_admin_server - .worker_network_admin_server_base_port - + worker_id as u16 - )) - .await - .unwrap() - .json::>() - .await - .unwrap(); - - // Assert we returned 3 peers (2 primaries spawned + 1 other worker spawned) - assert_eq!(3, resp.len()); - - // Assert peer ids are correct - let expected_peer_ids = [&primary_1_peer_id, &primary_2_peer_id, &worker_2_peer_id]; - assert!(expected_peer_ids.iter().all(|e| resp.contains(e))); - - // Test getting all connected peers for worker 2 (worker at index 0 for primary 2) - let resp = reqwest::get(format!( - "http://127.0.0.1:{}/peers", - worker_2_parameters - .network_admin_server - .worker_network_admin_server_base_port - + worker_id as u16 - )) - .await - .unwrap() - .json::>() - .await - .unwrap(); - - // Assert we returned 3 peers (2 primaries spawned + 1 other worker spawned) - assert_eq!(3, resp.len()); - - // Assert peer ids are correct - let expected_peer_ids = [&primary_1_peer_id, &primary_2_peer_id, &worker_1_peer_id]; - assert!(expected_peer_ids.iter().all(|e| resp.contains(e))); - - // Assert network connectivity metrics are also set as expected - let filters = vec![ - (primary_2_peer_id.as_str(), "our_primary"), - (primary_1_peer_id.as_str(), "other_primary"), - (worker_1_peer_id.as_str(), "other_worker"), - ]; - - for f in filters { - let mut m = HashMap::new(); - m.insert("peer_id", f.0); - m.insert("type", f.1); - - assert_eq!( - 1, - metrics_2 - .clone() - .network_connection_metrics - .unwrap() - .network_peer_connected - .get_metric_with(&m) - .unwrap() - .get() - ); - } -} diff --git a/narwhal/worker/src/worker.rs b/narwhal/worker/src/worker.rs index 9bdb55e8b4e91..63064a1b6c925 100644 --- a/narwhal/worker/src/worker.rs +++ b/narwhal/worker/src/worker.rs @@ -106,7 +106,6 @@ impl Worker { let channel_metrics: Arc = Arc::new(metrics.channel_metrics.unwrap()); let inbound_network_metrics = Arc::new(metrics.inbound_network_metrics.unwrap()); let outbound_network_metrics = Arc::new(metrics.outbound_network_metrics.unwrap()); - let network_connection_metrics = metrics.network_connection_metrics.unwrap(); let mut shutdown_receivers = tx_shutdown.subscribe_n(NUM_SHUTDOWN_RECEIVERS); @@ -351,13 +350,6 @@ impl Worker { ); } - let (connection_monitor_handle, _) = network::connectivity::ConnectionMonitor::spawn( - network.downgrade(), - network_connection_metrics, - peer_types, - Some(shutdown_receivers.pop().unwrap()), - ); - let network_admin_server_base_port = parameters .network_admin_server .worker_network_admin_server_base_port @@ -402,7 +394,7 @@ impl Worker { .transactions ); - let mut handles = vec![connection_monitor_handle, network_shutdown_handle]; + let mut handles = vec![network_shutdown_handle]; handles.extend(admin_handles); handles.extend(client_flow_handles); handles