diff --git a/src/advertisement/test.rs b/src/advertisement/test.rs index 5efaf8a98..1e4eb10a6 100644 --- a/src/advertisement/test.rs +++ b/src/advertisement/test.rs @@ -99,10 +99,7 @@ async fn ticket_wait_time_duration() { // Add an add for topic ads.insert(enr, topic).unwrap(); - assert_gt!( - ads.ticket_wait_time(topic), - Some(Duration::from_secs(2)) - ); + assert_gt!(ads.ticket_wait_time(topic), Some(Duration::from_secs(2))); assert_lt!(ads.ticket_wait_time(topic), Some(Duration::from_secs(3))); } diff --git a/src/config.rs b/src/config.rs index f8c58a0ee..b3eedc9cb 100644 --- a/src/config.rs +++ b/src/config.rs @@ -102,7 +102,7 @@ pub struct Discv5Config { pub topic_radius: u64, pub topic_query_timeout: Duration, - pub topics_num_results: usize, + pub topic_query_peers: usize, /// A custom executor which can spawn the discv5 tasks. This must be a tokio runtime, with /// timing support. By default, the executor that created the discv5 struct will be used. @@ -146,7 +146,7 @@ impl Default for Discv5Config { ban_duration: Some(Duration::from_secs(3600)), // 1 hour topic_radius: 256, topic_query_timeout: Duration::from_secs(60), - topics_num_results: 16, + topic_query_peers: 10, ip_mode: IpMode::default(), executor: None, } diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 2a533a907..410e54be9 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -39,7 +39,6 @@ use crate::{ }; use enr::{CombinedKey, NodeId}; use futures::prelude::*; -use more_asserts::debug_unreachable; use parking_lot::RwLock; use std::{ collections::HashMap, @@ -694,7 +693,10 @@ impl Handler { topic, enr: _, ticket: _, - } => HandlerOut::EstablishedTopic(enr, connection_direction, topic), + } + | RequestBody::TopicQuery { topic } => { + HandlerOut::EstablishedTopic(enr, connection_direction, topic) + } _ => HandlerOut::Established(enr, connection_direction), }; self.service_send @@ -1006,16 +1008,14 @@ impl Handler { if let Some(remaining_responses) = request_call.remaining_responses.as_mut() { *remaining_responses -= 1; let reinsert = match request_call.request.body { - RequestBody::FindNode { .. } | RequestBody::TopicQuery { .. } => { - remaining_responses > &mut 0 - } // The request is reinserted for either another nodes response, a ticket or a // register confirmation response that may come, otherwise the request times out. RequestBody::RegisterTopic { .. } => remaining_responses >= &mut 0, - _ => { - debug_unreachable!("Only FINDNODE, TOPICQUERY and REGISTERTOPIC expect nodes response"); - false + RequestBody::TopicQuery { .. } => { + // remove from some map of NODES and AD NODES + remaining_responses >= &mut 0 } + _ => remaining_responses > &mut 0, }; if reinsert { // more responses remaining, add back the request and send the response diff --git a/src/service.rs b/src/service.rs index 1f33d54e4..7d9f771ac 100644 --- a/src/service.rs +++ b/src/service.rs @@ -436,7 +436,7 @@ impl Service { ticket_pools: TicketPools::default(), active_topic_queries: ActiveTopicQueries::new( config.topic_query_timeout, - config.topics_num_results, + config.max_nodes_response, ), exit, config: config.clone(), @@ -666,12 +666,6 @@ impl Service { if callback.send(found_enrs).is_err() { warn!("Callback dropped for query {}. Results dropped", *id); } - } else { - let QueryType::FindNode(node_id) = result.target.query_type; - let topic = TopicHash::from_raw(node_id.raw()); - if self.topics.contains_key(&topic){ - // add to topic kbuckets? - } } } } @@ -978,6 +972,13 @@ impl Service { if enr.node_id() == node_address.node_id && enr.udp4_socket().map(SocketAddr::V4) == Some(node_address.socket_addr) { + self.send_topic_nodes_response( + topic, + node_address.clone(), + id.clone(), + "REGTOPIC".into(), + ); + let wait_time = self .ads .ticket_wait_time(topic) @@ -1060,6 +1061,12 @@ impl Service { } } RequestBody::TopicQuery { topic } => { + self.send_topic_nodes_response( + topic, + node_address.clone(), + id.clone(), + "REGTOPIC".into(), + ); self.send_topic_query_response(node_address, id, topic); } } @@ -1243,6 +1250,40 @@ impl Service { }); } } + RequestBody::RegisterTopic { + topic, + enr: _, + ticket: _, + } => { + if let Some(kbuckets) = self.topics_kbuckets.get_mut(&topic) { + for enr in nodes { + let peer_key: kbucket::Key = enr.node_id().into(); + match kbuckets.insert_or_update( + &peer_key, + enr, + NodeStatus { + state: ConnectionState::Disconnected, + direction: ConnectionDirection::Incoming, + }, + ) { + InsertResult::Failed(FailureReason::BucketFull) => { + error!("Table full") + } + InsertResult::Failed(FailureReason::BucketFilter) => { + error!("Failed bucket filter") + } + InsertResult::Failed(FailureReason::TableFilter) => { + error!("Failed table filter") + } + InsertResult::Failed(FailureReason::InvalidSelfUpdate) => { + error!("Invalid self update") + } + InsertResult::Failed(_) => error!("Failed to insert ENR"), + _ => {} + } + } + } + } RequestBody::FindNode { .. } => { self.discovered(&node_id, nodes, active_request.query_id) } @@ -1599,6 +1640,46 @@ impl Service { self.send_nodes_response(nodes_to_send, node_address, rpc_id, "TOPICQUERY"); } + fn send_topic_nodes_response( + &mut self, + topic: TopicHash, + node_address: NodeAddress, + id: RequestId, + req_type: String, + ) { + let local_key: kbucket::Key = self.local_enr.read().node_id().into(); + let topic_key: kbucket::Key = NodeId::new(&topic.as_bytes()).into(); + let distance_to_topic = local_key.log2_distance(&topic_key); + + let mut closest_peers: Vec = Vec::new(); + let closest_peers_length = closest_peers.len(); + if let Some(distance) = distance_to_topic { + self.kbuckets + .write() + .nodes_by_distances(&[distance], self.config.max_nodes_response) + .iter() + .for_each(|entry| closest_peers.push(entry.node.value.clone())); + + if closest_peers_length < self.config.max_nodes_response { + for entry in self + .kbuckets + .write() + .nodes_by_distances( + &[distance - 1, distance + 1], + self.config.max_nodes_response - closest_peers_length, + ) + .iter() + { + if closest_peers_length > self.config.max_nodes_response { + break; + } + closest_peers.push(entry.node.value.clone()); + } + } + } + self.send_nodes_response(closest_peers, node_address, id, &req_type); + } + /// Sends a NODES response, given a list of found ENR's. This function splits the nodes up /// into multiple responses to ensure the response stays below the maximum packet size. fn send_find_nodes_response( @@ -1645,7 +1726,7 @@ impl Service { nodes_to_send: Vec, node_address: NodeAddress, rpc_id: RequestId, - query: &str, + req_type: &str, ) { // if there are no nodes, send an empty response if nodes_to_send.is_empty() { @@ -1658,7 +1739,7 @@ impl Service { }; trace!( "Sending empty {} response to: {}", - query, + req_type, node_address.node_id ); if let Err(e) = self @@ -1718,7 +1799,7 @@ impl Service { for response in responses { trace!( "Sending {} response to: {}. Response: {} ", - query, + req_type, node_address, response ); diff --git a/src/service/test.rs b/src/service/test.rs index d3e4d5e78..d361aa543 100644 --- a/src/service/test.rs +++ b/src/service/test.rs @@ -102,7 +102,7 @@ async fn build_service( ticket_pools: TicketPools::default(), active_topic_queries: ActiveTopicQueries::new( config.topic_query_timeout, - config.topics_num_results, + config.max_nodes_response, ), exit, config,