Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: multiple subscriptions in DataQueueHandler #32

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c98f335
feat: multiple subscriptions on a lot of symbols
Romazes Oct 15, 2024
4d9059b
refactor: multiple subscription (more than 100 symbols) in DQH
Romazes Oct 15, 2024
01177a6
refactor: use ManualResetEvent instead of AutoResetEvent
Romazes Oct 15, 2024
de5f3a3
refactor: handle multiple tasks waiting process
Romazes Oct 15, 2024
90b350c
rename: ResetEvent variable
Romazes Oct 15, 2024
546523e
test:refactor: subscribe on a lot of Equities
Romazes Oct 15, 2024
9189094
refactor: multiple subscription process
Romazes Oct 16, 2024
dbf3740
fix: OrderType and SecurityType table in Readme
Romazes Oct 16, 2024
a9a4c31
fix: pending task before start new one in StreamingTaskManager
Romazes Oct 16, 2024
67c6a62
refactor: increase some iteration process in TSMultiSubscription
Romazes Oct 16, 2024
03e7fbf
test:feat: add description to MultipleSubscription Test
Romazes Oct 16, 2024
314523c
feat: implement IDisposable in TradeStationApiClient
Romazes Oct 17, 2024
b0735ff
refactor: RestartStreaming internally in StreamingTaskManager
Romazes Oct 17, 2024
9b8ea76
refactor: Unsubscribe process in MultiSubscriptionManager
Romazes Oct 17, 2024
5d8f57d
feat: reconnection of StreamingTaskManager
Romazes Oct 17, 2024
26281cf
refactor: disconnect process from StreamTask
Romazes Oct 17, 2024
c57e69e
refactor: reconnection and delay in StreamingTaskManager
Romazes Oct 17, 2024
8a88f5f
test:feat: multiple subscription on different amount of Symbols
Romazes Oct 17, 2024
f186153
refactor: not start task if subscription is empty in RemoveSubscripti…
Romazes Oct 17, 2024
239392a
remove: extra lock object in StreamingTaskManager
Romazes Oct 18, 2024
4af4b17
remove: duplication create method
Romazes Oct 18, 2024
3ae27d5
remove: extra disposing of list
Romazes Oct 18, 2024
2fb7dd5
refactor: RestartStreaming in StreamingTaskManager
Romazes Oct 18, 2024
2102764
feat: missed cancellation token for task in StreamingTaskManager
Romazes Oct 18, 2024
aece5dc
fix: typo in ex msg
Romazes Oct 18, 2024
d68bb6f
refactor: CancellationToken in StreamingTaskManager
Romazes Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,23 @@ public void StreamsData(Symbol symbol, Resolution resolution)
cancelationToken.Cancel();
}

[TestCase(101)]
public void MultipleSubscription(int subscribeAmount)
[TestCase(105, 2)]
[TestCase(300, 100)]
public void MultipleSubscription(int initSubscribeAmount, int unSubscribeAmount)
{
var lockObject = new object();
var resetEvent = new ManualResetEvent(false);
var cancelationToken = new CancellationTokenSource();
var amountDataBySymbol = new ConcurrentDictionary<Symbol, int>();
var configBySymbol = new Dictionary<Symbol, (List<SubscriptionDataConfig> Configs, CancellationTokenSource CancellationTokenSource)>();

foreach (var symbol in _equitySymbols.Value.Take(subscribeAmount))
var equitySymbols = _equitySymbols.Value.Take(initSubscribeAmount);

Log.Trace("");
Log.Trace($"SUBSCRIBE 105 UNSUBSCRIBE 2 THEN 100 TEST");
Log.Trace("");

foreach (var symbol in equitySymbols)
{
foreach (var config in GetSubscriptionDataConfigsBySymbolResolution(symbol, Resolution.Tick))
{
Expand Down Expand Up @@ -153,7 +160,7 @@ public void MultipleSubscription(int subscribeAmount)

resetEvent.WaitOne(TimeSpan.FromSeconds(30), cancelationToken.Token);

foreach (var configs in configBySymbol.Values.Take(subscribeAmount / 2))
foreach (var configs in configBySymbol.Values.TakeLast(unSubscribeAmount))
{
foreach (var config in configs.Configs)
{
Expand Down
11 changes: 9 additions & 2 deletions QuantConnect.TradeStationBrokerage/Api/TradeStationApiClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
using System.Threading.Tasks;
using System.Collections.Generic;
using Lean = QuantConnect.Orders;
using System.Collections.ObjectModel;
using System.Runtime.CompilerServices;
using QuantConnect.Brokerages.TradeStation.Models;
using QuantConnect.Brokerages.TradeStation.Models.Enums;
Expand All @@ -37,7 +36,7 @@ namespace QuantConnect.Brokerages.TradeStation.Api;
/// <summary>
/// TradeStation api client implementation
/// </summary>
public class TradeStationApiClient
public class TradeStationApiClient : IDisposable
{
/// <summary>
/// Maximum number of bars that can be requested in a single call to <see cref="GetBars(string, TradeStationUnitTimeIntervalType, DateTime, DateTime)"/>.
Expand Down Expand Up @@ -717,4 +716,12 @@ private async IAsyncEnumerable<string> StreamRequestAsyncEnumerable(string reque
}
}
}

/// <summary>
/// Releases the resources used by the current instance.
/// </summary>
public void Dispose()
{
_httpClient.DisposeSafely();
}
}
248 changes: 248 additions & 0 deletions QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Linq;
using System.Threading;
using QuantConnect.Util;
using QuantConnect.Logging;
using System.Threading.Tasks;
using System.Collections.Generic;

namespace QuantConnect.Brokerages.TradeStation.Streaming;

/// <summary>
/// Manages streaming tasks for a collection of items, allowing for subscription, unSubscription and restarting of streaming processes.
/// </summary>
public class StreamingTaskManager : IDisposable
{
/// <summary>
/// The maximum number of symbols allowed per quote stream request.
/// </summary>
/// <see href="https://api.tradestation.com/docs/specification#tag/MarketData/operation/GetQuoteChangeStream"/>
private const int MaxSymbolsPerQuoteStreamRequest = 100;

/// <summary>
/// Indicates whether there are any pending subscription processes.
/// </summary>
private bool _hasPendingSubscriptions;

/// <summary>
/// Signals to a <see cref="CancellationToken"/> that it should be canceled.
/// </summary>
private CancellationTokenSource _cancellationTokenSource = new();

/// <summary>
/// The task representing the ongoing streaming operation.
/// </summary>
private Task _streamingTask;

/// <summary>
/// Synchronization object used to ensure thread safety when starting or restarting the streaming task.
/// </summary>
private readonly object _streamingTaskLock = new();

/// <summary>
/// Specifies the delay interval between subscription attempts.
/// </summary>
private readonly TimeSpan _subscribeDelay = TimeSpan.FromMilliseconds(1000);

/// <summary>
/// The action to execute for streaming the subscribed items.
/// </summary>
private readonly Func<IReadOnlyCollection<string>, CancellationToken, Task<bool>> _streamAction;

/// <summary>
/// Gets the collection of subscribed items.
/// </summary>
private readonly HashSet<string> _subscriptionBrokerageTickers = new();

/// <summary>
/// Indicates whether there are no subscribed brokerage tickers.
/// </summary>
public bool IsSubscriptionBrokerageTickerEmpty
{
get
{
lock (_streamingTaskLock)
{
return _subscriptionBrokerageTickers.Count == 0;
}
}
}

/// <summary>
/// Indicates whether the maximum number of subscribed brokerage tickers has been reached.
/// </summary>
public bool IsSubscriptionFilled
{
get
{
lock (_streamingTaskLock)
{
return _subscriptionBrokerageTickers.Count >= MaxSymbolsPerQuoteStreamRequest;
}
}
}

/// <summary>
/// Initializes a new instance of the <see cref="StreamingTaskManager"/> class.
/// </summary>
/// <param name="streamingAction">The action to execute for streaming the items.</param>
public StreamingTaskManager(Func<IReadOnlyCollection<string>, CancellationToken, Task<bool>> streamingAction)
{
_streamAction = streamingAction ?? throw new ArgumentNullException(nameof(streamingAction), "Streaming action cannot be null.");
}

/// <summary>
/// Adds an item to the subscription list if the maximum limit is not reached.
/// If the item is already present, it will not be added, and the method will return false.
/// </summary>
/// <param name="item">The item to add to the subscription list. This should be a unique identifier
/// for the item being subscribed to.</param>
/// <returns><c>true</c> if the item was added successfully; otherwise, <c>false</c>.</returns>
public bool AddSubscriptionItem(string item)
{
lock (_streamingTaskLock)
{
if (_subscriptionBrokerageTickers.Count >= MaxSymbolsPerQuoteStreamRequest)
{
Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(AddSubscriptionItem)}: Cannot add more items. Maximum limit reached.");
return false;
}

if (!_subscriptionBrokerageTickers.Add(item))
{
Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(AddSubscriptionItem)}: Item already exists in the list.");
return true;
}
}

RestartStreaming();

return true;
}

/// <summary>
/// Removes an item from the subscription list.
/// </summary>
/// <param name="item">The item to remove from the subscription list.</param>
/// <returns><c>true</c> if the item was removed successfully; otherwise, <c>false</c>.</returns>
public bool RemoveSubscriptionItem(string item)
{
lock (_streamingTaskLock)
{
if (_subscriptionBrokerageTickers.Remove(item))
{
RestartStreaming();
return true;
}
}
Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(RemoveSubscriptionItem)}: Cannot remove item: [{item}]. Item not found.");
return false;
}

/// <summary>
/// Starts the streaming task and executes the provided streaming action.
/// </summary>
private void RestartStreaming()
{
try
{
lock (_streamingTaskLock)
{
if (_hasPendingSubscriptions)
{
// Avoid duplicate subscriptions by checking if a subscription is already in progress
return;
}
_hasPendingSubscriptions = true;
}

if (_streamingTask != null)
{
_cancellationTokenSource.Cancel();
if (!_streamingTask.Wait(TimeSpan.FromSeconds(5)))
{
Log.Error($"{nameof(StreamingTaskManager)}.{nameof(RestartStreaming)}: Timeout while waiting for the streaming task to complete.");
}
_streamingTask = null;
}

var newCancellationTokenSource = _cancellationTokenSource = new CancellationTokenSource();
_streamingTask = Task.Factory.StartNew(async () =>
{
// Wait for a specified delay to batch multiple symbol subscriptions into a single request
if (newCancellationTokenSource.Token.WaitHandle.WaitOne(_subscribeDelay))
{
return;
}

var brokerageTickers = default(List<string>);
lock (_streamingTaskLock)
{
_hasPendingSubscriptions = false;
if (IsSubscriptionBrokerageTickerEmpty)
{
// If there are no symbols to subscribe to, exit the task
Log.Trace($"{nameof(StreamingTaskManager)}.{nameof(RestartStreaming)}: No symbols to subscribe to at this time. Exiting subscription task.");
return;
}
brokerageTickers = _subscriptionBrokerageTickers.ToList();
}

do
{
try
{
var result = await _streamAction(brokerageTickers, newCancellationTokenSource.Token);
}
catch (OperationCanceledException)
{
// Safely skipping:
// Task was cancelled, likely due to token cancellation (e.g., retry attempts or HttpClient.Timeout of 100 seconds).
}
catch (Exception ex)
{
Log.Error($"{nameof(StreamingTaskManager)}.Exception stream action: {ex}");
}
} while (!newCancellationTokenSource.IsCancellationRequested && !newCancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(10)));


newCancellationTokenSource.DisposeSafely();
}, newCancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
catch (Exception ex)
{
Log.Error($"{nameof(StreamingTaskManager)}.Exception wrapper: {ex}");
}
}

/// <summary>
/// Releases the resources used by the current instance.
/// </summary>
public void Dispose()
{
if (_cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource.Cancel();
}

if (_streamingTask != null && _streamingTask.Status == TaskStatus.RanToCompletion)
{
_streamingTask?.DisposeSafely();
}
}
}
Loading
Loading