From 5369d7ac3c2c040955225c496f05f747de0d3c04 Mon Sep 17 00:00:00 2001 From: "nicolas.dorier" Date: Wed, 6 Mar 2024 16:16:06 +0900 Subject: [PATCH] Fix: NBX would sometimes stay stuck in case of reorg (Fix #461 #409) --- NBXplorer.Tests/UnitTest1.cs | 30 ++++ .../Backends/Postgres/PostgresIndexers.cs | 150 ++++++++---------- 2 files changed, 100 insertions(+), 80 deletions(-) diff --git a/NBXplorer.Tests/UnitTest1.cs b/NBXplorer.Tests/UnitTest1.cs index 8a5d7c88..92e1f3cc 100644 --- a/NBXplorer.Tests/UnitTest1.cs +++ b/NBXplorer.Tests/UnitTest1.cs @@ -4501,5 +4501,35 @@ public async Task CanUseRPCProxy(Backend backend) await tester.Client.RPCClient.GetTxOutAsync(uint256.One, 0); } } + + + [Fact] + public async Task DoNotHangDuringReorg() + { + using var tester = ServerTester.Create(Backend.Postgres); + var wallet = await tester.Client.GenerateWalletAsync(new GenerateWalletRequest()); + var addr = await tester.Client.GetUnusedAsync(wallet.DerivationScheme, DerivationFeature.Deposit); + var txId = tester.SendToAddress(addr.Address, Money.Coins(1.0m)); + tester.Notifications.WaitForTransaction(wallet.DerivationScheme, txId); + var blocks = await tester.RPC.GenerateAsync(4); + for (int i = 0; i < blocks.Length; i++) + { + Logs.Tester.LogInformation($"Chain1: [{i}]: {blocks[i]}"); + } + tester.Notifications.WaitForBlocks(blocks[^1]); + Logs.Tester.LogInformation("Invalidate the first block which confirmed the transaction " + blocks[0]); + tester.RPC.InvalidateBlock(blocks[0]); + var blocks2 = await tester.RPC.GenerateAsync(3); + for (int i = 0; i < blocks2.Length; i++) + { + Logs.Tester.LogInformation($"Chain2: [{i}]: {blocks2[i]}"); + } + tester.Notifications.WaitForBlocks(blocks2[^1]); + Logs.Tester.LogInformation("Reconsider the block " + blocks[0]); + tester.RPC.SendCommand("reconsiderblock", blocks[0]); + + Logs.Tester.LogInformation($"Waiting for the first chain to be processed again"); + tester.Notifications.WaitForBlocks(blocks[^1]); + } } } diff --git 
a/NBXplorer/Backends/Postgres/PostgresIndexers.cs b/NBXplorer/Backends/Postgres/PostgresIndexers.cs index a5bfc94b..b1fa22e6 100644 --- a/NBXplorer/Backends/Postgres/PostgresIndexers.cs +++ b/NBXplorer/Backends/Postgres/PostgresIndexers.cs @@ -46,9 +46,6 @@ public PostgresIndexer( CancellationTokenSource cts; Task _indexerLoop; Task _watchdogLoop; - Node _Node; - Channel _Channel; - Channel _DownloadedBlocks; // This one will check if the indexer is "stuck" and disconnect the node if it is the case async Task WatchdogLoop() @@ -58,14 +55,14 @@ async Task WatchdogLoop() try { await Task.Delay(TimeSpan.FromMinutes(5.0), cancellationToken); - var height = await SeemsStuck(cancellationToken); - if (height is null) + var lastBlock = await SeemsStuck(cancellationToken); + if (lastBlock is null) goto wait; await Task.Delay(TimeSpan.FromMinutes(2.0), cancellationToken); - var height2 = await SeemsStuck(cancellationToken); - if (height != height2) + var lastBlock2 = await SeemsStuck(cancellationToken); + if (lastBlock != lastBlock2) goto wait; - _Node?.DisconnectAsync($"Sync seems stuck at height {height.Value}, restarting the connection."); + _Connection?.Dispose($"Sync seems stuck after block {lastBlock.Hash} ({lastBlock.Hash}), restarting the connection."); goto wait; } catch when (cts.Token.IsCancellationRequested) @@ -80,18 +77,17 @@ async Task WatchdogLoop() end:; } - async Task SeemsStuck(CancellationToken cancellationToken) + async Task SeemsStuck(CancellationToken cancellationToken) { if (State is not (BitcoinDWaiterState.NBXplorerSynching or BitcoinDWaiterState.Ready) || - SyncHeight is not long syncHeight || + lastIndexedBlock is not { } lastBlock || GetConnectedClient() is not RPCClient rpc) { return null; } + var blockchainInfo = await rpc.GetBlockchainInfoAsyncEx(cancellationToken); - if (Math.Min(blockchainInfo.Headers, blockchainInfo.Blocks) > syncHeight) - return syncHeight; - return null; + return blockchainInfo.BestBlockHash != lastBlock.Hash ? 
lastBlock : null; } async Task IndexerLoop() @@ -121,24 +117,53 @@ async Task IndexerLoop() } } + class Connection : IDisposable + { + public Channel Events; + public Channel Blocks; + public Node Node; + public Connection(Node node) + { + Node = node; + Events = Channel.CreateUnbounded(new() { AllowSynchronousContinuations = false }); + Blocks = Channel.CreateUnbounded(new() { AllowSynchronousContinuations = false }); + } + bool _Disposed = false; + + public void Dispose() + { + Dispose(null); + } + public void Dispose(string reason) + { + if (_Disposed) + return; + Node.DisconnectAsync(reason); + Events.Writer.TryComplete(); + Blocks.Writer.TryComplete(); + _Disposed = true; + } + } + Connection _Connection; private async Task IndexerLoopCore(CancellationToken token) { - await ConnectNode(token, true); - await foreach (var item in _Channel.Reader.ReadAllAsync(token)) + await ConnectNode(token); + var connection = _Connection; + await foreach (var item in connection.Events.Reader.ReadAllAsync(token)) { await using var conn = await ConnectionFactory.CreateConnectionHelper(Network); if (item is PullBlocks pb) { - var headers = ConsolidatePullBlocks(_Channel.Reader, pb); + var headers = ConsolidatePullBlocks(connection.Events.Reader, pb); foreach (var batch in headers.Chunk(maxinflight)) { - _ = _Node.SendMessageAsync( + _ = connection.Node.SendMessageAsync( new GetDataPayload( - batch.Select(b => new InventoryVector(_Node.AddSupportedOptions(InventoryType.MSG_BLOCK), b.GetHash()) + batch.Select(b => new InventoryVector(connection.Node.AddSupportedOptions(InventoryType.MSG_BLOCK), b.GetHash()) ).ToArray())); var remaining = batch.Select(b => b.GetHash()).ToHashSet(); List unorderedBlocks = new List(); - await foreach (var block in _DownloadedBlocks.Reader.ReadAllAsync(token)) + await foreach (var block in connection.Blocks.Reader.ReadAllAsync(token)) { if (!remaining.Remove(block.Header.GetHash())) continue; @@ -188,17 +213,14 @@ private async Task 
IndexerLoopCore(CancellationToken token) } } await SaveProgress(conn); - await UpdateState(); + await UpdateState(connection.Node); } - await AskNextHeaders(token); - } - if (item is NodeDisconnected) - { - await ConnectNode(token, false); + if (connection.Node.State != NodeState.HandShaked) + await AskNextHeaders(connection.Node, token); } if (item is Transaction tx) { - var txs = PullTransactions(_Channel.Reader, tx); + var txs = PullTransactions(connection.Events.Reader, tx); await SaveMatches(conn, txs, null, true); } } @@ -255,15 +277,8 @@ private IList ConsolidatePullBlocks(ChannelReader reader, P } - private async Task ConnectNode(CancellationToken token, bool forceRestart) + private async Task ConnectNode(CancellationToken token) { - if (_Node is not null) - { - if (!forceRestart && _Node.State == NodeState.HandShaked) - return; - _Node.DisconnectAsync("Restarting"); - _Node = null; - } State = BitcoinDWaiterState.NotStarted; using (var handshakeTimeout = CancellationTokenSource.CreateLinkedTokenSource(token)) { @@ -344,35 +359,35 @@ private async Task ConnectNode(CancellationToken token, bool forceRestart) State = BitcoinDWaiterState.NBXplorerSynching; // Refresh the NetworkInfo that may have become different while it was synching. 
NetworkInfo = await RPCClient.GetNetworkInfoAsync(); - _Node = node; - _Channel?.Writer.Complete(); - _Channel = Channel.CreateUnbounded(); - _DownloadedBlocks?.Writer.Complete(); - _DownloadedBlocks = Channel.CreateUnbounded(); + + _Connection?.Dispose("Creating new connection"); + _Connection = new Connection(node); node.MessageReceived += Node_MessageReceived; node.Disconnected += Node_Disconnected; - - var locator = await AskNextHeaders(token); + var locator = await AskNextHeaders(node, token); lastIndexedBlock = await Repository.GetLastIndexedSlimChainedBlock(locator); if (lastIndexedBlock is null) { var locatorTip = await RPCClient.GetBlockHeaderAsyncEx(locator.Blocks[0], token); lastIndexedBlock = locatorTip?.ToSlimChainedBlock(); } - await UpdateState(); + await UpdateState(node); } } - bool firstConnect = true; - private async Task AskNextHeaders(CancellationToken token) + private async Task AskNextHeaders(Node node, CancellationToken token) { var indexProgress = await Repository.GetIndexProgress(); if (indexProgress is null) { indexProgress = await GetDefaultCurrentLocation(token); } - await _Node.SendMessageAsync(new GetHeadersPayload(indexProgress)); + foreach (var block in indexProgress.Blocks) + { + Logger.LogInformation($"Asking for block {block}"); + } + await node.SendMessageAsync(new GetHeadersPayload(indexProgress)); return indexProgress; } @@ -391,8 +406,10 @@ private async Task SaveProgress(DbConnectionHelper conn) await Repository.SetIndexProgress(conn.Connection, locator); } - private async Task UpdateState() + private async Task UpdateState(Node node) { + if (node.State != NodeState.HandShaked) + return; var blockchainInfo = await RPCClient.GetBlockchainInfoAsyncEx(); if (blockchainInfo.IsSynching(Network)) { @@ -508,18 +525,16 @@ private async Task SaveMatches(DbConnectionHelper conn, List transa SlimChainedBlock lastIndexedBlock; record PullBlocks(IList headers); - record NodeDisconnected(); private void Node_MessageReceived(Node node, 
IncomingMessage message) { - var channel = _Channel; - var downloadedBlocks = _DownloadedBlocks; + var connection = _Connection; if (message.Message.Payload is HeadersPayload h && h.Headers.Count != 0) { - channel.Writer.TryWrite(new PullBlocks(h.Headers)); + connection.Events.Writer.TryWrite(new PullBlocks(h.Headers)); } else if (message.Message.Payload is BlockPayload b) { - downloadedBlocks.Writer.TryWrite(b.Object); + connection.Blocks.Writer.TryWrite(b.Object); } else if (message.Message.Payload is InvPayload invs) { @@ -535,41 +550,17 @@ private void Node_MessageReceived(Node node, IncomingMessage message) { node.SendMessageAsync(data); } - // DOGE coin doing doge things forget we want header first sync... reboot the connection - else - { - if (invs.Inventory.Where(t => t.Type.HasFlag(InventoryType.MSG_BLOCK)).Any()) - { - node.DisconnectAsync("Not sending headers first anymore"); - } - } } else if (message.Message.Payload is TxPayload tx) { - channel.Writer.TryWrite(tx.Object); + connection.Events.Writer.TryWrite(tx.Object); } } private void Node_Disconnected(Node node) { - var channel = _Channel; - if (node.DisconnectReason.Reason != "Restarting") - { - if (!cts.IsCancellationRequested) - { - var exception = node.DisconnectReason.Exception?.Message; - if (!string.IsNullOrEmpty(exception)) - exception = $" ({exception})"; - else - exception = String.Empty; - Logger.LogWarning($"Node disconnected for reason: {node.DisconnectReason.Reason}{exception}"); - } - channel.Writer.TryWrite(new NodeDisconnected()); - } - else - { - Logger.LogInformation($"Restarting node connection..."); - } + Logger.LogInformation($"Node disconnected ({node.DisconnectReason.Reason})"); + _Connection?.Dispose(); node.MessageReceived -= Node_MessageReceived; node.Disconnected -= Node_Disconnected; State = BitcoinDWaiterState.NotStarted; @@ -589,12 +580,11 @@ public async Task StartAsync(CancellationToken cancellationToken) public async Task StopAsync(CancellationToken 
cancellationToken) { cts?.Cancel(); - _Channel.Writer.Complete(); + _Connection?.Dispose("NBXplorer stopping..."); if (_indexerLoop is not null) await _indexerLoop; if (_watchdogLoop is not null) await _watchdogLoop; - _Node?.DisconnectAsync(); } public NBXplorerNetwork Network => network;