[chore] additional logging and monitoring
akichidis committed Jun 27, 2024
1 parent 2bb6bc1 commit 627a9d9
Showing 7 changed files with 61 additions and 15 deletions.
2 changes: 2 additions & 0 deletions consensus/core/src/authority_node.rs
@@ -292,6 +292,8 @@ where

network_manager.install_service(network_service).await;

info!("Authority start complete, took {:?}", start_time.elapsed());

Self {
context,
start_time,
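The change above is a single timing log: the stored start_time is measured once network service installation completes. A minimal sketch of the pattern, with illustrative names rather than the actual AuthorityNode fields:

use std::time::Instant;
use tracing::info;

struct Node {
    start_time: Instant,
}

impl Node {
    fn start() -> Self {
        let start_time = Instant::now();
        // ... install network services, recover state, etc. ...
        info!("Authority start complete, took {:?}", start_time.elapsed());
        Self { start_time }
    }
}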
14 changes: 12 additions & 2 deletions consensus/core/src/commit_observer.rs
@@ -5,7 +5,8 @@ use std::{sync::Arc, time::Duration};

use mysten_metrics::monitored_mpsc::UnboundedSender;
use parking_lot::RwLock;
use tracing::info;
use tokio::time::Instant;
use tracing::{debug, info};

use crate::{
block::{BlockAPI, VerifiedBlock},
@@ -97,17 +98,19 @@ impl CommitObserver {
}

fn recover_and_send_commits(&mut self, last_processed_commit_index: CommitIndex) {
let now = Instant::now();
// TODO: remove this check, to allow consensus to regenerate commits?
let last_commit = self
.store
.read_last_commit()
.expect("Reading the last commit should not fail");

if let Some(last_commit) = last_commit {
if let Some(last_commit) = &last_commit {
let last_commit_index = last_commit.index();

assert!(last_commit_index >= last_processed_commit_index);
if last_commit_index == last_processed_commit_index {
debug!("Nothing to recover for commit observer as commit index {last_commit_index} = {last_processed_commit_index} last processed index");
return;
}
};
@@ -118,6 +121,8 @@ impl CommitObserver {
.scan_commits(((last_processed_commit_index + 1)..=CommitIndex::MAX).into())
.expect("Scanning commits should not fail");

debug!("Recovering commit observer from {last_processed_commit_index} with last commit {:?} and {} unsent commits", last_commit, unsent_commits.len());

// Resend all the committed subdags to the consensus output channel
// for all the commits above the last processed index.
let mut last_sent_commit_index = last_processed_commit_index;
@@ -150,6 +155,11 @@

last_sent_commit_index += 1;
}

debug!(
"Commit observer recovery complete, took {:?}",
now.elapsed()
);
}

fn report_metrics(&self, committed: &[CommittedSubDag]) {
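Two things happen in this file: recovery is now timed (a tokio Instant taken at entry, elapsed logged at the end) and progress is logged at debug level. The switch from `last_commit` to `&last_commit` matters because binding by reference leaves the Option owned, so the commit can still be included in the later recovery log line. A simplified sketch of that borrow, using stand-in types rather than the real CommitObserver:

#[derive(Debug)]
struct Commit { index: u32 }

fn recover(last_commit: Option<Commit>, last_processed_index: u32) {
    if let Some(last) = &last_commit {
        if last.index == last_processed_index {
            tracing::debug!("Nothing to recover");
            return;
        }
    }
    // Still owned here: the check above only borrowed it.
    tracing::debug!("Recovering with last commit {:?}", last_commit);
}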
14 changes: 14 additions & 0 deletions consensus/core/src/core.rs
@@ -204,6 +204,11 @@ impl Core {
.unwrap();
}

debug!(
"Core recovery complete with last block {:?}",
self.last_proposed_block
);

self
}

@@ -406,6 +411,15 @@ impl Core {
.node_metrics
.block_ancestors
.observe(ancestors.len() as f64);
for ancestor in &ancestors {
let authority = &self.context.committee.authority(ancestor.author()).hostname;
self.context
.metrics
.node_metrics
.block_ancestors_depth
.with_label_values(&[authority])
.observe(clock_round.saturating_sub(ancestor.round()).into());
}

// Ensure ancestor timestamps are not more advanced than the current time.
// Also catch the issue if the system's clock goes backwards.
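The new loop records, for each ancestor included in a freshly proposed block, how many rounds behind the block's clock round that ancestor is, labeled by the ancestor authority's hostname. A worked example of the observed value (saturating_sub avoids an underflow panic if an ancestor's round ever exceeded the clock round):

fn main() {
    let clock_round: u32 = 105;
    let ancestor_round: u32 = 103;
    let depth = clock_round.saturating_sub(ancestor_round);
    assert_eq!(depth, 2); // the value observed into block_ancestors_depth for that authority
}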
8 changes: 8 additions & 0 deletions consensus/core/src/metrics.rs
@@ -101,6 +101,7 @@ pub(crate) struct NodeMetrics {
pub(crate) proposed_blocks: IntCounterVec,
pub(crate) block_size: Histogram,
pub(crate) block_ancestors: Histogram,
pub(crate) block_ancestors_depth: HistogramVec,
pub(crate) highest_verified_authority_round: IntGaugeVec,
pub(crate) lowest_verified_authority_round: IntGaugeVec,
pub(crate) block_proposal_leader_wait_ms: IntCounterVec,
@@ -190,6 +191,13 @@ impl NodeMetrics {
exponential_buckets(1.0, 1.4, 20).unwrap(),
registry,
).unwrap(),
block_ancestors_depth: register_histogram_vec_with_registry!(
"block_ancestors_depth",
"The depth in rounds of ancestors included in newly proposed blocks",
&["authority"],
exponential_buckets(1.0, 2.0, 14).unwrap(),
registry,
).unwrap(),
highest_verified_authority_round: register_int_gauge_vec_with_registry!(
"highest_verified_authority_round",
"The highest round of verified block for the corresponding authority",
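The new block_ancestors_depth histogram uses exponential buckets starting at 1 with a factor of 2 over 14 buckets, i.e. boundaries 1, 2, 4, ..., 8192 rounds. A small sketch checking that layout with the prometheus crate's helper directly, outside the NodeMetrics struct:

fn main() {
    let buckets = prometheus::exponential_buckets(1.0, 2.0, 14).unwrap();
    assert_eq!(buckets.len(), 14);
    assert_eq!(buckets.first().copied(), Some(1.0));
    assert_eq!(buckets.last().copied(), Some(8192.0));
}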
2 changes: 2 additions & 0 deletions consensus/core/src/network/anemo_network.rs
@@ -448,6 +448,8 @@ impl<S: NetworkService> NetworkManager<S> for AnemoManager {
.with_label_values(&["anemo"])
.set(1);

debug!("Starting anemo service");

let server = ConsensusRpcServer::new(AnemoServiceProxy::new(self.context.clone(), service));
let authority = self.context.committee.authority(self.context.own_index);
// Bind to localhost in unit tests since only local networking is needed.
9 changes: 6 additions & 3 deletions consensus/core/src/network/tonic_network.rs
@@ -16,6 +16,7 @@ use consensus_config::{AuthorityIndex, NetworkKeyPair, NetworkPublicKey};
use futures::{stream, Stream, StreamExt as _};
use hyper::server::conn::Http;
use mysten_common::sync::notify_once::NotifyOnce;
use mysten_metrics::monitored_future;
use mysten_network::{
callback::{CallbackLayer, MakeCallbackHandler, ResponseHandler},
multiaddr::Protocol,
@@ -654,6 +655,8 @@ impl<S: NetworkService> NetworkManager<S> for TonicManager {
.with_label_values(&["tonic"])
.set(1);

debug!("Starting tonic service");

let authority = self.context.committee.authority(self.context.own_index);
// Bind to localhost in unit tests since only local networking is needed.
// Bind to the unspecified address to allow the actual address to be assigned,
@@ -699,7 +702,7 @@ impl<S: NetworkService> NetworkManager<S> for TonicManager {
let tls_acceptor = TlsAcceptor::from(Arc::new(tls_server_config));

// Create listener to incoming connections.
let deadline = Instant::now() + Duration::from_secs(30);
let deadline = Instant::now() + Duration::from_secs(20);
let listener = loop {
if Instant::now() > deadline {
panic!("Failed to start server: timeout");
@@ -764,7 +767,7 @@ impl<S: NetworkService> NetworkManager<S> for TonicManager {

let shutdown_notif = self.shutdown_notif.clone();

self.server.spawn(async move {
self.server.spawn(monitored_future!(async move {
let mut connection_handlers = JoinSet::new();

loop {
@@ -895,7 +898,7 @@ impl<S: NetworkService> NetworkManager<S> for TonicManager {
Ok(())
});
}
});
}));

info!("Server started at: {own_address}");
}
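Three changes land in the tonic backend: a debug log when the service starts, the listener-bind deadline tightened from 30 to 20 seconds, and the server accept loop wrapped in monitored_future!. A minimal sketch of that wrapping pattern, assuming only that mysten_metrics::monitored_future! accepts a future expression and yields a future whose execution is tracked by the crate's task metrics:

use mysten_metrics::monitored_future;

fn spawn_accept_loop(handle: &tokio::runtime::Handle) {
    handle.spawn(monitored_future!(async move {
        loop {
            // accept a connection and hand it to a per-connection handler ...
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    }));
}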
27 changes: 17 additions & 10 deletions consensus/core/src/synchronizer.rs
@@ -244,23 +244,30 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
}
let (sender, receiver) =
channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
tasks.spawn(Self::fetch_blocks_from_authority(
let context_cloned = context.clone();
let network_cloned = network_client.clone();
let block_verified_cloned = block_verifier.clone();
let core_thread_dispatcher_cloned = core_dispatcher.clone();
let dag_state_cloned = dag_state.clone();
let command_sender_cloned = commands_sender.clone();

tasks.spawn(monitored_future!(Self::fetch_blocks_from_authority(
index,
network_client.clone(),
block_verifier.clone(),
context.clone(),
core_dispatcher.clone(),
dag_state.clone(),
network_cloned,
block_verified_cloned,
context_cloned,
core_thread_dispatcher_cloned,
dag_state_cloned,
receiver,
commands_sender.clone(),
));
command_sender_cloned,
)));
fetch_block_senders.insert(index, sender);
}

let commands_sender_clone = commands_sender.clone();

// Spawn the task to listen to the requests & periodic runs
tasks.spawn(async {
tasks.spawn(monitored_future!(async move {
let mut s = Self {
context,
commands_receiver,
@@ -274,7 +281,7 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
dag_state,
};
s.run().await;
});
}));

Arc::new(SynchronizerHandle {
commands_sender,
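Besides wrapping both spawned tasks in monitored_future!, the per-authority fetch task now takes explicit clones of its shared handles before the spawn, and the command-listening task becomes async move. Taking per-task clones keeps the originals available for later loop iterations and for the final task that takes ownership. A generic sketch of that clone-before-spawn pattern, with stand-in types rather than the real synchronizer handles:

use std::sync::Arc;

async fn spawn_workers(shared: Arc<String>) {
    for i in 0..3 {
        let shared_cloned = shared.clone(); // cheap refcount bump per task
        tokio::spawn(async move {
            println!("worker {i} uses {shared_cloned}");
        });
    }
    // `shared` itself was never moved, so the last task can take ownership of it.
    tokio::spawn(async move {
        println!("listener owns {shared}");
    });
}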
