diff --git a/net/Directory.Packages.props b/net/Directory.Packages.props index a1ecdad7..9861a6a4 100644 --- a/net/Directory.Packages.props +++ b/net/Directory.Packages.props @@ -12,6 +12,7 @@ + diff --git a/net/src/Substrate.Gear.Client/BlockHeadersStream.cs b/net/src/Substrate.Gear.Client/BlockHeadersStream.cs new file mode 100644 index 00000000..1476753f --- /dev/null +++ b/net/src/Substrate.Gear.Client/BlockHeadersStream.cs @@ -0,0 +1,86 @@ +using System; +using System.Collections.Generic; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using EnsureThat; +using Substrate.Gear.Api.Generated; +using Substrate.NetApi.Model.Rpc; + +namespace Substrate.Gear.Client; + +public sealed class BlockHeadersStream : IAsyncDisposable +{ + internal static async Task CreateAsync( + SubstrateClientExt nodeClient, + Func, Task> subscribe, + Func unsubscribe) + { + EnsureArg.IsNotNull(nodeClient, nameof(nodeClient)); + EnsureArg.IsNotNull(subscribe, nameof(subscribe)); + EnsureArg.IsNotNull(unsubscribe, nameof(unsubscribe)); + + var channel = Channel.CreateUnbounded
( + new UnboundedChannelOptions + { + SingleReader = true + }); + + var subscriptionId = await subscribe( + nodeClient, + (_, blockHeader) => channel.Writer.TryWrite(blockHeader)) + .ConfigureAwait(false); + + return new BlockHeadersStream( + channel, + () => unsubscribe(nodeClient, subscriptionId)); + } + + private BlockHeadersStream(Channel
channel, Func unsubscribe) + { + this.channel = channel; + this.unsubscribe = unsubscribe; + this.isReadInProgress = 0; + } + + private readonly Channel
channel; + private readonly Func unsubscribe; + private int isReadInProgress; + + public async ValueTask DisposeAsync() + { + await this.unsubscribe().ConfigureAwait(false); + this.channel.Writer.Complete(); + + GC.SuppressFinalize(this); + } + + /// + /// Returns all finalized block headers since the stream was created. + /// Only one read operation is allowed at a time. + /// + /// + /// + public IAsyncEnumerable
ReadAllAsync(CancellationToken cancellationToken) + { + return Interlocked.CompareExchange(ref this.isReadInProgress, 1, 0) == 0 + ? ReadAllImpl(cancellationToken) + : throw new InvalidOperationException("TODO: Custom exception. Only one read operation is allowed at a time."); + + async IAsyncEnumerable
ReadAllImpl([EnumeratorCancellation] CancellationToken cancellationToken) + { + try + { + while (true) + { + yield return await this.channel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); + } + } + finally + { + Interlocked.Exchange(ref this.isReadInProgress, 0); + } + } + } +} diff --git a/net/src/Substrate.Gear.Client/Substrate.Gear.Client.csproj b/net/src/Substrate.Gear.Client/Substrate.Gear.Client.csproj index c280cbe7..a8713d61 100644 --- a/net/src/Substrate.Gear.Client/Substrate.Gear.Client.csproj +++ b/net/src/Substrate.Gear.Client/Substrate.Gear.Client.csproj @@ -10,6 +10,7 @@ all runtime; build; native; contentfiles; analyzers + diff --git a/net/src/Substrate.Gear.Client/SubstrateClientExtExtensions.cs b/net/src/Substrate.Gear.Client/SubstrateClientExtExtensions.cs index 557baf9d..a88afd53 100644 --- a/net/src/Substrate.Gear.Client/SubstrateClientExtExtensions.cs +++ b/net/src/Substrate.Gear.Client/SubstrateClientExtExtensions.cs @@ -141,6 +141,26 @@ public static async Task ListBlockEventsAsync( .ConfigureAwait(false); } + /// + /// Subscribes to finalized block headers and returns them as a stream which can be read as an async enumerable. + /// + /// + /// + /// + public static Task GetFinalizedBlockHeadersStreamAsync( + this SubstrateClientExt nodeClient, + CancellationToken cancellationToken) + { + EnsureArg.IsNotNull(nodeClient, nameof(nodeClient)); + + return BlockHeadersStream.CreateAsync( + nodeClient, + (nodeClient, callback) => + nodeClient.Chain.SubscribeFinalizedHeadsAsync(callback, cancellationToken), + (nodeClient, subscriptionId) => + nodeClient.Chain.UnsubscribeFinalizedHeadsAsync(subscriptionId, CancellationToken.None)); + } + /// /// Calculates amount of gas required for creating a new program from previously uploaded code. ///