Skip to content

Commit

Permalink
[consensus] Migrate sui to Mysticeti connection monitor (#19814)
Browse files Browse the repository at this point in the history
## Description 

A follow-up change will ensure that known peers are set in Sui via discovery
so that the connection metrics are updated. A further follow-up will add a
TonicConnectionMonitor for Mysticeti to use.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
arun-koshy authored Oct 16, 2024
1 parent 2224cf3 commit f8e33f5
Show file tree
Hide file tree
Showing 14 changed files with 244 additions and 816 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions consensus/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ mod test_dag_parser;
/// Exported consensus API.
pub use authority_node::ConsensusAuthority;
pub use block::{BlockAPI, Round, TransactionIndex};
/// Exported API for testing.
pub use block::{TestBlock, Transaction, VerifiedBlock};
pub use commit::{CommitDigest, CommitIndex, CommitRef, CommittedSubDag};
pub use commit_consumer::{CommitConsumer, CommitConsumerMonitor};
pub use network::{
connection_monitor::{AnemoConnectionMonitor, ConnectionMonitorHandle, ConnectionStatus},
metrics::{MetricsMakeCallbackHandler, NetworkRouteMetrics, QuinnConnectionMetrics},
};
pub use transaction::{ClientError, TransactionClient, TransactionVerifier, ValidationError};

/// Exported API for testing.
pub use block::{TestBlock, Transaction, VerifiedBlock};
73 changes: 38 additions & 35 deletions consensus/core/src/network/connection_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ use super::metrics::QuinnConnectionMetrics;

const CONNECTION_STAT_COLLECTION_INTERVAL: Duration = Duration::from_secs(60);

pub(crate) struct ConnectionMonitorHandle {
pub struct ConnectionMonitorHandle {
handle: JoinHandle<()>,
stop: Sender<()>,
// TODO: Sui will use this component eventually instead of the NW version
#[allow(unused)]
connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
}

Expand All @@ -30,6 +28,10 @@ impl ConnectionMonitorHandle {
self.stop.send(()).ok();
self.handle.await.ok();
}

pub fn connection_statuses(&self) -> Arc<DashMap<PeerId, ConnectionStatus>> {
self.connection_statuses.clone()
}
}

#[derive(Eq, PartialEq, Clone, Debug)]
Expand Down Expand Up @@ -89,10 +91,10 @@ impl AnemoConnectionMonitor {

// First report all known peers as disconnected, so that their labels
// are visible in the metrics reporting tool.
for (peer_id, hostname) in &self.known_peers {
for (peer_id, peer_label) in &self.known_peers {
self.connection_metrics
.network_peer_connected
.with_label_values(&[&format!("{peer_id}"), hostname])
.with_label_values(&[&format!("{peer_id}"), peer_label])
.set(0)
}

Expand All @@ -114,10 +116,10 @@ impl AnemoConnectionMonitor {
self.connection_metrics.socket_send_buffer_size.set(
network.socket_send_buf_size() as i64
);
for (peer_id, hostname) in &self.known_peers {
for (peer_id, peer_label) in &self.known_peers {
if let Some(connection) = network.peer(*peer_id) {
let stats = connection.connection_stats();
self.update_quinn_metrics_for_peer(&format!("{peer_id}"), hostname, &stats);
self.update_quinn_metrics_for_peer(&format!("{peer_id}"), peer_label, &stats);
}
}
} else {
Expand Down Expand Up @@ -153,17 +155,17 @@ impl AnemoConnectionMonitor {
// Only report peer IDs for known peers to prevent unlimited cardinality.
if self.known_peers.contains_key(&peer_id) {
let peer_id_str = format!("{peer_id}");
let hostname = self.known_peers.get(&peer_id).unwrap();
let peer_label = self.known_peers.get(&peer_id).unwrap();

self.connection_metrics
.network_peer_connected
.with_label_values(&[&peer_id_str, hostname])
.with_label_values(&[&peer_id_str, peer_label])
.set(int_status);

if let PeerEvent::LostPeer(_, reason) = peer_event {
self.connection_metrics
.network_peer_disconnects
.with_label_values(&[&peer_id_str, hostname, &format!("{reason:?}")])
.with_label_values(&[&peer_id_str, peer_label, &format!("{reason:?}")])
.inc();
}
}
Expand All @@ -173,85 +175,85 @@ impl AnemoConnectionMonitor {
fn update_quinn_metrics_for_peer(
&self,
peer_id: &str,
hostname: &str,
peer_label: &str,
stats: &ConnectionStats,
) {
// Update PathStats
self.connection_metrics
.network_peer_rtt
.with_label_values(&[peer_id, hostname])
.with_label_values(&[peer_id, peer_label])
.set(stats.path.rtt.as_millis() as i64);
self.connection_metrics
.network_peer_lost_packets
.with_label_values(&[peer_id, hostname])
.with_label_values(&[peer_id, peer_label])
.set(stats.path.lost_packets as i64);
self.connection_metrics
.network_peer_lost_bytes
.with_label_values(&[peer_id, hostname])
.with_label_values(&[peer_id, peer_label])
.set(stats.path.lost_bytes as i64);
self.connection_metrics
.network_peer_sent_packets
.with_label_values(&[peer_id, hostname])
.with_label_values(&[peer_id, peer_label])
.set(stats.path.sent_packets as i64);
self.connection_metrics
.network_peer_congestion_events
.with_label_values(&[peer_id, hostname])
.with_label_values(&[peer_id, peer_label])
.set(stats.path.congestion_events as i64);
self.connection_metrics
.network_peer_congestion_window
.with_label_values(&[peer_id, hostname])
.with_label_values(&[peer_id, peer_label])
.set(stats.path.cwnd as i64);

// Update FrameStats
self.connection_metrics
.network_peer_max_data
.with_label_values(&[peer_id, hostname, "transmitted"])
.with_label_values(&[peer_id, peer_label, "transmitted"])
.set(stats.frame_tx.max_data as i64);
self.connection_metrics
.network_peer_max_data
.with_label_values(&[peer_id, hostname, "received"])
.with_label_values(&[peer_id, peer_label, "received"])
.set(stats.frame_rx.max_data as i64);
self.connection_metrics
.network_peer_closed_connections
.with_label_values(&[peer_id, hostname, "transmitted"])
.with_label_values(&[peer_id, peer_label, "transmitted"])
.set(stats.frame_tx.connection_close as i64);
self.connection_metrics
.network_peer_closed_connections
.with_label_values(&[peer_id, hostname, "received"])
.with_label_values(&[peer_id, peer_label, "received"])
.set(stats.frame_rx.connection_close as i64);
self.connection_metrics
.network_peer_data_blocked
.with_label_values(&[peer_id, hostname, "transmitted"])
.with_label_values(&[peer_id, peer_label, "transmitted"])
.set(stats.frame_tx.data_blocked as i64);
self.connection_metrics
.network_peer_data_blocked
.with_label_values(&[peer_id, hostname, "received"])
.with_label_values(&[peer_id, peer_label, "received"])
.set(stats.frame_rx.data_blocked as i64);

// Update UDPStats
self.connection_metrics
.network_peer_udp_datagrams
.with_label_values(&[peer_id, hostname, "transmitted"])
.with_label_values(&[peer_id, peer_label, "transmitted"])
.set(stats.udp_tx.datagrams as i64);
self.connection_metrics
.network_peer_udp_datagrams
.with_label_values(&[peer_id, hostname, "received"])
.with_label_values(&[peer_id, peer_label, "received"])
.set(stats.udp_rx.datagrams as i64);
self.connection_metrics
.network_peer_udp_bytes
.with_label_values(&[peer_id, hostname, "transmitted"])
.with_label_values(&[peer_id, peer_label, "transmitted"])
.set(stats.udp_tx.bytes as i64);
self.connection_metrics
.network_peer_udp_bytes
.with_label_values(&[peer_id, hostname, "received"])
.with_label_values(&[peer_id, peer_label, "received"])
.set(stats.udp_rx.bytes as i64);
self.connection_metrics
.network_peer_udp_transmits
.with_label_values(&[peer_id, hostname, "transmitted"])
.with_label_values(&[peer_id, peer_label, "transmitted"])
.set(stats.udp_tx.ios as i64);
self.connection_metrics
.network_peer_udp_transmits
.with_label_values(&[peer_id, hostname, "received"])
.with_label_values(&[peer_id, peer_label, "received"])
.set(stats.udp_rx.ios as i64);
}
}
Expand All @@ -276,7 +278,7 @@ mod tests {
let network_3 = build_network().unwrap();

let registry = Registry::new();
let metrics = Arc::new(QuinnConnectionMetrics::new(&registry));
let metrics = Arc::new(QuinnConnectionMetrics::new("consensus", &registry));

// AND we connect to peer 2
let peer_2 = network_1.connect(network_2.local_addr()).await.unwrap();
Expand All @@ -288,6 +290,7 @@ mod tests {
// WHEN we bring up the monitor
let handle =
AnemoConnectionMonitor::spawn(network_1.downgrade(), metrics.clone(), known_peers);
let connection_statuses = handle.connection_statuses();

// THEN peer 2 should be already connected
assert_network_peers(&metrics, 1).await;
Expand All @@ -296,7 +299,7 @@ mod tests {
let mut labels = HashMap::new();
let peer_2_str = format!("{peer_2}");
labels.insert("peer_id", peer_2_str.as_str());
labels.insert("hostname", "peer_2");
labels.insert("peer_label", "peer_2");
assert_ne!(
metrics
.network_peer_rtt
Expand All @@ -306,7 +309,7 @@ mod tests {
0
);
assert_eq!(
*handle.connection_statuses.get(&peer_2).unwrap().value(),
*connection_statuses.get(&peer_2).unwrap().value(),
ConnectionStatus::Connected
);

Expand All @@ -316,7 +319,7 @@ mod tests {
// THEN
assert_network_peers(&metrics, 2).await;
assert_eq!(
*handle.connection_statuses.get(&peer_3).unwrap().value(),
*connection_statuses.get(&peer_3).unwrap().value(),
ConnectionStatus::Connected
);

Expand All @@ -326,7 +329,7 @@ mod tests {
// THEN
assert_network_peers(&metrics, 1).await;
assert_eq!(
*handle.connection_statuses.get(&peer_2).unwrap().value(),
*connection_statuses.get(&peer_2).unwrap().value(),
ConnectionStatus::Disconnected
);

Expand All @@ -336,7 +339,7 @@ mod tests {
// THEN
assert_network_peers(&metrics, 0).await;
assert_eq!(
*handle.connection_statuses.get(&peer_3).unwrap().value(),
*connection_statuses.get(&peer_3).unwrap().value(),
ConnectionStatus::Disconnected
);
}
Expand Down
Loading

0 comments on commit f8e33f5

Please sign in to comment.