Skip to content

Commit

Permalink
Merge pull request #2641 from dusk-network/upgrade-kadcast-rc10
Browse files Browse the repository at this point in the history
node: improve send_to_alive_peers
  • Loading branch information
herr-seppia authored Oct 10, 2024
2 parents f98e3f8 + 7799885 commit 531a467
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ dusk-poseidon = "=0.40.0"
jubjub-schnorr = { version = "=0.5.0", default-features = false }

# we leave kadcast open until a stable release is out
kadcast = "0.7.0-rc.8"
kadcast = "0.7.0-rc.10"
phoenix-circuits = { version = "=0.4.0", default-features = false }
phoenix-core = { version = "=0.32.0", default-features = false }
# we leave piecrust open until a stable release is out
Expand Down
44 changes: 36 additions & 8 deletions node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use node_data::message::payload::{GetResource, Inv, Nonce};
use node_data::message::{AsyncQueue, Metadata, PROTOCOL_VERSION};
use node_data::{get_current_timestamp, Serializable};
use tokio::sync::RwLock;
use tracing::{debug, error, info, trace};
use tracing::{debug, error, info, trace, warn};

/// Number of alive peers randomly selected which a `flood_request` is sent to
const REDUNDANCY_PEER_COUNT: usize = 8;
Expand Down Expand Up @@ -169,9 +169,16 @@ impl<const N: usize> Kadcast<N> {
&self.conf
}

async fn send_with_metrics(&self, bytes: &Vec<u8>, recv_addr: SocketAddr) {
counter!("dusk_bytes_sent").increment(bytes.len() as u64);
self.peer.send(bytes, recv_addr).await;
async fn send_with_metrics(
&self,
bytes: &Vec<u8>,
recv_addr: Vec<SocketAddr>,
) {
if !recv_addr.is_empty() {
let bytes_sent = bytes.len() * recv_addr.len();
counter!("dusk_bytes_sent").increment(bytes_sent as u64);
self.peer.send_to_peers(bytes, recv_addr).await;
}
}
}

Expand Down Expand Up @@ -258,7 +265,7 @@ impl<const N: usize> crate::Network for Kadcast<N> {
let topic = msg.topic();

info!("sending msg ({topic:?}) to peer {recv_addr}");
self.send_with_metrics(&encoded, recv_addr).await;
self.send_with_metrics(&encoded, vec![recv_addr]).await;

Ok(())
}
Expand All @@ -281,10 +288,31 @@ impl<const N: usize> crate::Network for Kadcast<N> {

counter!(format!("dusk_requests_{:?}", topic)).increment(1);

for recv_addr in self.peer.alive_nodes(amount).await {
trace!("sending msg ({topic:?}) to peer {recv_addr}");
self.send_with_metrics(&encoded, recv_addr).await;
let mut alive_nodes = self.peer.alive_nodes(amount).await;

if alive_nodes.len() < amount {
let current = alive_nodes.len();

let route_table = self.peer.to_route_table().await;
let new_nodes: Vec<_> = route_table
.into_values()
.flatten()
.map(|(s, _)| s)
.filter(|s| !alive_nodes.contains(s))
.take(amount - current)
.collect();

alive_nodes.extend(new_nodes);
warn!(
event = "Not enought alive peers to send msg, increased",
?topic,
requested = amount,
current,
increased = alive_nodes.len(),
);
}
trace!("sending msg ({topic:?}) to peers {alive_nodes:?}");
self.send_with_metrics(&encoded, alive_nodes).await;

Ok(())
}
Expand Down

0 comments on commit 531a467

Please sign in to comment.