From 9f9686415bb4de85df845a1ffc0925f5fd617ec9 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Tue, 9 Apr 2024 16:20:21 +0200 Subject: [PATCH] Fix 'reserved' field compatibility Resolves #137 --- CHANGELOG.md | 5 +++++ src/encoding/header.rs | 17 +++++++++++++---- src/handling.rs | 14 +++++++++----- src/lib.rs | 4 ++-- src/maintainer.rs | 10 +++++++--- src/peer.rs | 2 +- src/transport/encoding/raptorq/encoder.rs | 2 +- 7 files changed, 38 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c324ddb..f053b0a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fix 'reserved' field compatibility [#137] + ## [0.6.1] - 2024-04-10 ### Added @@ -140,6 +144,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 [#123]: https://github.com/dusk-network/kadcast/issues/123 [#135]: https://github.com/dusk-network/kadcast/issues/135 [#136]: https://github.com/dusk-network/kadcast/issues/136 +[#137]: https://github.com/dusk-network/kadcast/issues/137 [#138]: https://github.com/dusk-network/kadcast/issues/138 diff --git a/src/encoding/header.rs b/src/encoding/header.rs index 9708d28..bcc6872 100644 --- a/src/encoding/header.rs +++ b/src/encoding/header.rs @@ -9,12 +9,12 @@ use std::io::{self, Error, ErrorKind, Read, Write}; use super::Marshallable; use crate::{kbucket::BinaryID, K_ID_LEN_BYTES, K_NONCE_LEN}; -#[derive(Debug, PartialEq, Clone, Copy)] +#[derive(Debug, PartialEq, Clone)] pub struct Header { pub(crate) binary_id: BinaryID, pub(crate) sender_port: u16, pub(crate) network_id: u8, - pub(crate) reserved: [u8; 2], + pub(crate) reserved: Vec, } impl Header { @@ -32,6 +32,7 @@ impl Marshallable for Header { writer.write_all(self.binary_id.nonce())?; writer.write_all(&self.sender_port.to_le_bytes())?; writer.write_all(&[self.network_id])?; + writer.write_all(&(self.reserved.len() as u16).to_le_bytes())?; writer.write_all(&self.reserved)?; Ok(()) } @@ -55,8 +56,16 @@ impl Marshallable for Header { reader.read_exact(&mut network_id)?; let network_id = network_id[0]; - let mut reserved = [0; 2]; - reader.read_exact(&mut reserved)?; + let mut reserved_len = [0; 2]; + reader.read_exact(&mut reserved_len)?; + let reserved_len = u16::from_le_bytes(reserved_len); + let reserved = if reserved_len > 0 { + let mut reserved = vec![0u8; reserved_len as usize]; + reader.read_exact(&mut reserved)?; + reserved + } else { + vec![] + }; Ok(Header { binary_id, diff --git a/src/handling.rs b/src/handling.rs index 66a5162..6a27886 100644 --- a/src/handling.rs +++ b/src/handling.rs @@ -46,6 +46,10 @@ pub(crate) struct MessageHandler { } impl MessageHandler { + fn my_header(&self) -> Header { + self.my_header.clone() + } + async fn new( ktable: RwLock>, outbound_sender: Sender, @@ -142,7 +146,7 @@ impl MessageHandler { if let Some(pending) = result.pending_eviction() { self.outbound_sender .send(( - Message::Ping(self.my_header), + Message::Ping(self.my_header()), vec![*pending.value().address()], )) .await @@ -173,7 +177,7 @@ impl MessageHandler { async fn handle_ping(&self, remote_node_addr: SocketAddr) { self.outbound_sender - .send((Message::Pong(self.my_header), vec![remote_node_addr])) + .send((Message::Pong(self.my_header()), vec![remote_node_addr])) .await .unwrap_or_else(|e| error!("Unable to send Pong {e}")); } @@ -190,7 +194,7 @@ impl MessageHandler { .closest_peers::(target) .map(|p| p.as_peer_info()) .collect(); - let message = Message::Nodes(self.my_header, NodePayload { peers }); + let message = Message::Nodes(self.my_header(), NodePayload { peers }); self.outbound_sender .send((message, vec![remote_node_addr])) .await @@ -221,7 +225,7 @@ impl MessageHandler { }) .map(|n| { ( - (self.nodes_reply_fn)(self.my_header, n.id), + (self.nodes_reply_fn)(self.my_header(), n.id), vec![n.to_socket_address()], ) }) @@ -271,7 +275,7 @@ impl MessageHandler { height, gossip_frame, }; - let msg = Message::Broadcast(self.my_header, payload); + let msg = Message::Broadcast(self.my_header(), payload); let targets = nodes.map(|node| *node.value().address()).collect(); (msg, targets) diff --git a/src/lib.rs b/src/lib.rs index 9f0755a..663f7ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -205,7 +205,7 @@ impl Peer { .extract(height) .map(|(height, nodes)| { let msg = Message::Broadcast( - self.header, + self.header.clone(), BroadcastPayload { height, gossip_frame: message.to_vec(), //FIX_ME: avoid clone @@ -241,7 +241,7 @@ impl Peer { // We use the Broadcast message type while setting height to 0 // to prevent further propagation at the receiver let msg = Message::Broadcast( - self.header, + self.header.clone(), BroadcastPayload { height: 0, gossip_frame: message.to_vec(), //FIX_ME: avoid clone diff --git a/src/maintainer.rs b/src/maintainer.rs index 8347e26..c9fc48b 100644 --- a/src/maintainer.rs +++ b/src/maintainer.rs @@ -74,7 +74,8 @@ impl TableMaintainer { info!("TableMaintainer::contact_bootstrappers"); let bootstrapping_nodes_addr = self.bootstrapping_nodes_addr(); let binary_key = self.header.binary_id().as_binary(); - let find_nodes = Message::FindNodes(self.header, *binary_key); + let find_nodes = + Message::FindNodes(self.header.clone(), *binary_key); self.send((find_nodes, bootstrapping_nodes_addr)).await; tokio::time::sleep(Duration::from_secs(30)).await; } @@ -119,7 +120,7 @@ impl TableMaintainer { .idle_nodes() .map(|n| *n.value().address()) .collect(); - self.send((Message::Ping(self.header), idles)).await; + self.send((Message::Ping(self.header.clone()), idles)).await; self.ktable.write().await.remove_idle_nodes(); } @@ -133,7 +134,10 @@ impl TableMaintainer { .flat_map(|(_, idle_nodes)| idle_nodes) .map(|target| { ( - Message::FindNodes(self.header, *target.id().as_binary()), + Message::FindNodes( + self.header.clone(), + *target.id().as_binary(), + ), //TODO: Extract alpha nodes vec![*target.value().address()], ) diff --git a/src/peer.rs b/src/peer.rs index 21d3548..f67f0df 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -70,7 +70,7 @@ impl PeerNode { Header { binary_id: *self.id(), sender_port: self.value().address.port(), - reserved: [0; 2], + reserved: vec![], network_id: self.network_id, } } diff --git a/src/transport/encoding/raptorq/encoder.rs b/src/transport/encoding/raptorq/encoder.rs index cd420d0..4c5689a 100644 --- a/src/transport/encoding/raptorq/encoder.rs +++ b/src/transport/encoding/raptorq/encoder.rs @@ -75,7 +75,7 @@ impl Encoder for RaptorQEncoder { let mut packet_with_uid = base_packet.clone(); packet_with_uid.append(&mut encoded_packet.serialize()); Message::Broadcast( - header, + header.clone(), BroadcastPayload { height: payload.height, gossip_frame: packet_with_uid,