From 9d641c3b6183a255d1f9cbd94240afff3a989fa1 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Thu, 16 Jun 2022 17:12:48 +0200 Subject: [PATCH] Send nodes response to REGTOPIC --- src/advertisement/test.rs | 5 +--- src/handler/mod.rs | 14 ++++------ src/service.rs | 57 ++++++++++++++++++++++++++++++++++++--- 3 files changed, 59 insertions(+), 17 deletions(-) diff --git a/src/advertisement/test.rs b/src/advertisement/test.rs index 5efaf8a98..1e4eb10a6 100644 --- a/src/advertisement/test.rs +++ b/src/advertisement/test.rs @@ -99,10 +99,7 @@ async fn ticket_wait_time_duration() { // Add an add for topic ads.insert(enr, topic).unwrap(); - assert_gt!( - ads.ticket_wait_time(topic), - Some(Duration::from_secs(2)) - ); + assert_gt!(ads.ticket_wait_time(topic), Some(Duration::from_secs(2))); assert_lt!(ads.ticket_wait_time(topic), Some(Duration::from_secs(3))); } diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 2a533a907..0381b64ca 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -39,7 +39,6 @@ use crate::{ }; use enr::{CombinedKey, NodeId}; use futures::prelude::*; -use more_asserts::debug_unreachable; use parking_lot::RwLock; use std::{ collections::HashMap, @@ -694,7 +693,10 @@ impl Handler { topic, enr: _, ticket: _, - } => HandlerOut::EstablishedTopic(enr, connection_direction, topic), + } + | RequestBody::TopicQuery { topic } => { + HandlerOut::EstablishedTopic(enr, connection_direction, topic) + } _ => HandlerOut::Established(enr, connection_direction), }; self.service_send @@ -1006,16 +1008,10 @@ impl Handler { if let Some(remaining_responses) = request_call.remaining_responses.as_mut() { *remaining_responses -= 1; let reinsert = match request_call.request.body { - RequestBody::FindNode { .. } | RequestBody::TopicQuery { .. } => { - remaining_responses > &mut 0 - } // The request is reinserted for either another nodes response, a ticket or a // register confirmation response that may come, otherwise the request times out. RequestBody::RegisterTopic { .. } => remaining_responses >= &mut 0, - _ => { - debug_unreachable!("Only FINDNODE, TOPICQUERY and REGISTERTOPIC expect nodes response"); - false - } + _ => remaining_responses > &mut 0, }; if reinsert { // more responses remaining, add back the request and send the response diff --git a/src/service.rs b/src/service.rs index 1f33d54e4..ec724c814 100644 --- a/src/service.rs +++ b/src/service.rs @@ -978,6 +978,19 @@ impl Service { if enr.node_id() == node_address.node_id && enr.udp4_socket().map(SocketAddr::V4) == Some(node_address.socket_addr) { + let local_key: kbucket::Key = self.local_enr.read().node_id().into(); + let topic_key: kbucket::Key = NodeId::new(&topic.as_bytes()).into(); + let distance_to_topic = local_key.log2_distance(&topic_key); + + let mut closest_peers: Vec = Vec::new(); + if let Some(distance) = distance_to_topic { + self.kbuckets + .write() + .nodes_by_distances(&[distance], self.config.max_nodes_response) + .iter() + .for_each(|entry| closest_peers.push(entry.node.value.clone())); + } + let wait_time = self .ads .ticket_wait_time(topic) @@ -993,12 +1006,14 @@ impl Service { // According to spec, a ticket should always be issued upon receiving a REGTOPIC request. self.send_ticket_response( - node_address, + node_address.clone(), id.clone(), new_ticket.clone(), wait_time, ); + self.send_nodes_response(closest_peers, node_address, id.clone(), "REGTOPIC"); + // If the wait time has expired, the TICKET is added to the matching ticket pool. If this is // the first REGTOPIC request from a given node for a given topic, the newly created ticket // is used to add the registration attempt to to the ticket pool. @@ -1243,6 +1258,40 @@ impl Service { }); } } + RequestBody::RegisterTopic { + topic, + enr: _, + ticket: _, + } => { + if let Some(kbuckets) = self.topics_kbuckets.get_mut(&topic) { + for enr in nodes { + let peer_key: kbucket::Key = enr.node_id().into(); + match kbuckets.insert_or_update( + &peer_key, + enr, + NodeStatus { + state: ConnectionState::Disconnected, + direction: ConnectionDirection::Incoming, + }, + ) { + InsertResult::Failed(FailureReason::BucketFull) => { + error!("Table full") + } + InsertResult::Failed(FailureReason::BucketFilter) => { + error!("Failed bucket filter") + } + InsertResult::Failed(FailureReason::TableFilter) => { + error!("Failed table filter") + } + InsertResult::Failed(FailureReason::InvalidSelfUpdate) => { + error!("Invalid self update") + } + InsertResult::Failed(_) => error!("Failed to insert ENR"), + _ => {} + } + } + } + } RequestBody::FindNode { .. } => { self.discovered(&node_id, nodes, active_request.query_id) } @@ -1645,7 +1694,7 @@ impl Service { nodes_to_send: Vec, node_address: NodeAddress, rpc_id: RequestId, - query: &str, + req_type: &str, ) { // if there are no nodes, send an empty response if nodes_to_send.is_empty() { @@ -1658,7 +1707,7 @@ impl Service { }; trace!( "Sending empty {} response to: {}", - query, + req_type, node_address.node_id ); if let Err(e) = self @@ -1718,7 +1767,7 @@ impl Service { for response in responses { trace!( "Sending {} response to: {}. Response: {} ", - query, + req_type, node_address, response );