Skip to content

Commit

Permalink
Send nodes response to REGTOPIC
Browse files Browse the repository at this point in the history
  • Loading branch information
emhane committed Jun 16, 2022
1 parent b87ad5c commit 9d641c3
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 17 deletions.
5 changes: 1 addition & 4 deletions src/advertisement/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ async fn ticket_wait_time_duration() {
// Add an add for topic
ads.insert(enr, topic).unwrap();

assert_gt!(
ads.ticket_wait_time(topic),
Some(Duration::from_secs(2))
);
assert_gt!(ads.ticket_wait_time(topic), Some(Duration::from_secs(2)));
assert_lt!(ads.ticket_wait_time(topic), Some(Duration::from_secs(3)));
}

Expand Down
14 changes: 5 additions & 9 deletions src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use crate::{
};
use enr::{CombinedKey, NodeId};
use futures::prelude::*;
use more_asserts::debug_unreachable;
use parking_lot::RwLock;
use std::{
collections::HashMap,
Expand Down Expand Up @@ -694,7 +693,10 @@ impl Handler {
topic,
enr: _,
ticket: _,
} => HandlerOut::EstablishedTopic(enr, connection_direction, topic),
}
| RequestBody::TopicQuery { topic } => {
HandlerOut::EstablishedTopic(enr, connection_direction, topic)
}
_ => HandlerOut::Established(enr, connection_direction),
};
self.service_send
Expand Down Expand Up @@ -1006,16 +1008,10 @@ impl Handler {
if let Some(remaining_responses) = request_call.remaining_responses.as_mut() {
*remaining_responses -= 1;
let reinsert = match request_call.request.body {
RequestBody::FindNode { .. } | RequestBody::TopicQuery { .. } => {
remaining_responses > &mut 0
}
// The request is reinserted for either another nodes response, a ticket or a
// register confirmation response that may come, otherwise the request times out.
RequestBody::RegisterTopic { .. } => remaining_responses >= &mut 0,
_ => {
debug_unreachable!("Only FINDNODE, TOPICQUERY and REGISTERTOPIC expect nodes response");
false
}
_ => remaining_responses > &mut 0,
};
if reinsert {
// more responses remaining, add back the request and send the response
Expand Down
57 changes: 53 additions & 4 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,19 @@ impl Service {
if enr.node_id() == node_address.node_id
&& enr.udp4_socket().map(SocketAddr::V4) == Some(node_address.socket_addr)
{
let local_key: kbucket::Key<NodeId> = self.local_enr.read().node_id().into();
let topic_key: kbucket::Key<NodeId> = NodeId::new(&topic.as_bytes()).into();
let distance_to_topic = local_key.log2_distance(&topic_key);

let mut closest_peers: Vec<Enr> = Vec::new();
if let Some(distance) = distance_to_topic {
self.kbuckets
.write()
.nodes_by_distances(&[distance], self.config.max_nodes_response)
.iter()
.for_each(|entry| closest_peers.push(entry.node.value.clone()));
}

let wait_time = self
.ads
.ticket_wait_time(topic)
Expand All @@ -993,12 +1006,14 @@ impl Service {

// According to spec, a ticket should always be issued upon receiving a REGTOPIC request.
self.send_ticket_response(
node_address,
node_address.clone(),
id.clone(),
new_ticket.clone(),
wait_time,
);

self.send_nodes_response(closest_peers, node_address, id.clone(), "REGTOPIC");

// If the wait time has expired, the TICKET is added to the matching ticket pool. If this is
// the first REGTOPIC request from a given node for a given topic, the newly created ticket
// is used to add the registration attempt to to the ticket pool.
Expand Down Expand Up @@ -1243,6 +1258,40 @@ impl Service {
});
}
}
RequestBody::RegisterTopic {
topic,
enr: _,
ticket: _,
} => {
if let Some(kbuckets) = self.topics_kbuckets.get_mut(&topic) {
for enr in nodes {
let peer_key: kbucket::Key<NodeId> = enr.node_id().into();
match kbuckets.insert_or_update(
&peer_key,
enr,
NodeStatus {
state: ConnectionState::Disconnected,
direction: ConnectionDirection::Incoming,
},
) {
InsertResult::Failed(FailureReason::BucketFull) => {
error!("Table full")
}
InsertResult::Failed(FailureReason::BucketFilter) => {
error!("Failed bucket filter")
}
InsertResult::Failed(FailureReason::TableFilter) => {
error!("Failed table filter")
}
InsertResult::Failed(FailureReason::InvalidSelfUpdate) => {
error!("Invalid self update")
}
InsertResult::Failed(_) => error!("Failed to insert ENR"),
_ => {}
}
}
}
}
RequestBody::FindNode { .. } => {
self.discovered(&node_id, nodes, active_request.query_id)
}
Expand Down Expand Up @@ -1645,7 +1694,7 @@ impl Service {
nodes_to_send: Vec<Enr>,
node_address: NodeAddress,
rpc_id: RequestId,
query: &str,
req_type: &str,
) {
// if there are no nodes, send an empty response
if nodes_to_send.is_empty() {
Expand All @@ -1658,7 +1707,7 @@ impl Service {
};
trace!(
"Sending empty {} response to: {}",
query,
req_type,
node_address.node_id
);
if let Err(e) = self
Expand Down Expand Up @@ -1718,7 +1767,7 @@ impl Service {
for response in responses {
trace!(
"Sending {} response to: {}. Response: {} ",
query,
req_type,
node_address,
response
);
Expand Down

0 comments on commit 9d641c3

Please sign in to comment.