Skip to content

Commit

Permalink
thtrottle all RPC modules
Browse files Browse the repository at this point in the history
  • Loading branch information
MarekM25 committed Jul 21, 2023
1 parent d56d659 commit 516d2a1
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 28 deletions.
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Init/Steps/RegisterRpcModules.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public virtual async Task Execute(CancellationToken cancellationToken)
_api.GasPriceOracle,
_api.EthSyncingInfo);

RpcLimits.Init(rpcConfig.RequestQueueLimit);
rpcModuleProvider.RegisterBounded(ethModuleFactory, rpcConfig.EthModuleConcurrentInstances ?? Environment.ProcessorCount, rpcConfig.Timeout, _api.LogManager, rpcConfig.RequestQueueLimit);

if (_api.DbProvider is null) throw new StepDependencyException(nameof(_api.DbProvider));
Expand Down
65 changes: 38 additions & 27 deletions src/Nethermind/Nethermind.JsonRpc/Modules/BoundedModulePool.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -11,6 +10,38 @@

namespace Nethermind.JsonRpc.Modules
{
public static class RpcLimits
{
public static void Init(int limit)
{
Limit = limit;
}

private static int Limit { get; set; }
private static bool Enabled => Limit > 0;
private static int _queuedCalls = 0;
public static int QueuedCalls => _queuedCalls;

public static void IncrementQueuedCalls()
{
if (Enabled)
Interlocked.Increment(ref _queuedCalls);
}

public static void DecrementQueuedCalls()
{
if (Enabled)
Interlocked.Decrement(ref _queuedCalls);
}

public static void EnsureLimits()
{
if (Enabled && _queuedCalls > Limit)
{
throw new LimitExceededException($"Unable to start new queued requests. Too many queued requests. Queued calls {_queuedCalls}.");
}
}
}
public class BoundedModulePool<T> : IRpcModulePool<T> where T : IRpcModule
{
private readonly int _timeout;
Expand All @@ -19,13 +50,9 @@ public class BoundedModulePool<T> : IRpcModulePool<T> where T : IRpcModule
private readonly ConcurrentQueue<T> _pool = new();
private readonly SemaphoreSlim _semaphore;
private readonly ILogger _logger;
private int _rpcQueuedCalls = 0;
private readonly int _requestQueueLimit = 0;
private bool RequestLimitEnabled => _requestQueueLimit > 0;

public BoundedModulePool(IRpcModuleFactory<T> factory, int exclusiveCapacity, int timeout, ILogManager logManager, int requestQueueLimit = 0)
public BoundedModulePool(IRpcModuleFactory<T> factory, int exclusiveCapacity, int timeout, ILogManager logManager)
{
_requestQueueLimit = requestQueueLimit;
_logger = logManager.GetClassLogger();
_timeout = timeout;
Factory = factory;
Expand All @@ -44,38 +71,22 @@ public BoundedModulePool(IRpcModuleFactory<T> factory, int exclusiveCapacity, in

private async Task<T> SlowPath()
{
if (RequestLimitEnabled && _rpcQueuedCalls > _requestQueueLimit)
{
throw new LimitExceededException($"Unable to start new queued requests for {typeof(T).Name}. Too many queued requests. Queued calls {_rpcQueuedCalls}.");
}

IncrementRpcQueuedCalls();
RpcLimits.EnsureLimits();
RpcLimits.IncrementQueuedCalls();
if (_logger.IsTrace)
_logger.Trace($"{typeof(T).Name} Queued RPC requests {_rpcQueuedCalls}");
_logger.Trace($"{typeof(T).Name} Queued RPC requests {RpcLimits.QueuedCalls}");

if (!await _semaphore.WaitAsync(_timeout))
{
DecrementRpcQueuedCalls();
RpcLimits.DecrementQueuedCalls();
throw new ModuleRentalTimeoutException($"Unable to rent an instance of {typeof(T).Name}. Too many concurrent requests.");
}

DecrementRpcQueuedCalls();
RpcLimits.DecrementQueuedCalls();
_pool.TryDequeue(out T result);
return result;
}

private void IncrementRpcQueuedCalls()
{
if (RequestLimitEnabled)
Interlocked.Increment(ref _rpcQueuedCalls);
}

private void DecrementRpcQueuedCalls()
{
if (RequestLimitEnabled)
Interlocked.Decrement(ref _rpcQueuedCalls);
}

public void ReturnModule(T module)
{
if (ReferenceEquals(module, _shared))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public static void RegisterBounded<T>(
int requestQueueLimit = 0)
where T : IRpcModule
{
rpcModuleProvider.Register(new BoundedModulePool<T>(factory, maxCount, timeout, logManager, requestQueueLimit));
rpcModuleProvider.Register(new BoundedModulePool<T>(factory, maxCount, timeout, logManager));
}

public static void RegisterBoundedByCpuCount<T>(
Expand Down

0 comments on commit 516d2a1

Please sign in to comment.