diff --git a/lib/src/config/config_file/client.example.toml b/lib/src/config/config_file/client.example.toml index 6d95ed75c..b202164a3 100644 --- a/lib/src/config/config_file/client.example.toml +++ b/lib/src/config/config_file/client.example.toml @@ -34,8 +34,7 @@ seed_nodes = [ # This can be used to advertise the public URL and port that peers should connect to, while # `listen_addresses` contains the loopback IP and port that this nodes listens on, which may # not be publicly reachable. -# For validators it is strongly recommended to list a public reachable IPv4 IP. This IPv4 -# address must be the same as the node uses for outbound connections for autonat to work. +# For validators it is a necessity to have a publicly reachable IP or hostname in order to be operative. # Default: [] #advertised_addresses = [ # "/ip4/my.ip/tcp/8443/ws", diff --git a/network-libp2p/src/autonat.rs b/network-libp2p/src/autonat.rs new file mode 100644 index 000000000..646790a5b --- /dev/null +++ b/network-libp2p/src/autonat.rs @@ -0,0 +1,105 @@ +use std::collections::{HashMap, HashSet}; + +use libp2p::Multiaddr; + +/// The NAT status an address can have +#[derive(Copy, Clone, Debug, Default, PartialEq)] +pub(crate) enum NatStatus { + /// The address is publicly reachable + Public, + /// The address is not publicly reachable + Private, + /// The reachability of the address is unknown + #[default] + Unknown, +} + +/// The NAT state of the local peer +#[derive(Default)] +pub(crate) struct NatState { + /// The addresses that are confirmed thus publicly reachable + confirmed_addresses: HashSet, + /// The list of discovered local addresses + address_status: HashMap, + /// The NAT status of the local peer + status: NatStatus, +} + +impl NatState { + /// Adds an address to track its NAT status + pub fn add_address(&mut self, address: Multiaddr) { + self.address_status.insert(address, NatStatus::Unknown); + } + + /// Remove address and no longer track its NAT status + pub fn remove_address(&mut self, address: &Multiaddr) { + self.address_status.remove(address); + self.confirmed_addresses.remove(address); + self.update_state(); + } + + /// Set the NAT status of address + pub fn set_address_nat(&mut self, address: Multiaddr, nat_status: NatStatus) { + let address_status = self + .address_status + .entry(address.clone()) + .or_insert(nat_status); + + if *address_status == NatStatus::Public { + self.confirmed_addresses.insert(address); + } else { + self.confirmed_addresses.remove(&address); + } + self.update_state(); + } + + /// Mark the address as confirmed thus publicly reachable + pub fn add_confirmed_address(&mut self, address: Multiaddr) { + let address_status = self.address_status.entry(address.clone()).or_default(); + *address_status = NatStatus::Public; + + self.confirmed_addresses.insert(address); + self.update_state(); + } + + /// External address expired thus no longer publicly reachable + pub fn remove_confirmed_address(&mut self, address: &Multiaddr) { + let address_status = self.address_status.entry(address.clone()).or_default(); + *address_status = NatStatus::Private; + + self.confirmed_addresses.remove(address); + self.update_state(); + } + + /// Determine the general NAT state of the local peer + fn update_state(&mut self) { + let old_nat_status = self.status; + + if !self.confirmed_addresses.is_empty() { + self.status = NatStatus::Public; + } else if self + .address_status + .iter() + .all(|(_, status)| *status == NatStatus::Private) + { + self.status = NatStatus::Private; + } else { + self.status = NatStatus::Unknown; + } + + if old_nat_status == self.status { + return; + } + + if self.status == NatStatus::Private { + log::warn!("Couldn't detect a public reachable address. Validator network operations won't be possible"); + log::warn!("You may need to find a relay to enable validator network operations"); + } else if self.status == NatStatus::Public { + log::info!( + ?old_nat_status, + new_nat_status = ?self.status, + "NAT status changed and detected public reachable address. Validator network operations will be possible" + ); + } + } +} diff --git a/network-libp2p/src/behaviour.rs b/network-libp2p/src/behaviour.rs index d83a9b736..700db8629 100644 --- a/network-libp2p/src/behaviour.rs +++ b/network-libp2p/src/behaviour.rs @@ -1,13 +1,15 @@ use std::{iter, sync::Arc}; use libp2p::{ - autonat, connection_limits, gossipsub, + autonat::v2::{self as autonat, client::Config as AutonatConfig}, + connection_limits, gossipsub, kad::{self, store::MemoryStore}, ping, request_response, swarm::NetworkBehaviour, Multiaddr, PeerId, StreamProtocol, }; use parking_lot::RwLock; +use rand::rngs::OsRng; use crate::{ connection_pool, @@ -32,9 +34,10 @@ pub struct Behaviour { pub connection_limits: connection_limits::Behaviour, pub pool: connection_pool::Behaviour, pub discovery: discovery::Behaviour, + pub autonat_server: autonat::server::Behaviour, + pub autonat_client: autonat::client::Behaviour, pub dht: kad::Behaviour, pub gossipsub: gossipsub::Behaviour, - pub autonat: autonat::Behaviour, pub ping: ping::Behaviour, pub request_response: request_response::Behaviour, } @@ -96,12 +99,11 @@ impl Behaviour { req_res_config, ); - // Autonat behaviour - let mut autonat_config = autonat::Config::default(); - if config.autonat_allow_non_global_ips { - autonat_config.only_global_ips = false; - } - let autonat = autonat::Behaviour::new(peer_id, autonat_config); + // AutoNAT server behaviour + let autonat_server = autonat::server::Behaviour::new(OsRng); + + // AutoNAT client behaviour + let autonat_client = autonat::client::Behaviour::new(OsRng, AutonatConfig::default()); // Connection limits behaviour let limits = connection_limits::ConnectionLimits::default() @@ -119,7 +121,8 @@ impl Behaviour { ping, pool, request_response, - autonat, + autonat_client, + autonat_server, connection_limits, } } diff --git a/network-libp2p/src/connection_pool/behaviour.rs b/network-libp2p/src/connection_pool/behaviour.rs index 5c8f08a24..d716550fb 100644 --- a/network-libp2p/src/connection_pool/behaviour.rs +++ b/network-libp2p/src/connection_pool/behaviour.rs @@ -649,15 +649,16 @@ impl Behaviour { self.addresses.mark_failed(address.clone()); } - // Ignore connection if another connection to this peer already exists. + // Ignore connection if too many other connections to this peer already exists. + // Multiple connections with a peer are necessary as AutoNAT probes run over them. // TODO Do we still want to subject it to the IP limit checks? - if other_established > 0 { + if other_established > 3 { debug!( %peer_id, connections = other_established, - "Already have connections established to this peer", + "Already have too many connections established to this peer", ); - // We have more than one connection to the same peer. Deterministically + // We have more than three connections to the same peer. Deterministically // choose which connection to close: close the connection only if the // other peer ID is less than our own peer ID value. // Note: We don't track all of the connection IDs and if the latest diff --git a/network-libp2p/src/discovery/handler.rs b/network-libp2p/src/discovery/handler.rs index 55b732045..7a04d03c5 100644 --- a/network-libp2p/src/discovery/handler.rs +++ b/network-libp2p/src/discovery/handler.rs @@ -1,4 +1,5 @@ use std::{ + collections::{HashSet, VecDeque}, pin::Pin, sync::Arc, task::{Context, Poll, Waker}, @@ -14,10 +15,11 @@ use libp2p::{ swarm::{ handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + ProtocolSupport, }, ConnectionHandler, ConnectionHandlerEvent, Stream, SubstreamProtocol, }, - Multiaddr, PeerId, + Multiaddr, PeerId, StreamProtocol, }; use nimiq_hash::Blake2bHash; use nimiq_network_interface::peer_info::Services; @@ -34,6 +36,7 @@ use super::{ peer_contacts::{PeerContactBook, SignedPeerContact}, protocol::{ChallengeNonce, DiscoveryMessage, DiscoveryProtocol}, }; +use crate::{AUTONAT_DIAL_BACK_PROTOCOL, AUTONAT_DIAL_REQUEST_PROTOCOL}; #[derive(Debug)] pub enum HandlerOutEvent { @@ -163,6 +166,9 @@ pub struct Handler { /// Waker used when opening a substream. waker: Option, + + /// Events to inform its behaviour or all other ConnectionHandlers + events: VecDeque>, } impl Handler { @@ -196,6 +202,7 @@ impl Handler { inbound: None, outbound: None, waker: None, + events: VecDeque::new(), } } @@ -249,6 +256,18 @@ impl Handler { .wake(); } } + + /// Report to all the ConnectionHandlers that the remote peer supports the AutoNAT V2 client and server protocols + fn report_remote_autonat_support(&mut self) { + let mut stream_protocols = HashSet::new(); + stream_protocols.insert(StreamProtocol::new(AUTONAT_DIAL_REQUEST_PROTOCOL)); + stream_protocols.insert(StreamProtocol::new(AUTONAT_DIAL_BACK_PROTOCOL)); + + self.events + .push_back(ConnectionHandlerEvent::ReportRemoteProtocols( + ProtocolSupport::Added(stream_protocols), + )); + } } /// Extract the `/ip4/`,`/ip6/`, `/dns4/` or `/dns6/` protocol part from a `Multiaddr` @@ -298,6 +317,7 @@ impl ConnectionHandler for Handler { } self.inbound = Some(protocol); self.check_initialized(); + self.report_remote_autonat_support(); } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { protocol, .. @@ -310,6 +330,7 @@ impl ConnectionHandler for Handler { } self.outbound = Some(protocol); self.check_initialized(); + self.report_remote_autonat_support(); } ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { error!(%error, "inject_dial_upgrade_error"); @@ -749,6 +770,10 @@ impl ConnectionHandler for Handler { } } + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); + } + // If we've left the loop, we're waiting on something. Poll::Pending } diff --git a/network-libp2p/src/lib.rs b/network-libp2p/src/lib.rs index edddbb76f..678286467 100644 --- a/network-libp2p/src/lib.rs +++ b/network-libp2p/src/lib.rs @@ -1,6 +1,7 @@ #[macro_use] extern crate log; +mod autonat; mod behaviour; mod config; mod connection_pool; @@ -18,6 +19,8 @@ mod utils; pub const DISCOVERY_PROTOCOL: &str = "/nimiq/discovery/0.0.1"; pub const DHT_PROTOCOL: &str = "/nimiq/kad/0.0.1"; +pub const AUTONAT_DIAL_REQUEST_PROTOCOL: &str = "/libp2p/autonat/2/dial-request"; +pub const AUTONAT_DIAL_BACK_PROTOCOL: &str = "/libp2p/autonat/2/dial-back"; pub use config::{Config, TlsConfig}; pub use error::NetworkError; diff --git a/network-libp2p/src/network_types.rs b/network-libp2p/src/network_types.rs index 54844c7f4..493c83e9a 100644 --- a/network-libp2p/src/network_types.rs +++ b/network-libp2p/src/network_types.rs @@ -23,6 +23,7 @@ use thiserror::Error; use tokio::sync::{mpsc, oneshot}; use crate::{ + autonat::NatState, dispatch::codecs::{IncomingRequest, OutgoingResponse}, rate_limiting::RequestRateLimitData, NetworkError, @@ -238,6 +239,8 @@ pub(crate) struct TaskState { pub(crate) dht_bootstrap_state: DhtBootStrapState, /// DHT (kad) is in server mode pub(crate) dht_server_mode: bool, + /// The NAT status of the local peer + pub(crate) nat_status: NatState, /// Senders per `OutboundRequestId` for request-response pub(crate) requests: HashMap>>, /// Time spent per `OutboundRequestId` for request-response diff --git a/network-libp2p/src/swarm.rs b/network-libp2p/src/swarm.rs index 161818155..1b4caff6d 100644 --- a/network-libp2p/src/swarm.rs +++ b/network-libp2p/src/swarm.rs @@ -6,7 +6,7 @@ use instant::Instant; #[cfg(all(target_family = "wasm", not(feature = "tokio-websocket")))] use libp2p::websocket_websys; use libp2p::{ - autonat::{self, OutboundFailure}, + autonat::OutboundFailure, core::{ self, muxing::StreamMuxerBox, @@ -42,6 +42,7 @@ use tokio::sync::{broadcast, mpsc}; #[cfg(feature = "metrics")] use crate::network_metrics::NetworkMetrics; use crate::{ + autonat::NatStatus, behaviour, discovery::{behaviour::Event, peer_contacts::PeerContactBook}, network_types::{ @@ -395,26 +396,48 @@ fn handle_event( swarm .behaviour_mut() .discovery - .add_own_addresses([address].to_vec()); + .add_own_addresses([address.clone()].to_vec()); + if swarm.behaviour().is_address_dialable(&address) { + state.nat_status.add_address(address); + } + } + + SwarmEvent::ListenerClosed { + listener_id: _, + addresses, + reason: _, + } => { + addresses.iter().for_each(|address| { + state.nat_status.remove_address(address); + }); + } + + SwarmEvent::ExternalAddrConfirmed { address } => { + log::trace!(%address, "Address is confirmed and externally reachable"); + state.nat_status.add_confirmed_address(address); + } + + SwarmEvent::ExternalAddrExpired { address } => { + log::trace!(%address, "External address is expired and no longer externally reachable"); + state.nat_status.remove_confirmed_address(&address); } SwarmEvent::Behaviour(event) => { match event { - behaviour::BehaviourEvent::Autonat(event) => match event { - autonat::Event::InboundProbe(event) => { - log::trace!(?event, "Autonat inbound probe"); - } - autonat::Event::OutboundProbe(event) => { - log::trace!(?event, "Autonat outbound probe"); - } - autonat::Event::StatusChanged { old, new } => { - log::debug!(?old, ?new, "Autonat status changed"); - if new == autonat::NatStatus::Private { - log::warn!("Couldn't detect a public reachable address. Validator network operations won't be possible"); - log::warn!("You may need to find a relay to enable validator network operations"); - } + behaviour::BehaviourEvent::AutonatClient(event) => { + log::trace!(?event, "AutoNAT outbound probe"); + match event.result { + Ok(_) => state + .nat_status + .set_address_nat(event.tested_addr, NatStatus::Public), + Err(_) => state + .nat_status + .set_address_nat(event.tested_addr, NatStatus::Private), } - }, + } + behaviour::BehaviourEvent::AutonatServer(event) => { + log::trace!(?event, "AutoNAT inbound probe"); + } behaviour::BehaviourEvent::ConnectionLimits(_) => {} behaviour::BehaviourEvent::Dht(event) => { match event { diff --git a/network-libp2p/tests/network.rs b/network-libp2p/tests/network.rs index 1481486d2..c8986603e 100644 --- a/network-libp2p/tests/network.rs +++ b/network-libp2p/tests/network.rs @@ -317,7 +317,7 @@ async fn create_network_with_n_peers(n_peers: usize) -> Vec { #[test(tokio::test)] async fn connections_stress_and_reconnect() { - let peers: usize = 10; + let peers: usize = 5; let networks = create_network_with_n_peers(peers).await; assert_eq!(peers, networks.len());