diff --git a/doc/bips.md b/doc/bips.md index 70246b46e0155..8c6cf62b76e5e 100644 --- a/doc/bips.md +++ b/doc/bips.md @@ -41,4 +41,5 @@ Versions and PRs are relevant to Bitcoin's core if not mentioned other. * [`BIP 158`](https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki): Compact Block Filters for Light Clients can be indexed as of **Dash Core v18.0** ([PR dash#4314](https://github.com/dashpay/dash/pull/4314), [PR #14121](https://github.com/bitcoin/bitcoin/pull/14121)). * [`BIP 159`](https://github.com/bitcoin/bips/blob/master/bip-0159.mediawiki): The `NODE_NETWORK_LIMITED` service bit is signalled as of **v0.16.0** ([PR 11740](https://github.com/bitcoin/bitcoin/pull/11740)), and such nodes are connected to as of **v0.17.0** ([PR 10387](https://github.com/bitcoin/bitcoin/pull/10387)). * [`BIP 174`](https://github.com/bitcoin/bips/blob/master/bip-0174.mediawiki): RPCs to operate on Partially Signed Bitcoin Transactions (PSBT) are present as of **v18.0** ([PR 13557](https://github.com/bitcoin/bitcoin/pull/13557)). +* [`BIP 324`](https://github.com/bitcoin/bips/blob/master/bip-0324.mediawiki): The v2 transport protocol specified by BIP324 and the associated `NODE_P2P_V2` service bit are supported as of **v22.0**, but off by default ([PR 28331](https://github.com/bitcoin/bitcoin/pull/28331)). * [`BIP 339`](https://github.com/bitcoin/bips/blob/master/bip-0339.mediawiki): Relay of transactions by wtxid is supported as of **v0.21.0** ([PR 18044](https://github.com/bitcoin/bitcoin/pull/18044)). diff --git a/src/bitcoin-cli.cpp b/src/bitcoin-cli.cpp index 99a58d67d0701..11f1a144f5a1d 100644 --- a/src/bitcoin-cli.cpp +++ b/src/bitcoin-cli.cpp @@ -413,6 +413,7 @@ class NetinfoRequestHandler : public BaseRequestHandler std::string conn_type; std::string network; std::string age; + std::string transport_protocol_type; double min_ping; double ping; int64_t addr_processed; @@ -517,10 +518,11 @@ class NetinfoRequestHandler : public BaseRequestHandler const std::string addr{peer["addr"].get_str()}; const std::string age{conn_time == 0 ? "" : ToString((m_time_now - conn_time) / 60)}; const std::string sub_version{peer["subver"].get_str()}; + const std::string transport{peer["transport_protocol_type"].get_str()}; const bool is_addr_relay_enabled{peer["addr_relay_enabled"].isNull() ? false : peer["addr_relay_enabled"].get_bool()}; const bool is_bip152_hb_from{peer["bip152_hb_from"].get_bool()}; const bool is_bip152_hb_to{peer["bip152_hb_to"].get_bool()}; - m_peers.push_back({addr, sub_version, conn_type, NETWORK_SHORT_NAMES[network_id], age, min_ping, ping, addr_processed, addr_rate_limited, last_blck, last_recv, last_send, last_trxn, peer_id, mapped_as, version, is_addr_relay_enabled, is_bip152_hb_from, is_bip152_hb_to, is_block_relay, is_outbound}); + m_peers.push_back({addr, sub_version, conn_type, NETWORK_SHORT_NAMES[network_id], age, transport, min_ping, ping, addr_processed, addr_rate_limited, last_blck, last_recv, last_send, last_trxn, peer_id, mapped_as, version, is_addr_relay_enabled, is_bip152_hb_from, is_bip152_hb_to, is_block_relay, is_outbound}); m_max_addr_length = std::max(addr.length() + 1, m_max_addr_length); m_max_addr_processed_length = std::max(ToString(addr_processed).length(), m_max_addr_processed_length); m_max_addr_rate_limited_length = std::max(ToString(addr_rate_limited).length(), m_max_addr_rate_limited_length); @@ -536,7 +538,7 @@ class NetinfoRequestHandler : public BaseRequestHandler // Report detailed peer connections list sorted by direction and minimum ping time. if (DetailsRequested() && !m_peers.empty()) { std::sort(m_peers.begin(), m_peers.end()); - result += strprintf("<-> type net mping ping send recv txn blk hb %*s%*s%*s ", + result += strprintf("<-> type net tp mping ping send recv txn blk hb %*s%*s%*s ", m_max_addr_processed_length, "addrp", m_max_addr_rate_limited_length, "addrl", m_max_age_length, "age"); @@ -545,10 +547,11 @@ class NetinfoRequestHandler : public BaseRequestHandler for (const Peer& peer : m_peers) { std::string version{ToString(peer.version) + peer.sub_version}; result += strprintf( - "%3s %6s %5s%7s%7s%5s%5s%5s%5s %2s %*s%*s%*s%*i %*s %-*s%s\n", + "%3s %6s %5s %2s%7s%7s%5s%5s%5s%5s %2s %*s%*s%*s%*i %*s %-*s%s\n", peer.is_outbound ? "out" : "in", ConnectionTypeForNetinfo(peer.conn_type), peer.network, + peer.transport_protocol_type == "detecting" ? "*" : peer.transport_protocol_type, PingTimeToString(peer.min_ping), PingTimeToString(peer.ping), peer.last_send ? ToString(m_time_now - peer.last_send) : "", @@ -570,7 +573,7 @@ class NetinfoRequestHandler : public BaseRequestHandler IsAddressSelected() ? peer.addr : "", IsVersionSelected() && version != "0" ? version : ""); } - result += strprintf(" ms ms sec sec min min %*s\n\n", m_max_age_length, "min"); + result += strprintf(" ms ms sec sec min min %*s\n\n", m_max_age_length, "min"); } // Report peer connection totals by type. @@ -657,6 +660,7 @@ class NetinfoRequestHandler : public BaseRequestHandler " \"feeler\" - short-lived connection for testing addresses\n" " \"addr\" - address fetch; short-lived connection for requesting addresses\n" " net Network the peer connected through (\"ipv4\", \"ipv6\", \"onion\", \"i2p\", \"cjdns\", or \"npr\" (not publicly routable))\n" + " tp Transport protocol used for the connection (\"v1\", \"v2\" or \"*\" if detecting)\n" " mping Minimum observed ping time, in milliseconds (ms)\n" " ping Last observed ping time, in milliseconds (ms)\n" " send Time since last message sent to the peer, in seconds\n" diff --git a/src/init.cpp b/src/init.cpp index 16c7f048600c1..564a74f18ce69 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -588,6 +588,7 @@ void SetupServerArgs(ArgsManager& argsman) argsman.AddArg("-i2psam=", "I2P SAM proxy to reach I2P peers and accept I2P connections (default: none)", ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-i2pacceptincoming", strprintf("Whether to accept inbound I2P connections (default: %i). Ignored if -i2psam is not set. Listening for inbound I2P connections is done through the SAM proxy, not by binding to a local address and port.", DEFAULT_I2P_ACCEPT_INCOMING), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-onlynet=", "Make automatic outbound connections only to network (" + Join(GetNetworkNames(), ", ") + "). Inbound and manual connections are not affected by this option. It can be specified multiple times to allow multiple networks.", ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); + argsman.AddArg("-v2transport", strprintf("Support v2 transport (default: %u)", DEFAULT_V2_TRANSPORT), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-peerblockfilters", strprintf("Serve compact block filters to peers per BIP 157 (default: %u)", DEFAULT_PEERBLOCKFILTERS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-peerbloomfilters", strprintf("Support filtering of blocks and transaction with bloom filters (default: %u)", DEFAULT_PEERBLOOMFILTERS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-peertimeout=", strprintf("Specify a p2p connection timeout delay in seconds. After connecting to a peer, wait this amount of time before considering disconnection based on inactivity (minimum: 1, default: %d)", DEFAULT_PEER_CONNECT_TIMEOUT), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); @@ -1147,6 +1148,11 @@ bool AppInitParameterInteraction(const ArgsManager& args) } } + // Signal NODE_P2P_V2 if BIP324 v2 transport is enabled. + if (args.GetBoolArg("-v2transport", DEFAULT_V2_TRANSPORT)) { + nLocalServices = ServiceFlags(nLocalServices | NODE_P2P_V2); + } + // Signal NODE_COMPACT_FILTERS if peerblockfilters and basic filters index are both enabled. if (args.GetBoolArg("-peerblockfilters", DEFAULT_PEERBLOCKFILTERS)) { if (g_enabled_filter_types.count(BlockFilterType::BASIC_FILTER) != 1) { diff --git a/src/net.cpp b/src/net.cpp index 474fbec3e7010..b6e096ac753a1 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -36,6 +36,7 @@ #include #include #include +#include #include #include @@ -483,7 +484,7 @@ static CAddress GetBindAddress(const Sock& sock) return addr_bind; } -CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type) +CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) { AssertLockNotHeld(m_unused_i2p_sessions_mutex); assert(conn_type != ConnectionType::INBOUND); @@ -505,11 +506,13 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo /// debug print if (fLogIPs) { - LogPrint(BCLog::NET, "trying connection %s lastseen=%.1fhrs\n", + LogPrint(BCLog::NET, "trying %s connection %s lastseen=%.1fhrs\n", + use_v2transport ? "v2" : "v1", pszDest ? pszDest : addrConnect.ToStringAddrPort(), pszDest ? 0.0 : (double)(GetAdjustedTime() - addrConnect.nTime)/3600.0); } else { - LogPrint(BCLog::NET, "trying connection lastseen=%.1fhrs\n", + LogPrint(BCLog::NET, "trying %s connection lastseen=%.1fhrs\n", + use_v2transport ? "v2" : "v1", pszDest ? 0.0 : (double)(GetAdjustedTime() - addrConnect.nTime)/3600.0); } @@ -631,6 +634,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo CNodeOptions{ .i2p_sam_session = std::move(i2p_transient_session), .recv_flood_size = nReceiveFloodSize, + .use_v2transport = use_v2transport, }); pnode->AddRef(); ::g_stats_client->inc("peers.connect", 1.0f); @@ -660,13 +664,6 @@ void CNode::CloseSocketDisconnect(CConnman* connman) connman->mapReceivableNodes.erase(GetId()); connman->mapSendableNodes.erase(GetId()); } - { - LOCK(connman->cs_mapNodesWithDataToSend); - if (connman->mapNodesWithDataToSend.erase(GetId()) != 0) { - // See comment in PushMessage - Release(); - } - } if (connman->m_edge_trig_events && !connman->m_edge_trig_events->UnregisterEvents(m_sock->Get())) { LogPrint(BCLog::NET, "EdgeTriggeredEvents::UnregisterEvents() failed\n"); @@ -753,6 +750,9 @@ void CNode::CopyStats(CNodeStats& stats) LOCK(cs_vRecv); X(mapRecvBytesPerMsgType); X(nRecvBytes); + Transport::Info info = m_transport->GetInfo(); + stats.m_transport_type = info.transport_type; + if (info.session_id) stats.m_session_id = HexStr(*info.session_id); } X(m_legacyWhitelisted); X(m_permission_flags); @@ -828,6 +828,11 @@ V1Transport::V1Transport(const NodeId node_id, int nTypeIn, int nVersionIn) noex Reset(); } +Transport::Info V1Transport::GetInfo() const noexcept +{ + return {.transport_type = TransportProtocolType::V1, .session_id = {}}; +} + int V1Transport::readHeader(Span msg_bytes) { AssertLockHeld(m_recv_mutex); @@ -992,8 +997,7 @@ void V1Transport::MarkBytesSent(size_t bytes_sent) noexcept m_bytes_sent = 0; } else if (!m_sending_header && m_bytes_sent == m_message_to_send.data.size()) { // We're done sending a message's data. Wipe the data vector to reduce memory consumption. - m_message_to_send.data.clear(); - m_message_to_send.data.shrink_to_fit(); + ClearShrink(m_message_to_send.data); m_bytes_sent = 0; } } @@ -1187,10 +1191,10 @@ void V2Transport::ProcessReceivedMaybeV1Bytes() noexcept AssertLockNotHeld(m_send_mutex); Assume(m_recv_state == RecvState::KEY_MAYBE_V1); // We still have to determine if this is a v1 or v2 connection. The bytes being received could - // be the beginning of either a v1 packet (network magic + "version\x00"), or of a v2 public - // key. BIP324 specifies that a mismatch with this 12-byte string should trigger sending of the - // key. - std::array v1_prefix = {0, 0, 0, 0, 'v', 'e', 'r', 's', 'i', 'o', 'n', 0}; + // be the beginning of either a v1 packet (network magic + "version\x00\x00\x00\x00\x00"), or + // of a v2 public key. BIP324 specifies that a mismatch with this 16-byte string should trigger + // sending of the key. + std::array v1_prefix = {0, 0, 0, 0, 'v', 'e', 'r', 's', 'i', 'o', 'n', 0, 0, 0, 0, 0}; std::copy(std::begin(Params().MessageStart()), std::end(Params().MessageStart()), v1_prefix.begin()); Assume(m_recv_buffer.size() <= v1_prefix.size()); if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.end(), v1_prefix.begin())) { @@ -1212,8 +1216,8 @@ void V2Transport::ProcessReceivedMaybeV1Bytes() noexcept SetReceiveState(RecvState::V1); SetSendState(SendState::V1); // Reset v2 transport buffers to save memory. - m_recv_buffer = {}; - m_send_buffer = {}; + ClearShrink(m_recv_buffer); + ClearShrink(m_send_buffer); } else { // We have not received enough to distinguish v1 from v2 yet. Wait until more bytes come. } @@ -1272,8 +1276,7 @@ bool V2Transport::ProcessReceivedKeyBytes() noexcept /*ignore=*/false, /*output=*/MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::EXPANSION + VERSION_CONTENTS.size())); // We no longer need the garbage. - m_send_garbage.clear(); - m_send_garbage.shrink_to_fit(); + ClearShrink(m_send_garbage); } else { // We still have to receive more key bytes. } @@ -1343,8 +1346,7 @@ bool V2Transport::ProcessReceivedPacketBytes() noexcept return false; } // We have decrypted a valid packet with the AAD we expected, so clear the expected AAD. - m_recv_aad.clear(); - m_recv_aad.shrink_to_fit(); + ClearShrink(m_recv_aad); // Feed the last 4 bytes of the Poly1305 authentication tag (and its timing) into our RNG. RandAddEvent(ReadLE32(m_recv_buffer.data() + m_recv_buffer.size() - 4)); @@ -1367,9 +1369,9 @@ bool V2Transport::ProcessReceivedPacketBytes() noexcept } } // Wipe the receive buffer where the next packet will be received into. - m_recv_buffer = {}; + ClearShrink(m_recv_buffer); // In all but APP_READY state, we can wipe the decoded contents. - if (m_recv_state != RecvState::APP_READY) m_recv_decode_buffer = {}; + if (m_recv_state != RecvState::APP_READY) ClearShrink(m_recv_decode_buffer); } else { // We either have less than 3 bytes, so we don't know the packet's length yet, or more // than 3 bytes but less than the packet's full ciphertext. Wait until those arrive. @@ -1386,7 +1388,7 @@ size_t V2Transport::GetMaxBytesToProcess() noexcept // receive buffer. Assume(m_recv_buffer.size() <= V1_PREFIX_LEN); // As long as we're not sure if this is a v1 or v2 connection, don't receive more than what - // is strictly necessary to distinguish the two (12 bytes). If we permitted more than + // is strictly necessary to distinguish the two (16 bytes). If we permitted more than // the v1 header size (24 bytes), we may not be able to feed the already-received bytes // back into the m_v1_fallback V1 transport. return V1_PREFIX_LEN - m_recv_buffer.size(); @@ -1580,7 +1582,7 @@ CNetMessage V2Transport::GetReceivedMessage(std::chrono::microseconds time, bool LogPrint(BCLog::NET, "V2 transport error: invalid message type (%u bytes contents), peer=%d\n", m_recv_decode_buffer.size(), m_nodeid); reject_message = true; } - m_recv_decode_buffer = {}; + ClearShrink(m_recv_decode_buffer); SetReceiveState(RecvState::APP); return msg; @@ -1614,7 +1616,7 @@ bool V2Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept m_cipher.Encrypt(MakeByteSpan(contents), {}, false, MakeWritableByteSpan(m_send_buffer)); m_send_type = msg.m_type; // Release memory - msg.data = {}; + ClearShrink(msg.data); return true; } @@ -1641,15 +1643,39 @@ void V2Transport::MarkBytesSent(size_t bytes_sent) noexcept LOCK(m_send_mutex); if (m_send_state == SendState::V1) return m_v1_fallback.MarkBytesSent(bytes_sent); + if (m_send_state == SendState::AWAITING_KEY && m_send_pos == 0 && bytes_sent > 0) { + LogPrint(BCLog::NET, "start sending v2 handshake to peer=%d\n", m_nodeid); + } + m_send_pos += bytes_sent; Assume(m_send_pos <= m_send_buffer.size()); + if (m_send_pos >= CMessageHeader::HEADER_SIZE) { + m_sent_v1_header_worth = true; + } // Wipe the buffer when everything is sent. if (m_send_pos == m_send_buffer.size()) { m_send_pos = 0; - m_send_buffer = {}; + ClearShrink(m_send_buffer); } } +bool V2Transport::ShouldReconnectV1() const noexcept +{ + AssertLockNotHeld(m_send_mutex); + AssertLockNotHeld(m_recv_mutex); + // Only outgoing connections need reconnection. + if (!m_initiating) return false; + + LOCK(m_recv_mutex); + // We only reconnect in the very first state and when the receive buffer is empty. Together + // these conditions imply nothing has been received so far. + if (m_recv_state != RecvState::KEY) return false; + if (!m_recv_buffer.empty()) return false; + // Check if we've sent enough for the other side to disconnect us (if it was V1). + LOCK(m_send_mutex); + return m_sent_v1_header_worth; +} + size_t V2Transport::GetSendMemoryUsage() const noexcept { AssertLockNotHeld(m_send_mutex); @@ -1659,6 +1685,27 @@ size_t V2Transport::GetSendMemoryUsage() const noexcept return sizeof(m_send_buffer) + memusage::DynamicUsage(m_send_buffer); } +Transport::Info V2Transport::GetInfo() const noexcept +{ + AssertLockNotHeld(m_recv_mutex); + LOCK(m_recv_mutex); + if (m_recv_state == RecvState::V1) return m_v1_fallback.GetInfo(); + + Transport::Info info; + + // Do not report v2 and session ID until the version packet has been received + // and verified (confirming that the other side very likely has the same keys as us). + if (m_recv_state != RecvState::KEY_MAYBE_V1 && m_recv_state != RecvState::KEY && + m_recv_state != RecvState::GARB_GARBTERM && m_recv_state != RecvState::VERSION) { + info.transport_type = TransportProtocolType::V2; + info.session_id = uint256(MakeUCharSpan(m_cipher.GetSessionID())); + } else { + info.transport_type = TransportProtocolType::DETECTING; + } + + return info; +} + std::pair CConnman::SocketSendData(CNode& node) const { auto it = node.vSendMsg.begin(); @@ -1708,7 +1755,9 @@ std::pair CConnman::SocketSendData(CNode& node) const // Notify transport that bytes have been processed. node.m_transport->MarkBytesSent(nBytes); // Update statistics per message type. - node.AccountForSentBytes(msg_type, nBytes); + if (!msg_type.empty()) { // don't report v2 handshake bytes for now + node.AccountForSentBytes(msg_type, nBytes); + } nSentSize += nBytes; if ((size_t)nBytes != data.size()) { // could not send full message; stop sending more @@ -1943,6 +1992,10 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr&& sock, } const bool inbound_onion = std::find(m_onion_binds.begin(), m_onion_binds.end(), addr_bind) != m_onion_binds.end(); + // The V2Transport transparently falls back to V1 behavior when an incoming V1 connection is + // detected, so use it whenever we signal NODE_P2P_V2. + const bool use_v2transport(nodeServices & NODE_P2P_V2); + CNode* pnode = new CNode(id, std::move(sock), addr, @@ -1956,6 +2009,7 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr&& sock, .permission_flags = permission_flags, .prefer_evict = discouraged, .recv_flood_size = nReceiveFloodSize, + .use_v2transport = use_v2transport, }); pnode->AddRef(); // If this flag is present, the user probably expect that RPC and QT report it as whitelisted (backward compatibility) @@ -2025,12 +2079,19 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ CSemaphoreGrant grant(*semOutbound, true); if (!grant) return false; - OpenNetworkConnection(CAddress(), false, &grant, address.c_str(), conn_type); + OpenNetworkConnection(CAddress(), false, std::move(grant), address.c_str(), conn_type, /*use_v2transport=*/false); return true; } void CConnman::DisconnectNodes() { + AssertLockNotHeld(m_nodes_mutex); + AssertLockNotHeld(m_reconnections_mutex); + + // Use a temporary variable to accumulate desired reconnections, so we don't need + // m_reconnections_mutex while holding m_nodes_mutex. + decltype(m_reconnections) reconnections_to_add; + { LOCK(m_nodes_mutex); @@ -2093,6 +2154,19 @@ void CConnman::DisconnectNodes() // remove from m_nodes it = m_nodes.erase(it); + // Add to reconnection list if appropriate. We don't reconnect right here, because + // the creation of a connection is a blocking operation (up to several seconds), + // and we don't want to hold up the socket handler thread for that long. + if (pnode->m_transport->ShouldReconnectV1()) { + reconnections_to_add.push_back({ + .addr_connect = pnode->addr, + .grant = std::move(pnode->grantOutbound), + .destination = pnode->m_dest, + .conn_type = pnode->m_conn_type, + .use_v2transport = false}); + LogPrint(BCLog::NET, "retrying with v1 transport protocol for peer=%d\n", pnode->GetId()); + } + // release outbound grant (if any) pnode->grantOutbound.Release(); @@ -2122,6 +2196,11 @@ void CConnman::DisconnectNodes() } } } + { + // Move entries from reconnections_to_add to m_reconnections. + LOCK(m_reconnections_mutex); + m_reconnections.splice(m_reconnections.end(), std::move(reconnections_to_add)); + } } void CConnman::NotifyNumConnectionsChanged(CMasternodeSync& mn_sync) @@ -2523,17 +2602,11 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync) LOCK2(m_nodes_mutex, cs_sendable_receivable_nodes); if (!mapReceivableNodes.empty()) { return true; - } else if (!mapSendableNodes.empty()) { - if (LOCK(cs_mapNodesWithDataToSend); !mapNodesWithDataToSend.empty()) { - // We must check if at least one of the nodes with pending messages is also - // sendable, as otherwise a single node would be able to make the network - // thread busy with polling. - for (auto& p : mapNodesWithDataToSend) { - if (mapSendableNodes.count(p.first)) { - return true; - break; - } - } + } + for (const auto& p : mapSendableNodes) { + const auto& [to_send, more, _msg_type] = p.second->m_transport->GetBytesToSend(p.second->nSendMsgSize != 0); + if (!to_send.empty()) { + return true; } } return false; @@ -2609,52 +2682,30 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, assert(jt.first->second == it->second); it->second->fCanSendData = true; } + } - // collect nodes that have a receivable socket - // also clean up mapReceivableNodes from nodes that were receivable in the last iteration but aren't anymore - { - LOCK(cs_sendable_receivable_nodes); - - for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) { - if (!it->second->fHasRecvData) { - it = mapReceivableNodes.erase(it); - } else { - // Implement the following logic: - // * If there is data to send, try sending data. As this only - // happens when optimistic write failed, we choose to first drain the - // write buffer in this case before receiving more. This avoids - // needlessly queueing received data, if the remote peer is not themselves - // receiving data. This means properly utilizing TCP flow control signalling. - // * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try - // receiving data (which should succeed as the socket signalled as receivable). - if (!it->second->fPauseRecv && !it->second->fDisconnect && it->second->nSendMsgSize == 0) { - it->second->AddRef(); - vReceivableNodes.emplace(it->second); - } - ++it; - } - } + ForEachNode(AllNodes, [&](CNode* pnode) { + const auto& [to_send, more, _msg_type] = pnode->m_transport->GetBytesToSend(pnode->nSendMsgSize != 0); + // Collect nodes that have a receivable socket, implement the following logic: + // * If there is data to send, try sending data. As this only + // happens when optimistic write failed, we choose to first drain the + // write buffer in this case before receiving more. This avoids + // needlessly queueing received data, if the remote peer is not themselves + // receiving data. This means properly utilizing TCP flow control signalling. + // * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try + // receiving data (which should succeed as the socket signalled as receivable). + if (pnode->fHasRecvData && !pnode->fPauseRecv && !pnode->fDisconnect && + (!pnode->m_transport->ReceivedMessageComplete() || to_send.empty())) { + pnode->AddRef(); + vReceivableNodes.emplace(pnode); } - // collect nodes that have data to send and have a socket with non-empty write buffers - // also clean up mapNodesWithDataToSend from nodes that had messages to send in the last iteration - // but don't have any in this iteration - LOCK(cs_mapNodesWithDataToSend); - for (auto it = mapNodesWithDataToSend.begin(); it != mapNodesWithDataToSend.end(); ) { - const auto& [to_send, more, _msg_type] = it->second->m_transport->GetBytesToSend(it->second->nSendMsgSize != 0); - if (to_send.empty() && !more) { - // See comment in PushMessage - it->second->Release(); - it = mapNodesWithDataToSend.erase(it); - } else { - if (it->second->fCanSendData) { - it->second->AddRef(); - vSendableNodes.emplace(it->second); - } - ++it; - } + // Collect nodes that have data to send and have a socket with non-empty write buffers + if (pnode->fCanSendData && (!pnode->m_transport->ReceivedMessageComplete() || !to_send.empty())) { + pnode->AddRef(); + vSendableNodes.emplace(pnode); } - } + }); for (CNode* pnode : vSendableNodes) { if (interruptNet) { @@ -2726,6 +2777,15 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, ++it; } } + // clean up mapReceivableNodes from nodes that were receivable in the last iteration but aren't anymore + for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) { + if (!it->second->fHasRecvData) { + LogPrint(BCLog::NET, "%s -- remove mapReceivableNodes, peer=%d\n", __func__, it->second->GetId()); + it = mapReceivableNodes.erase(it); + } else { + ++it; + } + } } } @@ -2963,10 +3023,13 @@ void CConnman::ProcessAddrFetch() strDest = m_addr_fetches.front(); m_addr_fetches.pop_front(); } + // Attempt v2 connection if we support v2 - we'll reconnect with v1 if our + // peer doesn't support it or immediately disconnects us for another reason. + const bool use_v2transport(GetLocalServices() & NODE_P2P_V2); CAddress addr; - CSemaphoreGrant grant(*semOutbound, true); + CSemaphoreGrant grant(*semOutbound, /*fTry=*/true); if (grant) { - OpenNetworkConnection(addr, false, &grant, strDest.c_str(), ConnectionType::ADDR_FETCH); + OpenNetworkConnection(addr, false, std::move(grant), strDest.c_str(), ConnectionType::ADDR_FETCH, use_v2transport); } } @@ -3035,16 +3098,20 @@ std::unordered_set CConnman::GetReachableEmptyNetworks() const void CConnman::ThreadOpenConnections(const std::vector connect, CDeterministicMNManager& dmnman) { AssertLockNotHeld(m_unused_i2p_sessions_mutex); + AssertLockNotHeld(m_reconnections_mutex); FastRandomContext rng; // Connect to specific addresses if (!connect.empty()) { + // Attempt v2 connection if we support v2 - we'll reconnect with v1 if our + // peer doesn't support it or immediately disconnects us for another reason. + const bool use_v2transport(GetLocalServices() & NODE_P2P_V2); for (int64_t nLoop = 0;; nLoop++) { for (const std::string& strAddr : connect) { CAddress addr(CService(), NODE_NONE); - OpenNetworkConnection(addr, false, nullptr, strAddr.c_str(), ConnectionType::MANUAL); + OpenNetworkConnection(addr, false, {}, strAddr.c_str(), ConnectionType::MANUAL, /*use_v2transport=*/use_v2transport); for (int i = 0; i < 10 && i < nLoop; i++) { if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) @@ -3053,6 +3120,7 @@ void CConnman::ThreadOpenConnections(const std::vector connect, CDe } if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) return; + PerformReconnections(); } } @@ -3077,6 +3145,8 @@ void CConnman::ThreadOpenConnections(const std::vector connect, CDe if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) return; + PerformReconnections(); + CSemaphoreGrant grant(*semOutbound); if (interruptNet) return; @@ -3097,7 +3167,7 @@ void CConnman::ThreadOpenConnections(const std::vector connect, CDe // Perform cheap checks before locking a mutex. else if (!dnsseed && !use_seednodes) { LOCK(m_added_nodes_mutex); - if (m_added_nodes.empty()) { + if (m_added_node_params.empty()) { add_fixed_seeds_now = true; LogPrintf("Adding fixed seeds as -dnsseed=0 (or IPv4/IPv6 connections are disabled via -onlynet) and neither -addnode nor -seednode are provided\n"); } @@ -3367,7 +3437,9 @@ void CConnman::ThreadOpenConnections(const std::vector connect, CDe // Don't record addrman failure attempts when node is offline. This can be identified since all local // network connections (if any) belong in the same netgroup, and the size of `outbound_ipv46_peer_netgroups` would only be 1. const bool count_failures{((int)outbound_ipv46_peer_netgroups.size() + outbound_privacy_network_peers) >= std::min(nMaxConnections - 1, 2)}; - OpenNetworkConnection(addrConnect, count_failures, &grant, /*strDest=*/nullptr, conn_type); + // Use BIP324 transport when both us and them have NODE_V2_P2P set. + const bool use_v2transport(addrConnect.nServices & GetLocalServices() & NODE_P2P_V2); + OpenNetworkConnection(addrConnect, count_failures, std::move(grant), /*strDest=*/nullptr, conn_type, use_v2transport); } } } @@ -3389,11 +3461,11 @@ std::vector CConnman::GetAddedNodeInfo() const { std::vector ret; - std::list lAddresses(0); + std::list lAddresses(0); { LOCK(m_added_nodes_mutex); - ret.reserve(m_added_nodes.size()); - std::copy(m_added_nodes.cbegin(), m_added_nodes.cend(), std::back_inserter(lAddresses)); + ret.reserve(m_added_node_params.size()); + std::copy(m_added_node_params.cbegin(), m_added_node_params.cend(), std::back_inserter(lAddresses)); } @@ -3413,9 +3485,9 @@ std::vector CConnman::GetAddedNodeInfo() const } } - for (const std::string& strAddNode : lAddresses) { - CService service(LookupNumeric(strAddNode, Params().GetDefaultPort(strAddNode))); - AddedNodeInfo addedNode{strAddNode, CService(), false, false}; + for (const auto& addr : lAddresses) { + CService service(LookupNumeric(addr.m_added_node, Params().GetDefaultPort(addr.m_added_node))); + AddedNodeInfo addedNode{addr, CService(), false, false}; if (service.IsValid()) { // strAddNode is an IP:port auto it = mapConnected.find(service); @@ -3426,7 +3498,7 @@ std::vector CConnman::GetAddedNodeInfo() const } } else { // strAddNode is a name - auto it = mapConnectedByName.find(strAddNode); + auto it = mapConnectedByName.find(addr.m_added_node); if (it != mapConnectedByName.end()) { addedNode.resolvedAddress = it->second.second; addedNode.fConnected = true; @@ -3442,6 +3514,7 @@ std::vector CConnman::GetAddedNodeInfo() const void CConnman::ThreadOpenAddedConnections() { AssertLockNotHeld(m_unused_i2p_sessions_mutex); + AssertLockNotHeld(m_reconnections_mutex); while (true) { CSemaphoreGrant grant(*semAddnode); @@ -3449,18 +3522,20 @@ void CConnman::ThreadOpenAddedConnections() bool tried = false; for (const AddedNodeInfo& info : vInfo) { if (!info.fConnected) { - if (!grant.TryAcquire()) { + if (!grant) { // If we've used up our semaphore and need a new one, let's not wait here since while we are waiting // the addednodeinfo state might change. break; } tried = true; CAddress addr(CService(), NODE_NONE); - OpenNetworkConnection(addr, false, &grant, info.strAddedNode.c_str(), ConnectionType::MANUAL); - if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) - return; + OpenNetworkConnection(addr, false, std::move(grant), info.m_params.m_added_node.c_str(), ConnectionType::MANUAL, info.m_params.m_use_v2transport); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) return; + grant = CSemaphoreGrant(*semAddnode, /*fTry=*/true); } } + // See if any reconnections are desired. + PerformReconnections(); // Retry every 60 seconds if a connection was attempted, otherwise two seconds if (!interruptNet.sleep_for(std::chrono::seconds(tried ? 60 : 2))) return; @@ -3642,7 +3717,9 @@ void CConnman::ThreadOpenMasternodeConnections(CDeterministicMNManager& dmnman, } // if successful, this moves the passed grant to the constructed node -void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound, const char *pszDest, ConnectionType conn_type, MasternodeConn masternode_connection, MasternodeProbeConn masternode_probe_connection) +void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound, + const char *pszDest, ConnectionType conn_type, bool use_v2transport, + MasternodeConn masternode_connection, MasternodeProbeConn masternode_probe_connection) { AssertLockNotHeld(m_unused_i2p_sessions_mutex); assert(conn_type != ConnectionType::INBOUND); @@ -3689,7 +3766,7 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai return; LogPrint(BCLog::NET_NETCONN, "CConnman::%s -- connecting to %s\n", __func__, getIpStr()); - CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type); + CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type, use_v2transport); if (!pnode) { LogPrint(BCLog::NET_NETCONN, "CConnman::%s -- ConnectNode failed for %s\n", __func__, getIpStr()); @@ -3701,8 +3778,7 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai LogPrint(BCLog::NET_NETCONN, "CConnman::%s -- successfully connected to %s, sock=%d, peer=%d\n", __func__, getIpStr(), pnode->m_sock->Get(), pnode->GetId()); } - if (grantOutbound) - grantOutbound->MoveTo(pnode->grantOutbound); + pnode->grantOutbound = std::move(grant_outbound); if (masternode_connection == MasternodeConn::IsConnection) pnode->m_masternode_connection = true; @@ -3733,7 +3809,8 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai } void CConnman::OpenMasternodeConnection(const CAddress &addrConnect, MasternodeProbeConn probe) { - OpenNetworkConnection(addrConnect, false, nullptr, nullptr, ConnectionType::OUTBOUND_FULL_RELAY, MasternodeConn::IsConnection, probe); + OpenNetworkConnection(addrConnect, false, {}, /*strDest=*/nullptr, ConnectionType::OUTBOUND_FULL_RELAY, + /*use_v2transport=*/false, MasternodeConn::IsConnection, probe); } Mutex NetEventsInterface::g_msgproc_mutex; @@ -4261,10 +4338,6 @@ void CConnman::StopNodes() LOCK(cs_sendable_receivable_nodes); mapReceivableNodes.clear(); } - { - LOCK(cs_mapNodesWithDataToSend); - mapNodesWithDataToSend.clear(); - } m_nodes_disconnected.clear(); vhListenSocket.clear(); semOutbound.reset(); @@ -4345,23 +4418,23 @@ std::vector CConnman::GetAddresses(CNode& requestor, size_t max_addres return cache_entry.m_addrs_response_cache; } -bool CConnman::AddNode(const std::string& strNode) +bool CConnman::AddNode(const AddedNodeParams& add) { LOCK(m_added_nodes_mutex); - for (const std::string& it : m_added_nodes) { - if (strNode == it) return false; + for (const auto& it : m_added_node_params) { + if (add.m_added_node == it.m_added_node) return false; } - m_added_nodes.push_back(strNode); + m_added_node_params.push_back(add); return true; } bool CConnman::RemoveAddedNode(const std::string& strNode) { LOCK(m_added_nodes_mutex); - for(std::vector::iterator it = m_added_nodes.begin(); it != m_added_nodes.end(); ++it) { - if (strNode == *it) { - m_added_nodes.erase(it); + for (auto it = m_added_node_params.begin(); it != m_added_node_params.end(); ++it) { + if (strNode == it->m_added_node) { + m_added_node_params.erase(it); return true; } } @@ -4709,6 +4782,15 @@ ServiceFlags CConnman::GetLocalServices() const return nLocalServices; } +static std::unique_ptr MakeTransport(NodeId id, bool use_v2transport, bool inbound) noexcept +{ + if (use_v2transport) { + return std::make_unique(id, /*initiating=*/!inbound, SER_NETWORK, INIT_PROTO_VERSION); + } else { + return std::make_unique(id, SER_NETWORK, INIT_PROTO_VERSION); + } +} + CNode::CNode(NodeId idIn, std::shared_ptr sock, const CAddress& addrIn, @@ -4719,13 +4801,14 @@ CNode::CNode(NodeId idIn, ConnectionType conn_type_in, bool inbound_onion, CNodeOptions&& node_opts) - : m_transport{std::make_unique(idIn, SER_NETWORK, INIT_PROTO_VERSION)}, + : m_transport{MakeTransport(idIn, node_opts.use_v2transport, conn_type_in == ConnectionType::INBOUND)}, m_permission_flags{node_opts.permission_flags}, m_sock{sock}, m_connected{GetTime()}, addr{addrIn}, addrBind{addrBindIn}, m_addr_name{addrNameIn.empty() ? addr.ToStringAddrPort() : addrNameIn}, + m_dest(addrNameIn), m_inbound_onion{inbound_onion}, m_prefer_evict{node_opts.prefer_evict}, nKeyedNetGroup{nKeyedNetGroupIn}, @@ -4820,17 +4903,6 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) pnode->vSendMsg.push_back(std::move(msg)); pnode->nSendMsgSize = pnode->vSendMsg.size(); - { - LOCK(cs_mapNodesWithDataToSend); - // we're not holding m_nodes_mutex here, so there is a chance of this node being disconnected shortly before - // we get here. Whoever called PushMessage still has a ref to CNode*, but will later Release() it, so we - // might end up having an entry in mapNodesWithDataToSend that is not in m_nodes anymore. We need to - // Add/Release refs when adding/erasing mapNodesWithDataToSend. - if (mapNodesWithDataToSend.emplace(pnode->GetId(), pnode).second) { - pnode->AddRef(); - } - } - // If there was nothing to send before, and there is now (predicted by the "more" value // returned by the GetBytesToSend call above), attempt "optimistic write": // because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually @@ -4915,6 +4987,33 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& address) const return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup.data(), vchNetGroup.size()).Finalize(); } +void CConnman::PerformReconnections() +{ + AssertLockNotHeld(m_reconnections_mutex); + AssertLockNotHeld(m_unused_i2p_sessions_mutex); + while (true) { + // Move first element of m_reconnections to todo (avoiding an allocation inside the lock). + decltype(m_reconnections) todo; + { + LOCK(m_reconnections_mutex); + if (m_reconnections.empty()) break; + todo.splice(todo.end(), m_reconnections, m_reconnections.begin()); + } + + auto& item = *todo.begin(); + OpenNetworkConnection(item.addr_connect, + // We only reconnect if the first attempt to connect succeeded at + // connection time, but then failed after the CNode object was + // created. Since we already know connecting is possible, do not + // count failure to reconnect. + /*fCountFailure=*/false, + std::move(item.grant), + item.destination.empty() ? nullptr : item.destination.c_str(), + item.conn_type, + item.use_v2transport); + } +} + void CaptureMessageToFile(const CAddress& addr, const std::string& msg_type, Span data, diff --git a/src/net.h b/src/net.h index 248a8a6b04016..b45b87329bb7a 100644 --- a/src/net.h +++ b/src/net.h @@ -114,6 +114,8 @@ static const bool DEFAULT_FIXEDSEEDS = true; static const size_t DEFAULT_MAXRECEIVEBUFFER = 5 * 1000; static const size_t DEFAULT_MAXSENDBUFFER = 1 * 1000; +static constexpr bool DEFAULT_V2_TRANSPORT{false}; + #if defined USE_KQUEUE #define DEFAULT_SOCKETEVENTS "kqueue" #elif defined USE_EPOLL @@ -126,9 +128,13 @@ static const size_t DEFAULT_MAXSENDBUFFER = 1 * 1000; typedef int64_t NodeId; -struct AddedNodeInfo -{ - std::string strAddedNode; +struct AddedNodeParams { + std::string m_added_node; + bool m_use_v2transport; +}; + +struct AddedNodeInfo { + AddedNodeParams m_params; CService resolvedAddress; bool fConnected; bool fInbound; @@ -262,6 +268,10 @@ class CNodeStats uint256 verifiedPubKeyHash; bool m_masternode_connection; ConnectionType m_conn_type; + /** Transport protocol type. */ + TransportProtocolType m_transport_type; + /** BIP324 session id string in hex, if any. */ + std::string m_session_id; }; @@ -298,6 +308,15 @@ class Transport { public: virtual ~Transport() {} + struct Info + { + TransportProtocolType transport_type; + std::optional session_id; + }; + + /** Retrieve information about this transport. */ + virtual Info GetInfo() const noexcept = 0; + // 1. Receiver side functions, for decoding bytes received on the wire into transport protocol // agnostic CNetMessage (message type & payload) objects. @@ -391,6 +410,11 @@ class Transport { /** Return the memory usage of this transport attributable to buffered data to send. */ virtual size_t GetSendMemoryUsage() const noexcept = 0; + + // 3. Miscellaneous functions. + + /** Whether upon disconnections, a reconnect with V1 is warranted. */ + virtual bool ShouldReconnectV1() const noexcept = 0; }; class V1Transport final : public Transport @@ -451,6 +475,8 @@ class V1Transport final : public Transport return WITH_LOCK(m_recv_mutex, return CompleteInternal()); } + Info GetInfo() const noexcept override; + bool ReceivedBytes(Span& msg_bytes) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex) { AssertLockNotHeld(m_recv_mutex); @@ -470,6 +496,7 @@ class V1Transport final : public Transport BytesToSend GetBytesToSend(bool have_next_message) const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex); void MarkBytesSent(size_t bytes_sent) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex); size_t GetSendMemoryUsage() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex); + bool ShouldReconnectV1() const noexcept override { return false; } }; class V2Transport final : public Transport @@ -482,7 +509,7 @@ class V2Transport final : public Transport /** The length of the V1 prefix to match bytes initially received by responders with to * determine if their peer is speaking V1 or V2. */ - static constexpr size_t V1_PREFIX_LEN = 12; + static constexpr size_t V1_PREFIX_LEN = 16; // The sender side and receiver side of V2Transport are state machines that are transitioned // through, based on what has been received. The receive state corresponds to the contents of, @@ -638,6 +665,8 @@ class V2Transport final : public Transport std::string m_send_type GUARDED_BY(m_send_mutex); /** Current sender state. */ SendState m_send_state GUARDED_BY(m_send_mutex); + /** Whether we've sent at least 24 bytes (which would trigger disconnect for V1 peers). */ + bool m_sent_v1_header_worth GUARDED_BY(m_send_mutex) {false}; /** Change the receive state. */ void SetReceiveState(RecvState recv_state) noexcept EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex); @@ -683,6 +712,10 @@ class V2Transport final : public Transport BytesToSend GetBytesToSend(bool have_next_message) const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex); void MarkBytesSent(size_t bytes_sent) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex); size_t GetSendMemoryUsage() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex); + + // Miscellaneous functions. + bool ShouldReconnectV1() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex, !m_send_mutex); + Info GetInfo() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex); }; struct CNodeOptions @@ -691,6 +724,7 @@ struct CNodeOptions std::unique_ptr i2p_sam_session = nullptr; bool prefer_evict = false; size_t recv_flood_size{DEFAULT_MAXRECEIVEBUFFER * 1000}; + bool use_v2transport = false; }; /** Information about a peer */ @@ -739,6 +773,8 @@ class CNode // Bind address of our side of the connection const CAddress addrBind; const std::string m_addr_name; + /** The pszDest argument provided to ConnectNode(). Only used for reconnections. */ + const std::string m_dest; //! Whether this peer is an inbound onion, i.e. connected via our Tor onion service. const bool m_inbound_onion; std::atomic nNumWarningsSkipped{0}; @@ -1197,7 +1233,12 @@ friend class CNode; vWhitelistedRange = connOptions.vWhitelistedRange; { LOCK(m_added_nodes_mutex); - m_added_nodes = connOptions.m_added_nodes; + // Attempt v2 connection if we support v2 - we'll reconnect with v1 if our + // peer doesn't support it or immediately disconnects us for another reason. + const bool use_v2transport(GetLocalServices() & NODE_P2P_V2); + for (const std::string& added_node : connOptions.m_added_nodes) { + m_added_node_params.push_back({added_node, use_v2transport}); + } } socketEventsMode = connOptions.socketEventsMode; m_onion_binds = connOptions.onion_binds; @@ -1237,8 +1278,8 @@ friend class CNode; IsConnection, }; - void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant* grantOutbound, - const char* strDest, ConnectionType conn_type, + void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound, + const char* strDest, ConnectionType conn_type, bool use_v2transport, MasternodeConn masternode_connection = MasternodeConn::IsNotConnection, MasternodeProbeConn masternode_probe_connection = MasternodeProbeConn::IsNotConnection) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex, !mutexMsgProc); @@ -1425,7 +1466,7 @@ friend class CNode; // Count the number of block-relay-only peers we have over our limit. int GetExtraBlockRelayCount() const; - bool AddNode(const std::string& node) EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex); + bool AddNode(const AddedNodeParams& add) EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex); bool RemoveAddedNode(const std::string& node) EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex); std::vector GetAddedNodeInfo() const EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex); @@ -1541,12 +1582,12 @@ friend class CNode; bool InitBinds(const Options& options); void ThreadOpenAddedConnections() - EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex, !mutexMsgProc); + EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex, !mutexMsgProc); void AddAddrFetch(const std::string& strDest) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex); void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_unused_i2p_sessions_mutex, !mutexMsgProc); void ThreadOpenConnections(const std::vector connect, CDeterministicMNManager& dmnman) - EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex, !mutexMsgProc); + EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex, !mutexMsgProc); void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); void ThreadI2PAcceptIncoming(CMasternodeSync& mn_sync) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); void AcceptConnection(const ListenSocket& hListenSocket, CMasternodeSync& mn_sync) @@ -1566,7 +1607,7 @@ friend class CNode; const CAddress& addr, CMasternodeSync& mn_sync) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); - void DisconnectNodes(); + void DisconnectNodes() EXCLUSIVE_LOCKS_REQUIRED(!m_reconnections_mutex, !m_nodes_mutex); void NotifyNumConnectionsChanged(CMasternodeSync& mn_sync); void CalculateNumConnectionsChangedStats(); /** Return true if the peer is inactive and should be disconnected. */ @@ -1648,7 +1689,8 @@ friend class CNode; void SocketHandlerListening(const std::set& recv_set, CMasternodeSync& mn_sync) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); - void ThreadSocketHandler(CMasternodeSync& mn_sync) EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc); + void ThreadSocketHandler(CMasternodeSync& mn_sync) + EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc, !m_nodes_mutex, !m_reconnections_mutex); void ThreadDNSAddressSeed() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_nodes_mutex); void ThreadOpenMasternodeConnections(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman, CMasternodeSync& mn_sync) @@ -1668,8 +1710,7 @@ friend class CNode; bool AlreadyConnectedToAddress(const CAddress& addr); bool AttemptToEvictConnection(); - CNode* ConnectNode(CAddress addrConnect, const char *pszDest = nullptr, bool fCountFailure = false, ConnectionType conn_type = ConnectionType::OUTBOUND_FULL_RELAY) - EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); + CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); void AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr) const; void DeleteNode(CNode* pnode); @@ -1729,7 +1770,10 @@ friend class CNode; const NetGroupManager& m_netgroupman; std::deque m_addr_fetches GUARDED_BY(m_addr_fetches_mutex); Mutex m_addr_fetches_mutex; - std::vector m_added_nodes GUARDED_BY(m_added_nodes_mutex); + + // connection string and whether to use v2 p2p + std::vector m_added_node_params GUARDED_BY(m_added_nodes_mutex); + mutable Mutex m_added_nodes_mutex; std::vector m_nodes GUARDED_BY(m_nodes_mutex); std::list m_nodes_disconnected; @@ -1856,9 +1900,6 @@ friend class CNode; Mutex cs_sendable_receivable_nodes; std::unordered_map mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes); std::unordered_map mapSendableNodes GUARDED_BY(cs_sendable_receivable_nodes); - /** Protected by cs_mapNodesWithDataToSend */ - std::unordered_map mapNodesWithDataToSend GUARDED_BY(cs_mapNodesWithDataToSend); - mutable RecursiveMutex cs_mapNodesWithDataToSend; std::thread threadDNSAddressSeed; std::thread threadSocketHandler; @@ -1899,6 +1940,29 @@ friend class CNode; */ std::queue> m_unused_i2p_sessions GUARDED_BY(m_unused_i2p_sessions_mutex); + /** + * Mutex protecting m_reconnections. + */ + Mutex m_reconnections_mutex; + + /** Struct for entries in m_reconnections. */ + struct ReconnectionInfo + { + CAddress addr_connect; + CSemaphoreGrant grant; + std::string destination; + ConnectionType conn_type; + bool use_v2transport; + }; + + /** + * List of reconnections we have to make. + */ + std::list m_reconnections GUARDED_BY(m_reconnections_mutex); + + /** Attempt reconnections, if m_reconnections non-empty. */ + void PerformReconnections() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc, !m_reconnections_mutex, !m_unused_i2p_sessions_mutex); + /** * Cap on the size of `m_unused_i2p_sessions`, to ensure it does not * unexpectedly use too much memory. diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 9536b8fd07b61..861753e7d3832 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -3596,11 +3596,14 @@ void PeerManagerImpl::ProcessMessage( return; } - if (!pfrom.IsInboundConn()) { - LogPrintf("New outbound peer connected: version: %d, blocks=%d, peer=%d%s (%s)\n", + // Log succesful connections unconditionally for outbound, but not for inbound as those + // can be triggered by an attacker at high rate. + if (!pfrom.IsInboundConn() || LogAcceptCategory(BCLog::NET)) { + LogPrintf("New %s %s peer connected: version: %d, blocks=%d, peer=%d%s\n", + pfrom.ConnectionTypeAsString(), + TransportTypeAsString(pfrom.m_transport->GetInfo().transport_type), pfrom.nVersion.load(), peer->m_starting_height, - pfrom.GetId(), (fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToStringAddrPort()) : ""), - pfrom.ConnectionTypeAsString()); + pfrom.GetId(), (fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToStringAddrPort()) : "")); } if (is_masternode && !pfrom.m_masternode_probe_connection) { diff --git a/src/node/connection_types.cpp b/src/node/connection_types.cpp index 904f4371aafd1..5e4dc5bf2ef94 100644 --- a/src/node/connection_types.cpp +++ b/src/node/connection_types.cpp @@ -24,3 +24,17 @@ std::string ConnectionTypeAsString(ConnectionType conn_type) assert(false); } + +std::string TransportTypeAsString(TransportProtocolType transport_type) +{ + switch (transport_type) { + case TransportProtocolType::DETECTING: + return "detecting"; + case TransportProtocolType::V1: + return "v1"; + case TransportProtocolType::V2: + return "v2"; + } // no default case, so the compiler can warn about missing cases + + assert(false); +} diff --git a/src/node/connection_types.h b/src/node/connection_types.h index 5e1abcace67d1..a911b95f7e917 100644 --- a/src/node/connection_types.h +++ b/src/node/connection_types.h @@ -6,6 +6,7 @@ #define BITCOIN_NODE_CONNECTION_TYPES_H #include +#include /** Different types of connections to a peer. This enum encapsulates the * information we have available at the time of opening or accepting the @@ -79,4 +80,14 @@ enum class ConnectionType { /** Convert ConnectionType enum to a string value */ std::string ConnectionTypeAsString(ConnectionType conn_type); +/** Transport layer version */ +enum class TransportProtocolType : uint8_t { + DETECTING, //!< Peer could be v1 or v2 + V1, //!< Unencrypted, plaintext protocol + V2, //!< BIP324 protocol +}; + +/** Convert TransportProtocolType enum to a string value */ +std::string TransportTypeAsString(TransportProtocolType transport_type); + #endif // BITCOIN_NODE_CONNECTION_TYPES_H diff --git a/src/protocol.cpp b/src/protocol.cpp index 297932cf4de61..f77bda78fcc88 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -338,6 +338,7 @@ static std::string serviceFlagToStr(size_t bit) case NODE_COMPACT_FILTERS: return "COMPACT_FILTERS"; case NODE_NETWORK_LIMITED: return "NETWORK_LIMITED"; case NODE_HEADERS_COMPRESSED: return "HEADERS_COMPRESSED"; + case NODE_P2P_V2: return "P2P_V2"; // Not using default, so we get warned when a case is missing } diff --git a/src/protocol.h b/src/protocol.h index a7913802abd8e..44930e750da3f 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -328,6 +328,9 @@ enum ServiceFlags : uint64_t { // description will be provided NODE_HEADERS_COMPRESSED = (1 << 11), + // NODE_P2P_V2 means the node supports BIP324 transport + NODE_P2P_V2 = (1 << 12), + // Bits 24-31 are reserved for temporary experiments. Just pick a bit that // isn't getting used, or one not being used much, and notify the // bitcoin-development mailing list. Remember that service bits are just diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 80fc983824576..0cfef54e65362 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -232,6 +232,7 @@ static const CRPCConvertParam vRPCConvertParams[] = { "addpeeraddress", 2, "tried"}, { "sendmsgtopeer", 0, "peer_id" }, { "stop", 0, "wait" }, + { "addnode", 2, "v2transport" }, { "verifychainlock", 2, "blockHeight" }, { "verifyislock", 3, "maxHeight" }, { "submitchainlock", 2, "blockHeight" }, diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index 2440db6ea69fd..cb69c3112a381 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -40,6 +40,12 @@ const std::vector CONNECTION_TYPE_DOC{ "feeler (short-lived automatic connection for testing addresses)" }; +const std::vector TRANSPORT_TYPE_DOC{ + "detecting (peer could be v1 or v2)", + "v1 (plaintext transport protocol)", + "v2 (BIP324 encrypted transport protocol)" +}; + static RPCHelpMan getconnectioncount() { return RPCHelpMan{"getconnectioncount", @@ -166,6 +172,8 @@ static RPCHelpMan getpeerinfo() {RPCResult::Type::STR, "connection_type", "Type of connection: \n" + Join(CONNECTION_TYPE_DOC, ",\n") + ".\n" "Please note this output is unlikely to be stable in upcoming releases as we iterate to\n" "best capture connection behaviors."}, + {RPCResult::Type::STR, "transport_protocol_type", "Type of transport protocol: \n" + Join(TRANSPORT_TYPE_DOC, ",\n") + ".\n"}, + {RPCResult::Type::STR, "session_id", "The session ID for this connection, or \"\" if there is none (\"v2\" transport protocol only).\n"}, }}, }}}, RPCExamples{ @@ -280,6 +288,8 @@ static RPCHelpMan getpeerinfo() } obj.pushKV("bytesrecv_per_msg", recvPerMsgType); obj.pushKV("connection_type", ConnectionTypeAsString(stats.m_conn_type)); + obj.pushKV("transport_protocol_type", TransportTypeAsString(stats.m_transport_type)); + obj.pushKV("session_id", stats.m_session_id); ret.push_back(obj); } @@ -301,11 +311,12 @@ static RPCHelpMan addnode() { {"node", RPCArg::Type::STR, RPCArg::Optional::NO, "The node (see getpeerinfo for nodes)"}, {"command", RPCArg::Type::STR, RPCArg::Optional::NO, "'add' to add a node to the list, 'remove' to remove a node from the list, 'onetry' to try a connection to the node once"}, + {"v2transport", RPCArg::Type::BOOL, RPCArg::DefaultHint{"set by -v2transport"}, "Attempt to connect using BIP324 v2 transport protocol (ignored for 'remove' command)"}, }, RPCResult{RPCResult::Type::NONE, "", ""}, RPCExamples{ - HelpExampleCli("addnode", "\"192.168.0.6:9999\" \"onetry\"") - + HelpExampleRpc("addnode", "\"192.168.0.6:9999\", \"onetry\"") + HelpExampleCli("addnode", "\"192.168.0.6:9999\" \"onetry\" true") + + HelpExampleRpc("addnode", "\"192.168.0.6:9999\", \"onetry\" true") }, [&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue { @@ -321,17 +332,23 @@ static RPCHelpMan addnode() CConnman& connman = EnsureConnman(node); std::string strNode = request.params[0].get_str(); + bool node_v2transport = connman.GetLocalServices() & NODE_P2P_V2; + bool use_v2transport = request.params[2].isNull() ? node_v2transport : request.params[2].get_bool(); + + if (use_v2transport && !node_v2transport) { + throw JSONRPCError(RPC_INVALID_PARAMETER, "Error: v2transport requested but not enabled (see -v2transport)"); + } if (strCommand == "onetry") { CAddress addr; - connman.OpenNetworkConnection(addr, false, nullptr, strNode.c_str(), ConnectionType::MANUAL); + connman.OpenNetworkConnection(addr, false, /*grant_outbound=*/{}, strNode.c_str(), ConnectionType::MANUAL, use_v2transport); return NullUniValue; } if (strCommand == "add") { - if (!connman.AddNode(strNode)) { + if (!connman.AddNode({strNode, use_v2transport})) { throw JSONRPCError(RPC_CLIENT_NODE_ALREADY_ADDED, "Error: Node already added"); } } @@ -492,7 +509,7 @@ static RPCHelpMan getaddednodeinfo() if (!request.params[0].isNull()) { bool found = false; for (const AddedNodeInfo& info : vInfo) { - if (info.strAddedNode == request.params[0].get_str()) { + if (info.m_params.m_added_node == request.params[0].get_str()) { vInfo.assign(1, info); found = true; break; @@ -507,7 +524,7 @@ static RPCHelpMan getaddednodeinfo() for (const AddedNodeInfo& info : vInfo) { UniValue obj(UniValue::VOBJ); - obj.pushKV("addednode", info.strAddedNode); + obj.pushKV("addednode", info.m_params.m_added_node); obj.pushKV("connected", info.fConnected); UniValue addresses(UniValue::VARR); if (info.fConnected) { diff --git a/src/sync.h b/src/sync.h index 3712bd9824e5f..751a37fd7e963 100644 --- a/src/sync.h +++ b/src/sync.h @@ -354,6 +354,10 @@ using ReadLock = SharedLock decltype(auto) { LOCK(cs); code; }() #define WITH_READ_LOCK(cs, code) [&]() -> decltype(auto) { READ_LOCK(cs); code; }() +/** An implementation of a semaphore. + * + * See https://en.wikipedia.org/wiki/Semaphore_(programming) + */ class CSemaphore { private: @@ -362,25 +366,33 @@ class CSemaphore int value; public: - explicit CSemaphore(int init) : value(init) {} + explicit CSemaphore(int init) noexcept : value(init) {} + + // Disallow default construct, copy, move. + CSemaphore() = delete; + CSemaphore(const CSemaphore&) = delete; + CSemaphore(CSemaphore&&) = delete; + CSemaphore& operator=(const CSemaphore&) = delete; + CSemaphore& operator=(CSemaphore&&) = delete; - void wait() + void wait() noexcept { std::unique_lock lock(mutex); condition.wait(lock, [&]() { return value >= 1; }); value--; } - bool try_wait() + bool try_wait() noexcept { std::lock_guard lock(mutex); - if (value < 1) + if (value < 1) { return false; + } value--; return true; } - void post() + void post() noexcept { { std::lock_guard lock(mutex); @@ -398,45 +410,64 @@ class CSemaphoreGrant bool fHaveGrant; public: - void Acquire() + void Acquire() noexcept { - if (fHaveGrant) + if (fHaveGrant) { return; + } sem->wait(); fHaveGrant = true; } - void Release() + void Release() noexcept { - if (!fHaveGrant) + if (!fHaveGrant) { return; + } sem->post(); fHaveGrant = false; } - bool TryAcquire() + bool TryAcquire() noexcept { - if (!fHaveGrant && sem->try_wait()) + if (!fHaveGrant && sem->try_wait()) { fHaveGrant = true; + } return fHaveGrant; } - void MoveTo(CSemaphoreGrant& grant) + // Disallow copy. + CSemaphoreGrant(const CSemaphoreGrant&) = delete; + CSemaphoreGrant& operator=(const CSemaphoreGrant&) = delete; + + // Allow move. + CSemaphoreGrant(CSemaphoreGrant&& other) noexcept { - grant.Release(); - grant.sem = sem; - grant.fHaveGrant = fHaveGrant; - fHaveGrant = false; + sem = other.sem; + fHaveGrant = other.fHaveGrant; + other.fHaveGrant = false; + other.sem = nullptr; } - CSemaphoreGrant() : sem(nullptr), fHaveGrant(false) {} + CSemaphoreGrant& operator=(CSemaphoreGrant&& other) noexcept + { + Release(); + sem = other.sem; + fHaveGrant = other.fHaveGrant; + other.fHaveGrant = false; + other.sem = nullptr; + return *this; + } - explicit CSemaphoreGrant(CSemaphore& sema, bool fTry = false) : sem(&sema), fHaveGrant(false) + CSemaphoreGrant() noexcept : sem(nullptr), fHaveGrant(false) {} + + explicit CSemaphoreGrant(CSemaphore& sema, bool fTry = false) noexcept : sem(&sema), fHaveGrant(false) { - if (fTry) + if (fTry) { TryAcquire(); - else + } else { Acquire(); + } } ~CSemaphoreGrant() @@ -444,7 +475,7 @@ class CSemaphoreGrant Release(); } - operator bool() const + explicit operator bool() const noexcept { return fHaveGrant; } diff --git a/src/test/fuzz/bip324.cpp b/src/test/fuzz/bip324.cpp index afe3cafafe65a..7954ea4ad154c 100644 --- a/src/test/fuzz/bip324.cpp +++ b/src/test/fuzz/bip324.cpp @@ -94,7 +94,7 @@ FUZZ_TARGET_INIT(bip324_cipher_roundtrip, initialize_bip324) unsigned damage_bit = provider.ConsumeIntegralInRange(0, (ciphertext.size() + aad.size()) * 8U - 1U); unsigned damage_pos = damage_bit >> 3; - std::byte damage_val{(uint8_t)(1U << (damage_bit & 3))}; + std::byte damage_val{(uint8_t)(1U << (damage_bit & 7))}; if (damage_pos >= ciphertext.size()) { aad[damage_pos - ciphertext.size()] ^= damage_val; } else { diff --git a/src/test/fuzz/connman.cpp b/src/test/fuzz/connman.cpp index 75afc58bc45fd..54c33843df984 100644 --- a/src/test/fuzz/connman.cpp +++ b/src/test/fuzz/connman.cpp @@ -54,7 +54,7 @@ FUZZ_TARGET_INIT(connman, initialize_connman) random_string = fuzzed_data_provider.ConsumeRandomLengthString(64); }, [&] { - connman.AddNode(random_string); + connman.AddNode({random_string, fuzzed_data_provider.ConsumeBool()}); }, [&] { connman.CheckIncomingNonce(fuzzed_data_provider.ConsumeIntegral()); diff --git a/src/test/fuzz/p2p_transport_serialization.cpp b/src/test/fuzz/p2p_transport_serialization.cpp index eba7dac8aebd6..530ce8524d8d0 100644 --- a/src/test/fuzz/p2p_transport_serialization.cpp +++ b/src/test/fuzz/p2p_transport_serialization.cpp @@ -323,6 +323,9 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa // Make sure all expected messages were received. assert(expected[0].empty()); assert(expected[1].empty()); + + // Compare session IDs. + assert(transports[0]->GetInfo().session_id == transports[1]->GetInfo().session_id); } std::unique_ptr MakeV1Transport(NodeId nodeid) noexcept diff --git a/src/test/net_tests.cpp b/src/test/net_tests.cpp index 241bfe64ec6db..806bf0daadb82 100644 --- a/src/test/net_tests.cpp +++ b/src/test/net_tests.cpp @@ -1328,6 +1328,14 @@ class V2TransportTester SendPacket(contents); } + /** Test whether the transport's session ID matches the session ID we expect. */ + void CompareSessionIDs() const + { + auto info = m_transport.GetInfo(); + BOOST_CHECK(info.session_id); + BOOST_CHECK(uint256(MakeUCharSpan(m_cipher.GetSessionID())) == *info.session_id); + } + /** Introduce a bit error in the data scheduled to be sent. */ void Damage() { @@ -1353,6 +1361,7 @@ BOOST_AUTO_TEST_CASE(v2transport_test) BOOST_REQUIRE(ret && ret->empty()); tester.ReceiveGarbage(); tester.ReceiveVersion(); + tester.CompareSessionIDs(); auto msg_data_1 = g_insecure_rand_ctx.randbytes(InsecureRandRange(100000)); auto msg_data_2 = g_insecure_rand_ctx.randbytes(InsecureRandRange(1000)); tester.SendMessage(uint8_t(4), msg_data_1); // cmpctblock short id @@ -1393,6 +1402,7 @@ BOOST_AUTO_TEST_CASE(v2transport_test) BOOST_REQUIRE(ret && ret->empty()); tester.ReceiveGarbage(); tester.ReceiveVersion(); + tester.CompareSessionIDs(); auto msg_data_1 = g_insecure_rand_ctx.randbytes(InsecureRandRange(100000)); auto msg_data_2 = g_insecure_rand_ctx.randbytes(InsecureRandRange(1000)); tester.SendMessage(uint8_t(14), msg_data_1); // inv short id @@ -1446,6 +1456,7 @@ BOOST_AUTO_TEST_CASE(v2transport_test) BOOST_REQUIRE(ret && ret->empty()); tester.ReceiveGarbage(); tester.ReceiveVersion(); + tester.CompareSessionIDs(); for (unsigned d = 0; d < num_decoys_1; ++d) { auto decoy_data = g_insecure_rand_ctx.randbytes(InsecureRandRange(1000)); tester.SendPacket(/*content=*/decoy_data, /*aad=*/{}, /*ignore=*/true); @@ -1523,6 +1534,7 @@ BOOST_AUTO_TEST_CASE(v2transport_test) BOOST_REQUIRE(ret && ret->empty()); tester.ReceiveGarbage(); tester.ReceiveVersion(); + tester.CompareSessionIDs(); auto msg_data_1 = g_insecure_rand_ctx.randbytes(MAX_PROTOCOL_MESSAGE_LENGTH); // test that receiving max size payload works auto msg_data_2 = g_insecure_rand_ctx.randbytes(MAX_PROTOCOL_MESSAGE_LENGTH); // test that sending max size payload works tester.SendMessage(uint8_t(InsecureRandRange(223) + 33), {}); // unknown short id diff --git a/src/test/util/net.h b/src/test/util/net.h index 893ade2430d8e..933309b6983da 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -62,6 +62,7 @@ constexpr ServiceFlags ALL_SERVICE_FLAGS[]{ NODE_COMPACT_FILTERS, NODE_NETWORK_LIMITED, NODE_HEADERS_COMPRESSED, + NODE_P2P_V2, }; constexpr NetPermissionFlags ALL_NET_PERMISSION_FLAGS[]{ diff --git a/src/test/util_tests.cpp b/src/test/util_tests.cpp index 444714ec78196..55d1cd50d9e3b 100644 --- a/src/test/util_tests.cpp +++ b/src/test/util_tests.cpp @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -2723,4 +2724,28 @@ BOOST_AUTO_TEST_CASE(util_ParseByteUnits) BOOST_CHECK(!ParseByteUnits("1x", noop)); } +BOOST_AUTO_TEST_CASE(clearshrink_test) +{ + { + std::vector v = {1, 2, 3}; + ClearShrink(v); + BOOST_CHECK_EQUAL(v.size(), 0); + BOOST_CHECK_EQUAL(v.capacity(), 0); + } + + { + std::vector v = {false, true, false, false, true, true}; + ClearShrink(v); + BOOST_CHECK_EQUAL(v.size(), 0); + BOOST_CHECK_EQUAL(v.capacity(), 0); + } + + { + std::deque v = {1, 3, 3, 7}; + ClearShrink(v); + BOOST_CHECK_EQUAL(v.size(), 0); + // std::deque has no capacity() we can observe. + } +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/src/util/vector.h b/src/util/vector.h index dab65ded2ace1..ad0d71c941f80 100644 --- a/src/util/vector.h +++ b/src/util/vector.h @@ -48,4 +48,22 @@ inline V Cat(V v1, const V& v2) return v1; } +/** Clear a vector (or std::deque) and release its allocated memory. */ +template +inline void ClearShrink(V& v) noexcept +{ + // There are various ways to clear a vector and release its memory: + // + // 1. V{}.swap(v) + // 2. v = V{} + // 3. v = {}; v.shrink_to_fit(); + // 4. v.clear(); v.shrink_to_fit(); + // + // (2) does not appear to release memory in glibc debug mode, even if v.shrink_to_fit() + // follows. (3) and (4) rely on std::vector::shrink_to_fit, which is only a non-binding + // request. Therefore, we use method (1). + + V{}.swap(v); +} + #endif // BITCOIN_UTIL_VECTOR_H diff --git a/test/functional/p2p_node_network_limited.py b/test/functional/p2p_node_network_limited.py index ae54419352c06..7ab31c71c8a50 100755 --- a/test/functional/p2p_node_network_limited.py +++ b/test/functional/p2p_node_network_limited.py @@ -8,7 +8,15 @@ and that it responds to getdata requests for blocks correctly: - send a block within 288 + 2 of the tip - disconnect peers who request blocks older than that.""" -from test_framework.messages import CInv, MSG_BLOCK, msg_getdata, NODE_BLOOM, NODE_NETWORK_LIMITED, NODE_HEADERS_COMPRESSED +from test_framework.messages import ( + CInv, + MSG_BLOCK, + NODE_BLOOM, + NODE_HEADERS_COMPRESSED, + NODE_NETWORK_LIMITED, + NODE_P2P_V2, + msg_getdata, +) from test_framework.governance import EXPECTED_STDERR_NO_GOV_PRUNE from test_framework.p2p import P2PInterface from test_framework.test_framework import BitcoinTestFramework @@ -48,6 +56,8 @@ def run_test(self): node = self.nodes[0].add_p2p_connection(P2PIgnoreInv()) expected_services = NODE_BLOOM | NODE_NETWORK_LIMITED | NODE_HEADERS_COMPRESSED + if self.options.v2transport: + expected_services |= NODE_P2P_V2 self.log.info("Check that node has signalled expected services.") assert_equal(node.nServices, expected_services) diff --git a/test/functional/p2p_timeouts.py b/test/functional/p2p_timeouts.py index 5c0014cb2ba06..7f68f19fd4aba 100755 --- a/test/functional/p2p_timeouts.py +++ b/test/functional/p2p_timeouts.py @@ -68,11 +68,14 @@ def run_test(self): with self.nodes[0].assert_debug_log(['Unsupported message "ping" prior to verack from peer=0']): no_verack_node.send_message(msg_ping()) - with self.nodes[0].assert_debug_log(['non-version message before version handshake. Message "ping" from peer=1']): - no_version_node.send_message(msg_ping()) - self.mock_forward(1) + # With v2, non-version messages before the handshake would be interpreted as part of the key exchange. + # Therefore, don't execute this part of the test if v2transport is chosen. + if not self.options.v2transport: + with self.nodes[0].assert_debug_log(['non-version message before version handshake. Message "ping" from peer=1']): + no_version_node.send_message(msg_ping()) + self.mock_forward(1) assert "version" in no_verack_node.last_message assert no_verack_node.is_connected @@ -80,11 +83,12 @@ def run_test(self): assert no_send_node.is_connected no_verack_node.send_message(msg_ping()) - no_version_node.send_message(msg_ping()) + if not self.options.v2transport: + no_version_node.send_message(msg_ping()) expected_timeout_logs = [ "version handshake timeout peer=0", - "socket no message in first 3 seconds, 1 0 peer=1", + f"socket no message in first 3 seconds, {'0' if self.options.v2transport else '1'} 0 peer=1", "socket no message in first 3 seconds, 0 0 peer=2", ] @@ -95,5 +99,6 @@ def run_test(self): no_send_node.wait_for_disconnect(timeout=1) + if __name__ == '__main__': TimeoutsTest().main() diff --git a/test/functional/p2p_v2_transport.py b/test/functional/p2p_v2_transport.py new file mode 100755 index 0000000000000..a7f208c858e2c --- /dev/null +++ b/test/functional/p2p_v2_transport.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python3 +# Copyright (c) 2021-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +""" +Test v2 transport +""" +import socket + +from test_framework.messages import NODE_P2P_V2 +from test_framework.p2p import MAGIC_BYTES +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import ( + assert_equal, + p2p_port, +) + + +class V2TransportTest(BitcoinTestFramework): + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 5 + self.extra_args = [["-v2transport=1"], ["-v2transport=1"], ["-v2transport=0"], ["-v2transport=0"], ["-v2transport=0"]] + + def run_test(self): + sending_handshake = "start sending v2 handshake to peer" + downgrading_to_v1 = "retrying with v1 transport protocol for peer" + self.disconnect_nodes(0, 1) + self.disconnect_nodes(1, 2) + self.disconnect_nodes(2, 3) + self.disconnect_nodes(3, 4) + + # verify local services + network_info = self.nodes[2].getnetworkinfo() + assert_equal(int(network_info["localservices"], 16) & NODE_P2P_V2, 0) + assert "P2P_V2" not in network_info["localservicesnames"] + network_info = self.nodes[1].getnetworkinfo() + assert_equal(int(network_info["localservices"], 16) & NODE_P2P_V2, NODE_P2P_V2) + assert "P2P_V2" in network_info["localservicesnames"] + + # V2 nodes can sync with V2 nodes + assert_equal(self.nodes[0].getblockcount(), 0) + assert_equal(self.nodes[1].getblockcount(), 0) + with self.nodes[0].assert_debug_log(expected_msgs=[sending_handshake], + unexpected_msgs=[downgrading_to_v1]): + self.connect_nodes(0, 1, peer_advertises_v2=True) + self.generate(self.nodes[0], 5, sync_fun=lambda: self.sync_all(self.nodes[0:2])) + assert_equal(self.nodes[1].getblockcount(), 5) + # verify there is a v2 connection between node 0 and 1 + node_0_info = self.nodes[0].getpeerinfo() + node_1_info = self.nodes[1].getpeerinfo() + assert_equal(len(node_0_info), 1) + assert_equal(len(node_1_info), 1) + assert_equal(node_0_info[0]["transport_protocol_type"], "v2") + assert_equal(node_1_info[0]["transport_protocol_type"], "v2") + assert_equal(len(node_0_info[0]["session_id"]), 64) + assert_equal(len(node_1_info[0]["session_id"]), 64) + assert_equal(node_0_info[0]["session_id"], node_1_info[0]["session_id"]) + + # V1 nodes can sync with each other + assert_equal(self.nodes[2].getblockcount(), 0) + assert_equal(self.nodes[3].getblockcount(), 0) + with self.nodes[2].assert_debug_log(expected_msgs=[], + unexpected_msgs=[sending_handshake, downgrading_to_v1]): + self.connect_nodes(2, 3, peer_advertises_v2=False) + self.generate(self.nodes[2], 8, sync_fun=lambda: self.sync_all(self.nodes[2:4])) + assert_equal(self.nodes[3].getblockcount(), 8) + assert self.nodes[0].getbestblockhash() != self.nodes[2].getbestblockhash() + # verify there is a v1 connection between node 2 and 3 + node_2_info = self.nodes[2].getpeerinfo() + node_3_info = self.nodes[3].getpeerinfo() + assert_equal(len(node_2_info), 1) + assert_equal(len(node_3_info), 1) + assert_equal(node_2_info[0]["transport_protocol_type"], "v1") + assert_equal(node_3_info[0]["transport_protocol_type"], "v1") + assert_equal(len(node_2_info[0]["session_id"]), 0) + assert_equal(len(node_3_info[0]["session_id"]), 0) + + # V1 nodes can sync with V2 nodes + self.disconnect_nodes(0, 1) + self.disconnect_nodes(2, 3) + with self.nodes[2].assert_debug_log(expected_msgs=[], + unexpected_msgs=[sending_handshake, downgrading_to_v1]): + self.connect_nodes(2, 1, peer_advertises_v2=False) # cannot enable v2 on v1 node + self.sync_all(self.nodes[1:3]) + assert_equal(self.nodes[1].getblockcount(), 8) + assert self.nodes[0].getbestblockhash() != self.nodes[1].getbestblockhash() + # verify there is a v1 connection between node 1 and 2 + node_1_info = self.nodes[1].getpeerinfo() + node_2_info = self.nodes[2].getpeerinfo() + assert_equal(len(node_1_info), 1) + assert_equal(len(node_2_info), 1) + assert_equal(node_1_info[0]["transport_protocol_type"], "v1") + assert_equal(node_2_info[0]["transport_protocol_type"], "v1") + assert_equal(len(node_1_info[0]["session_id"]), 0) + assert_equal(len(node_2_info[0]["session_id"]), 0) + + # V2 nodes can sync with V1 nodes + self.disconnect_nodes(1, 2) + with self.nodes[0].assert_debug_log(expected_msgs=[], + unexpected_msgs=[sending_handshake, downgrading_to_v1]): + self.connect_nodes(0, 3, peer_advertises_v2=False) + self.sync_all([self.nodes[0], self.nodes[3]]) + assert_equal(self.nodes[0].getblockcount(), 8) + # verify there is a v1 connection between node 0 and 3 + node_0_info = self.nodes[0].getpeerinfo() + node_3_info = self.nodes[3].getpeerinfo() + assert_equal(len(node_0_info), 1) + assert_equal(len(node_3_info), 1) + assert_equal(node_0_info[0]["transport_protocol_type"], "v1") + assert_equal(node_3_info[0]["transport_protocol_type"], "v1") + assert_equal(len(node_0_info[0]["session_id"]), 0) + assert_equal(len(node_3_info[0]["session_id"]), 0) + + # V2 node mines another block and everyone gets it + self.connect_nodes(0, 1, peer_advertises_v2=True) + self.connect_nodes(1, 2, peer_advertises_v2=False) + self.generate(self.nodes[1], 1, sync_fun=lambda: self.sync_all(self.nodes[0:4])) + assert_equal(self.nodes[0].getblockcount(), 9) # sync_all() verifies tip hashes match + + # V1 node mines another block and everyone gets it + self.generate(self.nodes[3], 2, sync_fun=lambda: self.sync_all(self.nodes[0:4])) + assert_equal(self.nodes[2].getblockcount(), 11) # sync_all() verifies tip hashes match + + assert_equal(self.nodes[4].getblockcount(), 0) + # Peer 4 is v1 p2p, but is falsely advertised as v2. + with self.nodes[1].assert_debug_log(expected_msgs=[sending_handshake, downgrading_to_v1]): + self.connect_nodes(1, 4, peer_advertises_v2=True) + self.sync_all() + assert_equal(self.nodes[4].getblockcount(), 11) + + # Check v1 prefix detection + V1_PREFIX = MAGIC_BYTES[self.chain] + b"version\x00\x00\x00\x00\x00" + assert_equal(len(V1_PREFIX), 16) + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + num_peers = len(self.nodes[0].getpeerinfo()) + s.connect(("127.0.0.1", p2p_port(0))) + self.wait_until(lambda: len(self.nodes[0].getpeerinfo()) == num_peers + 1) + s.sendall(V1_PREFIX[:-1]) + assert_equal(self.nodes[0].getpeerinfo()[-1]["transport_protocol_type"], "detecting") + s.sendall(bytes([V1_PREFIX[-1]])) # send out last prefix byte + self.wait_until(lambda: self.nodes[0].getpeerinfo()[-1]["transport_protocol_type"] == "v1") + + # Check wrong network prefix detection (hits if the next 12 bytes correspond to a v1 version message) + wrong_network_magic_prefix = MAGIC_BYTES["mainnet"] + V1_PREFIX[4:] + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect(("127.0.0.1", p2p_port(0))) + with self.nodes[0].assert_debug_log("V2 transport error: V1 peer with wrong MessageStart"): + s.sendall(wrong_network_magic_prefix + b"somepayload") + + # Check detection of missing garbage terminator (hits after fixed amount of data if terminator never matches garbage) + MAX_KEY_GARB_AND_GARBTERM_LEN = 64 + 4095 + 16 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + num_peers = len(self.nodes[0].getpeerinfo()) + s.connect(("127.0.0.1", p2p_port(0))) + self.wait_until(lambda: len(self.nodes[0].getpeerinfo()) == num_peers + 1) + s.sendall(b'\x00' * (MAX_KEY_GARB_AND_GARBTERM_LEN - 1)) + self.wait_until(lambda: self.nodes[0].getpeerinfo()[-1]["bytesrecv"] == MAX_KEY_GARB_AND_GARBTERM_LEN - 1) + with self.nodes[0].assert_debug_log("V2 transport error: missing garbage terminator"): + s.sendall(b'\x00') # send out last byte + # should disconnect immediately + self.wait_until(lambda: len(self.nodes[0].getpeerinfo()) == num_peers) + + +if __name__ == '__main__': + V2TransportTest().main() diff --git a/test/functional/rpc_net.py b/test/functional/rpc_net.py index 45da5636d4f3f..e8d26077639e6 100755 --- a/test/functional/rpc_net.py +++ b/test/functional/rpc_net.py @@ -145,11 +145,13 @@ def test_getpeerinfo(self): "relaytxes": False, "services": "0000000000000000", "servicesnames": [], + "session_id": "", "startingheight": -1, "subver": "", "synced_blocks": -1, "synced_headers": -1, "timeoffset": 0, + "transport_protocol_type": "v1" if not self.options.v2transport else "detecting", "version": 0, }, ) @@ -158,19 +160,23 @@ def test_getpeerinfo(self): def test_getnettotals(self): self.log.info("Test getnettotals") # Test getnettotals and getpeerinfo by doing a ping. The bytes - # sent/received should increase by at least the size of one ping (32 - # bytes) and one pong (32 bytes). + # sent/received should increase by at least the size of one ping + # and one pong. Both have a payload size of 8 bytes, but the total + # size depends on the used p2p version: + # - p2p v1: 24 bytes (header) + 8 bytes (payload) = 32 bytes + # - p2p v2: 21 bytes (header/tag with short-id) + 8 bytes (payload) = 29 bytes + ping_size = 32 if not self.options.v2transport else 29 net_totals_before = self.nodes[0].getnettotals() peer_info_before = self.nodes[0].getpeerinfo() self.nodes[0].ping() - self.wait_until(lambda: (self.nodes[0].getnettotals()['totalbytessent'] >= net_totals_before['totalbytessent'] + 32 * 2), timeout=1) - self.wait_until(lambda: (self.nodes[0].getnettotals()['totalbytesrecv'] >= net_totals_before['totalbytesrecv'] + 32 * 2), timeout=1) + self.wait_until(lambda: (self.nodes[0].getnettotals()['totalbytessent'] >= net_totals_before['totalbytessent'] + ping_size * 2), timeout=1) + self.wait_until(lambda: (self.nodes[0].getnettotals()['totalbytesrecv'] >= net_totals_before['totalbytesrecv'] + ping_size * 2), timeout=1) for peer_before in peer_info_before: peer_after = lambda: next(p for p in self.nodes[0].getpeerinfo() if p['id'] == peer_before['id']) - self.wait_until(lambda: peer_after()['bytesrecv_per_msg'].get('pong', 0) >= peer_before['bytesrecv_per_msg'].get('pong', 0) + 32, timeout=1) - self.wait_until(lambda: peer_after()['bytessent_per_msg'].get('ping', 0) >= peer_before['bytessent_per_msg'].get('ping', 0) + 32, timeout=1) + self.wait_until(lambda: peer_after()['bytesrecv_per_msg'].get('pong', 0) >= peer_before['bytesrecv_per_msg'].get('pong', 0) + ping_size, timeout=1) + self.wait_until(lambda: peer_after()['bytessent_per_msg'].get('ping', 0) >= peer_before['bytessent_per_msg'].get('ping', 0) + ping_size, timeout=1) def test_getnetworkinfo(self): self.log.info("Test getnetworkinfo") @@ -346,7 +352,10 @@ def test_sendmsgtopeer(self): node = self.nodes[0] self.restart_node(0) - self.connect_nodes(0, 1) + # we want to use a p2p v1 connection here in order to ensure + # a peer id of zero (a downgrade from v2 to v1 would lead + # to an increase of the peer id) + self.connect_nodes(0, 1, peer_advertises_v2=False) self.log.info("Test sendmsgtopeer") self.log.debug("Send a valid message") diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py index 694d4f580d984..da8345b8818b2 100755 --- a/test/functional/test_framework/messages.py +++ b/test/functional/test_framework/messages.py @@ -51,6 +51,7 @@ NODE_COMPACT_FILTERS = (1 << 6) NODE_NETWORK_LIMITED = (1 << 10) NODE_HEADERS_COMPRESSED = (1 << 11) +NODE_P2P_V2 = (1 << 12) MSG_TX = 1 MSG_BLOCK = 2 diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py index 3ca3cc3e34855..3669f7f7cdac3 100755 --- a/test/functional/test_framework/test_framework.py +++ b/test/functional/test_framework/test_framework.py @@ -222,6 +222,8 @@ def parse_args(self): parser.add_argument("--randomseed", type=int, help="set a random seed for deterministically reproducing a previous test run") parser.add_argument('--timeout-factor', dest="timeout_factor", type=float, default=1.0, help='adjust test timeouts by a factor. Setting it to 0 disables all timeouts') + parser.add_argument("--v2transport", dest="v2transport", default=False, action="store_true", + help="use BIP324 v2 connections between all nodes by default") group = parser.add_mutually_exclusive_group() group.add_argument("--descriptors", action='store_const', const=True, @@ -545,6 +547,7 @@ def get_bin_from_version(version, bin_name, bin_default): start_perf=self.options.perf, use_valgrind=self.options.valgrind, descriptors=self.options.descriptors, + v2transport=self.options.v2transport, ) self.nodes.append(test_node_i) if not test_node_i.version_is_at_least(160000): @@ -692,7 +695,7 @@ def restart_node(self, i, extra_args=None, expected_stderr=''): def wait_for_node_exit(self, i, timeout): self.nodes[i].process.wait(timeout) - def connect_nodes(self, a, b): + def connect_nodes(self, a, b, *, peer_advertises_v2=None): # A node cannot connect to itself, bail out early if (a == b): return @@ -700,7 +703,16 @@ def connect_nodes(self, a, b): from_connection = self.nodes[a] to_connection = self.nodes[b] ip_port = "127.0.0.1:" + str(p2p_port(b)) - from_connection.addnode(ip_port, "onetry") + + if peer_advertises_v2 is None: + peer_advertises_v2 = from_connection.use_v2transport + + if peer_advertises_v2 != from_connection.use_v2transport: + from_connection.addnode(node=ip_port, command="onetry", v2transport=peer_advertises_v2) + else: + # skip the optional third argument if it matches the default, for + # compatibility with older clients + from_connection.addnode(ip_port, "onetry") # Use subversion as peer id. Test nodes have their node number appended to the user agent string from_connection_subver = from_connection.getnetworkinfo()['subversion'] @@ -721,13 +733,13 @@ def check_bytesrecv(peer, msg_type, min_bytes_recv): assert peer is not None, "Error: peer disconnected" return peer['bytesrecv_per_msg'].pop(msg_type, 0) >= min_bytes_recv - self.wait_until(lambda: check_bytesrecv(find_conn(from_connection, to_connection_subver, inbound=False), 'verack', 24)) - self.wait_until(lambda: check_bytesrecv(find_conn(to_connection, from_connection_subver, inbound=True), 'verack', 24)) + self.wait_until(lambda: check_bytesrecv(find_conn(from_connection, to_connection_subver, inbound=False), 'verack', 21)) + self.wait_until(lambda: check_bytesrecv(find_conn(to_connection, from_connection_subver, inbound=True), 'verack', 21)) # The message bytes are counted before processing the message, so make # sure it was fully processed by waiting for a ping. - self.wait_until(lambda: check_bytesrecv(find_conn(from_connection, to_connection_subver, inbound=False), 'pong', 32)) - self.wait_until(lambda: check_bytesrecv(find_conn(to_connection, from_connection_subver, inbound=True), 'pong', 32)) + self.wait_until(lambda: check_bytesrecv(find_conn(from_connection, to_connection_subver, inbound=False), 'pong', 29)) + self.wait_until(lambda: check_bytesrecv(find_conn(to_connection, from_connection_subver, inbound=True), 'pong', 29)) def disconnect_nodes(self, a, b): # A node cannot disconnect from itself, bail out early @@ -1161,11 +1173,11 @@ def add_nodes(self, num_nodes: int, extra_args=None, *, rpchost=None, binary=Non # controller node is the only node that has an extra option allowing it to submit sporks append_config(self.nodes[0].datadir, ["sporkkey=cP4EKFyJsHT39LDqgdcB43Y3YXjNyjb5Fuas1GQSeAtjnZWmZEQK"]) - def connect_nodes(self, a, b): + def connect_nodes(self, a, b, *, peer_advertises_v2=None): for mn2 in self.mninfo: if mn2.node is not None: mn2.node.setmnthreadactive(False) - super().connect_nodes(a, b) + super().connect_nodes(a, b, peer_advertises_v2=peer_advertises_v2) for mn2 in self.mninfo: if mn2.node is not None: mn2.node.setmnthreadactive(True) diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py index 7cac7862c9d82..6aa1bbd844870 100755 --- a/test/functional/test_framework/test_node.py +++ b/test/functional/test_framework/test_node.py @@ -65,7 +65,7 @@ class TestNode(): To make things easier for the test writer, any unrecognised messages will be dispatched to the RPC connection.""" - def __init__(self, i, datadir, extra_args_from_options, *, chain, rpchost, timewait, timeout_factor, bitcoind, bitcoin_cli, mocktime, coverage_dir, cwd, extra_conf=None, extra_args=None, use_cli=False, start_perf=False, use_valgrind=False, version=None, descriptors=False): + def __init__(self, i, datadir, extra_args_from_options, *, chain, rpchost, timewait, timeout_factor, bitcoind, bitcoin_cli, mocktime, coverage_dir, cwd, extra_conf=None, extra_args=None, use_cli=False, start_perf=False, use_valgrind=False, version=None, descriptors=False, v2transport=False): """ Kwargs: start_perf (bool): If True, begin profiling the node with `perf` as soon as @@ -124,6 +124,12 @@ def __init__(self, i, datadir, extra_args_from_options, *, chain, rpchost, timew if self.version_is_at_least(21000000): self.args.append("-logsourcelocations") + # Default behavior from global -v2transport flag is added to args to persist it over restarts. + # May be overwritten in individual tests, using extra_args. + self.default_to_v2 = v2transport + if self.default_to_v2: + self.args.append("-v2transport=1") + self.cli = TestNodeCLI(bitcoin_cli, self.datadir) self.use_cli = use_cli self.start_perf = start_perf @@ -205,6 +211,8 @@ def start(self, extra_args=None, *, cwd=None, stdout=None, stderr=None, **kwargs if extra_args is None: extra_args = self.extra_args + self.use_v2transport = "-v2transport=1" in extra_args or (self.default_to_v2 and "-v2transport=0" not in extra_args) + # Add a new stdout and stderr file each time dashd is started if stderr is None: stderr = tempfile.NamedTemporaryFile(dir=self.stderr_dir, delete=False) @@ -732,15 +740,15 @@ def batch(self, requests): results.append(dict(error=e)) return results - def send_cli(self, command=None, *args, **kwargs): + def send_cli(self, clicommand=None, *args, **kwargs): """Run dash-cli command. Deserializes returned string as python object.""" pos_args = [arg_to_cli(arg) for arg in args] named_args = [str(key) + "=" + arg_to_cli(value) for (key, value) in kwargs.items()] p_args = [self.binary, "-datadir=" + self.datadir] + self.options if named_args: p_args += ["-named"] - if command is not None: - p_args += [command] + if clicommand is not None: + p_args += [clicommand] p_args += pos_args + named_args self.log.debug("Running dash-cli {}".format(p_args[2:])) process = subprocess.Popen(p_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index bfbfe54ba4c97..d118bf6fa7663 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -114,6 +114,7 @@ 'wallet_labels.py --legacy-wallet', 'wallet_labels.py --descriptors', 'p2p_timeouts.py', + 'p2p_timeouts.py --v2transport', 'feature_bip68_sequence.py', 'mempool_updatefromblock.py', 'p2p_tx_download.py', @@ -175,6 +176,7 @@ 'mempool_reorg.py', 'mempool_persist.py', 'p2p_block_sync.py', + 'p2p_block_sync.py --v2transport', 'wallet_multiwallet.py --legacy-wallet', 'wallet_multiwallet.py --descriptors', 'wallet_multiwallet.py --usecli', @@ -204,12 +206,15 @@ 'wallet_groups.py --legacy-wallet', 'wallet_groups.py --descriptors', 'p2p_compactblocks_hb.py', + 'p2p_compactblocks_hb.py --v2transport', 'p2p_disconnect_ban.py', + 'p2p_disconnect_ban.py --v2transport', 'feature_addressindex.py', 'feature_timestampindex.py', 'feature_spentindex.py', 'rpc_decodescript.py', 'rpc_blockchain.py', + 'rpc_blockchain.py --v2transport', 'rpc_deprecated.py', 'wallet_disable.py --legacy-wallet', 'wallet_disable.py --descriptors', @@ -220,6 +225,7 @@ 'p2p_getdata.py', 'p2p_addrfetch.py', 'rpc_net.py', + 'rpc_net.py --v2transport', 'wallet_keypool.py --legacy-wallet', 'wallet_keypool_hd.py --legacy-wallet', 'wallet_keypool_hd.py --descriptors', @@ -231,8 +237,11 @@ 'mining_prioritisetransaction.py', 'p2p_invalid_locator.py', 'p2p_invalid_block.py', + 'p2p_invalid_block.py --v2transport', 'p2p_invalid_messages.py', 'p2p_invalid_tx.py', + 'p2p_invalid_tx.py --v2transport', + 'p2p_v2_transport.py', 'feature_assumevalid.py', 'example_test.py', 'wallet_txn_doublespend.py --legacy-wallet', @@ -257,9 +266,12 @@ 'wallet_importprunedfunds.py --legacy-wallet', 'wallet_importprunedfunds.py --descriptors', 'p2p_leak_tx.py', + 'p2p_leak_tx.py --v2transport', 'p2p_eviction.py', 'p2p_ibd_stalling.py', + 'p2p_ibd_stalling.py --v2transport', 'p2p_net_deadlock.py', + 'p2p_net_deadlock.py --v2transport', 'rpc_signmessage.py', 'rpc_generateblock.py', 'rpc_generate.py', @@ -350,6 +362,7 @@ 'feature_coinstatsindex.py', 'wallet_orphanedreward.py', 'p2p_node_network_limited.py', + 'p2p_node_network_limited.py --v2transport', 'p2p_permissions.py', 'feature_blocksdir.py', 'wallet_startup.py',