diff --git a/src/config.rs b/src/config.rs index f8c58a0ee..e6b6aa782 100644 --- a/src/config.rs +++ b/src/config.rs @@ -102,7 +102,7 @@ pub struct Discv5Config { pub topic_radius: u64, pub topic_query_timeout: Duration, - pub topics_num_results: usize, + pub topic_query_num_results: usize, /// A custom executor which can spawn the discv5 tasks. This must be a tokio runtime, with /// timing support. By default, the executor that created the discv5 struct will be used. @@ -146,7 +146,7 @@ impl Default for Discv5Config { ban_duration: Some(Duration::from_secs(3600)), // 1 hour topic_radius: 256, topic_query_timeout: Duration::from_secs(60), - topics_num_results: 16, + topic_query_num_results: 16, ip_mode: IpMode::default(), executor: None, } diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 0381b64ca..410e54be9 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -1011,6 +1011,10 @@ impl Handler { // The request is reinserted for either another nodes response, a ticket or a // register confirmation response that may come, otherwise the request times out. RequestBody::RegisterTopic { .. } => remaining_responses >= &mut 0, + RequestBody::TopicQuery { .. } => { + // remove from some map of NODES and AD NODES + remaining_responses >= &mut 0 + } _ => remaining_responses > &mut 0, }; if reinsert { diff --git a/src/service.rs b/src/service.rs index ec724c814..4f030d2ab 100644 --- a/src/service.rs +++ b/src/service.rs @@ -436,7 +436,7 @@ impl Service { ticket_pools: TicketPools::default(), active_topic_queries: ActiveTopicQueries::new( config.topic_query_timeout, - config.topics_num_results, + config.topic_query_num_results, ), exit, config: config.clone(), @@ -666,12 +666,6 @@ impl Service { if callback.send(found_enrs).is_err() { warn!("Callback dropped for query {}. Results dropped", *id); } - } else { - let QueryType::FindNode(node_id) = result.target.query_type; - let topic = TopicHash::from_raw(node_id.raw()); - if self.topics.contains_key(&topic){ - // add to topic kbuckets? - } } } } @@ -978,18 +972,12 @@ impl Service { if enr.node_id() == node_address.node_id && enr.udp4_socket().map(SocketAddr::V4) == Some(node_address.socket_addr) { - let local_key: kbucket::Key = self.local_enr.read().node_id().into(); - let topic_key: kbucket::Key = NodeId::new(&topic.as_bytes()).into(); - let distance_to_topic = local_key.log2_distance(&topic_key); - - let mut closest_peers: Vec = Vec::new(); - if let Some(distance) = distance_to_topic { - self.kbuckets - .write() - .nodes_by_distances(&[distance], self.config.max_nodes_response) - .iter() - .for_each(|entry| closest_peers.push(entry.node.value.clone())); - } + self.send_topic_nodes_response( + topic, + node_address.clone(), + id.clone(), + "REGTOPIC".into(), + ); let wait_time = self .ads @@ -1006,14 +994,12 @@ impl Service { // According to spec, a ticket should always be issued upon receiving a REGTOPIC request. self.send_ticket_response( - node_address.clone(), + node_address, id.clone(), new_ticket.clone(), wait_time, ); - self.send_nodes_response(closest_peers, node_address, id.clone(), "REGTOPIC"); - // If the wait time has expired, the TICKET is added to the matching ticket pool. If this is // the first REGTOPIC request from a given node for a given topic, the newly created ticket // is used to add the registration attempt to to the ticket pool. @@ -1075,6 +1061,12 @@ impl Service { } } RequestBody::TopicQuery { topic } => { + self.send_topic_nodes_response( + topic, + node_address.clone(), + id.clone(), + "REGTOPIC".into(), + ); self.send_topic_query_response(node_address, id, topic); } } @@ -1648,6 +1640,46 @@ impl Service { self.send_nodes_response(nodes_to_send, node_address, rpc_id, "TOPICQUERY"); } + fn send_topic_nodes_response( + &mut self, + topic: TopicHash, + node_address: NodeAddress, + id: RequestId, + req_type: String, + ) { + let local_key: kbucket::Key = self.local_enr.read().node_id().into(); + let topic_key: kbucket::Key = NodeId::new(&topic.as_bytes()).into(); + let distance_to_topic = local_key.log2_distance(&topic_key); + + let mut closest_peers: Vec = Vec::new(); + let closest_peers_length = closest_peers.len(); + if let Some(distance) = distance_to_topic { + self.kbuckets + .write() + .nodes_by_distances(&[distance], self.config.max_nodes_response) + .iter() + .for_each(|entry| closest_peers.push(entry.node.value.clone())); + + if closest_peers_length < self.config.max_nodes_response { + for entry in self + .kbuckets + .write() + .nodes_by_distances( + &[distance - 1, distance + 1], + self.config.max_nodes_response - closest_peers_length, + ) + .iter() + { + if closest_peers_length > self.config.max_nodes_response { + break; + } + closest_peers.push(entry.node.value.clone()); + } + } + } + self.send_nodes_response(closest_peers, node_address, id, &req_type); + } + /// Sends a NODES response, given a list of found ENR's. This function splits the nodes up /// into multiple responses to ensure the response stays below the maximum packet size. fn send_find_nodes_response( diff --git a/src/service/test.rs b/src/service/test.rs index d3e4d5e78..a54efda74 100644 --- a/src/service/test.rs +++ b/src/service/test.rs @@ -102,7 +102,7 @@ async fn build_service( ticket_pools: TicketPools::default(), active_topic_queries: ActiveTopicQueries::new( config.topic_query_timeout, - config.topics_num_results, + config.topic_query_num_results, ), exit, config,