Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(net): introduce subsription to finalized block headers #619

Merged
merged 1 commit into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions net/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<PackageVersion Include="Roslynator.Analyzers" Version="4.12.8" />
<PackageVersion Include="Roslynator.Formatting.Analyzers" Version="4.12.8" />
<PackageVersion Include="Substrate.NET.API" Version="0.9.24-rc6" />
<PackageVersion Include="System.Threading.Channels" Version="8.0.0" />
</ItemGroup>

</Project>
86 changes: 86 additions & 0 deletions net/src/Substrate.Gear.Client/BlockHeadersStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using EnsureThat;
using Substrate.Gear.Api.Generated;
using Substrate.NetApi.Model.Rpc;

namespace Substrate.Gear.Client;

public sealed class BlockHeadersStream : IAsyncDisposable
{
internal static async Task<BlockHeadersStream> CreateAsync(
SubstrateClientExt nodeClient,
Func<SubstrateClientExt, Action<string, Header>, Task<string>> subscribe,
Func<SubstrateClientExt, string, Task> unsubscribe)
{
EnsureArg.IsNotNull(nodeClient, nameof(nodeClient));
EnsureArg.IsNotNull(subscribe, nameof(subscribe));
EnsureArg.IsNotNull(unsubscribe, nameof(unsubscribe));

var channel = Channel.CreateUnbounded<Header>(
new UnboundedChannelOptions
{
SingleReader = true
});

var subscriptionId = await subscribe(
nodeClient,
(_, blockHeader) => channel.Writer.TryWrite(blockHeader))
.ConfigureAwait(false);

return new BlockHeadersStream(
channel,
() => unsubscribe(nodeClient, subscriptionId));
}

private BlockHeadersStream(Channel<Header> channel, Func<Task> unsubscribe)
{
this.channel = channel;
this.unsubscribe = unsubscribe;
this.isReadInProgress = 0;
}

private readonly Channel<Header> channel;
private readonly Func<Task> unsubscribe;
private int isReadInProgress;

public async ValueTask DisposeAsync()
{
await this.unsubscribe().ConfigureAwait(false);
this.channel.Writer.Complete();

GC.SuppressFinalize(this);
}

/// <summary>
/// Returns all finalized block headers since the stream was created.
/// Only one read operation is allowed at a time.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public IAsyncEnumerable<Header> ReadAllAsync(CancellationToken cancellationToken)
{
return Interlocked.CompareExchange(ref this.isReadInProgress, 1, 0) == 0
? ReadAllImpl(cancellationToken)
: throw new InvalidOperationException("TODO: Custom exception. Only one read operation is allowed at a time.");

async IAsyncEnumerable<Header> ReadAllImpl([EnumeratorCancellation] CancellationToken cancellationToken)
{
try
{
while (true)
{
yield return await this.channel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
}
}
finally
{
Interlocked.Exchange(ref this.isReadInProgress, 0);
}
}
}
}
1 change: 1 addition & 0 deletions net/src/Substrate.Gear.Client/Substrate.Gear.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
<PackageReference Include="System.Threading.Channels" />
</ItemGroup>

<ItemGroup>
Expand Down
20 changes: 20 additions & 0 deletions net/src/Substrate.Gear.Client/SubstrateClientExtExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,26 @@ public static async Task<EventRecord[]> ListBlockEventsAsync(
.ConfigureAwait(false);
}

/// <summary>
/// Subscribes to finalized block headers and returns them as a stream which can be read as an async enumerable.
/// </summary>
/// <param name="nodeClient"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static Task<BlockHeadersStream> GetFinalizedBlockHeadersStreamAsync(
this SubstrateClientExt nodeClient,
CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(nodeClient, nameof(nodeClient));

return BlockHeadersStream.CreateAsync(
nodeClient,
(nodeClient, callback) =>
nodeClient.Chain.SubscribeFinalizedHeadsAsync(callback, cancellationToken),
(nodeClient, subscriptionId) =>
nodeClient.Chain.UnsubscribeFinalizedHeadsAsync(subscriptionId, CancellationToken.None));
}

/// <summary>
/// Calculates amount of gas required for creating a new program from previously uploaded code.
/// </summary>
Expand Down