From c98f335405bea64f0b31b9c216479fbeb4506a9b Mon Sep 17 00:00:00 2001 From: Romazes Date: Tue, 15 Oct 2024 13:30:54 +0300 Subject: [PATCH 01/26] feat: multiple subscriptions on a lot of symbols --- .../Api/TradeStationApiClient.cs | 1 - .../TradeStationBrokerage.DataQueueHandler.cs | 32 +++++++++++++++++-- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/Api/TradeStationApiClient.cs b/QuantConnect.TradeStationBrokerage/Api/TradeStationApiClient.cs index 4223a1b0..d49cd93f 100644 --- a/QuantConnect.TradeStationBrokerage/Api/TradeStationApiClient.cs +++ b/QuantConnect.TradeStationBrokerage/Api/TradeStationApiClient.cs @@ -26,7 +26,6 @@ using System.Threading.Tasks; using System.Collections.Generic; using Lean = QuantConnect.Orders; -using System.Collections.ObjectModel; using System.Runtime.CompilerServices; using QuantConnect.Brokerages.TradeStation.Models; using QuantConnect.Brokerages.TradeStation.Models.Enums; diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs index 10245432..e19af121 100644 --- a/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs @@ -219,14 +219,40 @@ private void SubscribeOnTickUpdateEvents() while (!_streamQuoteCancellationTokenSource.IsCancellationRequested) { + var tasks = new List>(); Log.Trace($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}: Starting to listen for tick updates..."); try { - // Stream quotes from the TradeStation API and handle each quote event - await foreach (var quote in _tradeStationApiClient.StreamQuotes(brokerageTickers, _streamQuoteCancellationTokenSource.Token)) + var brokerageTickerChunks = brokerageTickers.Chunk(100).ToList(); + for (var i = 0; i < brokerageTickerChunks.Count; i++) { - HandleQuoteEvents(quote); + var taskIndex = i; + Log.Trace($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}: Starting task for chunk {i}/{brokerageTickerChunks.Count} with {brokerageTickerChunks[i].Length} tickers."); + var res = await Task.Factory.StartNew(async () => + { + // Stream quotes from the TradeStation API and handle each quote event + await foreach (var quote in _tradeStationApiClient.StreamQuotes(brokerageTickerChunks[taskIndex], _streamQuoteCancellationTokenSource.Token)) + { + HandleQuoteEvents(quote); + } + + return false; + }, _streamQuoteCancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); + + tasks.Add(res); } + + + do + { + var finishedTask = tasks.Any(t => !t.Result); + + if (finishedTask) + { + break; + } + + } while (true); } catch (Exception ex) { From 4d9059b60162bf9b7e259e9fc36c00cd41c2ea8a Mon Sep 17 00:00:00 2001 From: Romazes Date: Tue, 15 Oct 2024 16:42:49 +0300 Subject: [PATCH 02/26] refactor: multiple subscription (more than 100 symbols) in DQH --- .../TradeStationBrokerage.DataQueueHandler.cs | 60 +++++++++++-------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs index e19af121..1e7c5ea3 100644 --- a/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs @@ -22,7 +22,6 @@ using QuantConnect.Packets; using QuantConnect.Logging; using System.Threading.Tasks; -using QuantConnect.Securities; using QuantConnect.Interfaces; using QuantConnect.Data.Market; using System.Collections.Generic; @@ -36,6 +35,12 @@ namespace QuantConnect.Brokerages.TradeStation; /// public partial class TradeStationBrokerage : IDataQueueHandler { + /// + /// The maximum number of symbols allowed per quote stream request. + /// + /// + private const int MaxSymbolsPerQuoteStreamRequest = 100; + /// /// Aggregates ticks and bars based on given subscriptions. /// @@ -75,6 +80,11 @@ public partial class TradeStationBrokerage : IDataQueueHandler /// private readonly AutoResetEvent _quoteStreamEndingAutoResetEvent = new(false); + /// + /// Maintains active stream quote tasks when there are more than 100 subscription symbols. + /// + private readonly List> _streamQuotesTasks = new(); + /// /// Indicates whether there are any pending subscription processes. /// @@ -219,45 +229,47 @@ private void SubscribeOnTickUpdateEvents() while (!_streamQuoteCancellationTokenSource.IsCancellationRequested) { - var tasks = new List>(); + _streamQuotesTasks.Clear(); Log.Trace($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}: Starting to listen for tick updates..."); - try + + var brokerageTickerChunks = brokerageTickers.Chunk(MaxSymbolsPerQuoteStreamRequest).ToList(); + for (var i = 0; i < brokerageTickerChunks.Count; i++) { - var brokerageTickerChunks = brokerageTickers.Chunk(100).ToList(); - for (var i = 0; i < brokerageTickerChunks.Count; i++) + var taskIndex = i; + + var streamQuotesTask = await Task.Factory.StartNew(async () => { - var taskIndex = i; - Log.Trace($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}: Starting task for chunk {i}/{brokerageTickerChunks.Count} with {brokerageTickerChunks[i].Length} tickers."); - var res = await Task.Factory.StartNew(async () => + Log.Debug($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}: Starting task for chunk {i + 1}/{brokerageTickerChunks.Count} with {brokerageTickerChunks[i].Length} tickers."); + try { // Stream quotes from the TradeStation API and handle each quote event await foreach (var quote in _tradeStationApiClient.StreamQuotes(brokerageTickerChunks[taskIndex], _streamQuoteCancellationTokenSource.Token)) { HandleQuoteEvents(quote); } - return false; - }, _streamQuoteCancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); + } + catch (Exception ex) + { + Log.Error($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}.Exception: {ex}"); + return false; + } + }, _streamQuoteCancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); - tasks.Add(res); - } + _streamQuotesTasks.Add(streamQuotesTask); + } + do + { + var finished = await Task.WhenAny(_streamQuotesTasks); - do + if (!await finished) { - var finishedTask = tasks.Any(t => !t.Result); + break; + } - if (finishedTask) - { - break; - } + } while (true); - } while (true); - } - catch (Exception ex) - { - Log.Error($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}.Exception: {ex}"); - } Log.Trace($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}: Connection lost. Reconnecting in 10 seconds..."); _streamQuoteCancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(10)); } From 01177a6e2960e25e706b6529bdc4b89a29b76802 Mon Sep 17 00:00:00 2001 From: Romazes Date: Tue, 15 Oct 2024 17:39:16 +0300 Subject: [PATCH 03/26] refactor: use ManualResetEvent instead of AutoResetEvent --- .../TradeStationBrokerage.DataQueueHandler.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs index 1e7c5ea3..c5271d84 100644 --- a/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs @@ -78,7 +78,7 @@ public partial class TradeStationBrokerage : IDataQueueHandler /// /// Display that stream quote task was finished great /// - private readonly AutoResetEvent _quoteStreamEndingAutoResetEvent = new(false); + private readonly ManualResetEvent _quoteStreamEndingAutoResetEvent = new(false); /// /// Maintains active stream quote tasks when there are more than 100 subscription symbols. @@ -227,6 +227,7 @@ private void SubscribeOnTickUpdateEvents() } } + _quoteStreamEndingAutoResetEvent.Reset(); while (!_streamQuoteCancellationTokenSource.IsCancellationRequested) { _streamQuotesTasks.Clear(); From de5f3a30a1b474d3615724c9f829e1124265436b Mon Sep 17 00:00:00 2001 From: Romazes Date: Tue, 15 Oct 2024 17:42:38 +0300 Subject: [PATCH 04/26] refactor: handle multiple tasks waiting process feat: handle OperationCanceledException --- .../TradeStationBrokerage.DataQueueHandler.cs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs index c5271d84..54b93248 100644 --- a/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs @@ -250,6 +250,11 @@ private void SubscribeOnTickUpdateEvents() } return false; } + catch (OperationCanceledException operationEx) + { + Log.Debug($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}.OperationCanceledException: {operationEx}"); + return false; + } catch (Exception ex) { Log.Error($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}.Exception: {ex}"); @@ -260,16 +265,8 @@ private void SubscribeOnTickUpdateEvents() _streamQuotesTasks.Add(streamQuotesTask); } - do - { - var finished = await Task.WhenAny(_streamQuotesTasks); - - if (!await finished) - { - break; - } - - } while (true); + // If the operation is canceled using a cancellation token, it ensures all tasks complete before returning false. + await Task.WhenAll(_streamQuotesTasks); Log.Trace($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}: Connection lost. Reconnecting in 10 seconds..."); _streamQuoteCancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(10)); From 90b350c7e6b183e6d39e5b3436076885816f67f1 Mon Sep 17 00:00:00 2001 From: Romazes Date: Tue, 15 Oct 2024 19:06:59 +0300 Subject: [PATCH 05/26] rename: ResetEvent variable --- .../TradeStationBrokerage.DataQueueHandler.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs index 54b93248..dacc2580 100644 --- a/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs @@ -78,7 +78,7 @@ public partial class TradeStationBrokerage : IDataQueueHandler /// /// Display that stream quote task was finished great /// - private readonly ManualResetEvent _quoteStreamEndingAutoResetEvent = new(false); + private readonly ManualResetEvent _quoteStreamEndingManualResetEvent = new(false); /// /// Maintains active stream quote tasks when there are more than 100 subscription symbols. @@ -227,7 +227,7 @@ private void SubscribeOnTickUpdateEvents() } } - _quoteStreamEndingAutoResetEvent.Reset(); + _quoteStreamEndingManualResetEvent.Reset(); while (!_streamQuoteCancellationTokenSource.IsCancellationRequested) { _streamQuotesTasks.Clear(); @@ -272,7 +272,7 @@ private void SubscribeOnTickUpdateEvents() _streamQuoteCancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(10)); } // Signal that the quote streaming task is ending - _quoteStreamEndingAutoResetEvent.Set(); + _quoteStreamEndingManualResetEvent.Set(); }, _streamQuoteCancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); } @@ -425,7 +425,7 @@ private void StopQuoteStreamingTask(bool updateCancellationToken = true) try { _quoteStreamingTask.Wait(); - if (!_quoteStreamEndingAutoResetEvent.WaitOne(TimeSpan.FromSeconds(5))) + if (!_quoteStreamEndingManualResetEvent.WaitOne(TimeSpan.FromSeconds(5))) { Log.Error($"{nameof(TradeStationBrokerage)}.{nameof(StopQuoteStreamingTask)}: TimeOut waiting for Quote Streaming Task to end."); } From 546523e9a786f32df6af41b78a7058e823202e83 Mon Sep 17 00:00:00 2001 From: Romazes Date: Tue, 15 Oct 2024 19:07:55 +0300 Subject: [PATCH 06/26] test:refactor: subscribe on a lot of Equities --- .../TradeStationBrokerageDataQueueHandlerTests.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage.Tests/TradeStationBrokerageDataQueueHandlerTests.cs b/QuantConnect.TradeStationBrokerage.Tests/TradeStationBrokerageDataQueueHandlerTests.cs index 58c82175..dde6fb45 100644 --- a/QuantConnect.TradeStationBrokerage.Tests/TradeStationBrokerageDataQueueHandlerTests.cs +++ b/QuantConnect.TradeStationBrokerage.Tests/TradeStationBrokerageDataQueueHandlerTests.cs @@ -107,8 +107,8 @@ public void StreamsData(Symbol symbol, Resolution resolution) cancelationToken.Cancel(); } - [TestCase(101)] - public void MultipleSubscription(int subscribeAmount) + [Test] + public void MultipleSubscription() { var lockObject = new object(); var resetEvent = new ManualResetEvent(false); @@ -116,7 +116,7 @@ public void MultipleSubscription(int subscribeAmount) var amountDataBySymbol = new ConcurrentDictionary(); var configBySymbol = new Dictionary Configs, CancellationTokenSource CancellationTokenSource)>(); - foreach (var symbol in _equitySymbols.Value.Take(subscribeAmount)) + foreach (var symbol in _equitySymbols.Value) { foreach (var config in GetSubscriptionDataConfigsBySymbolResolution(symbol, Resolution.Tick)) { @@ -153,7 +153,7 @@ public void MultipleSubscription(int subscribeAmount) resetEvent.WaitOne(TimeSpan.FromSeconds(30), cancelationToken.Token); - foreach (var configs in configBySymbol.Values.Take(subscribeAmount / 2)) + foreach (var configs in configBySymbol.Values.Take(_equitySymbols.Value.Count() / 2)) { foreach (var config in configs.Configs) { From 9189094e18db46387326261b358a314cfeaf4ff1 Mon Sep 17 00:00:00 2001 From: Romazes Date: Wed, 16 Oct 2024 18:39:54 +0300 Subject: [PATCH 07/26] refactor: multiple subscription process feat: StreamTaskManager with stream functionally feat: TradeStation Multiple Subscription Manager to subscribe on multiple symbols remove: handle response in DQH partial class --- .../Streaming/StreamingTaskManager.cs | 221 +++++++++++ .../TradeStationBrokerage.DataQueueHandler.cs | 359 +----------------- .../TradeStationBrokerage.cs | 10 +- ...BrokerageMultiStreamSubscriptionManager.cs | 344 +++++++++++++++++ 4 files changed, 570 insertions(+), 364 deletions(-) create mode 100644 QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs create mode 100644 QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs diff --git a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs new file mode 100644 index 00000000..3bc8bc0e --- /dev/null +++ b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs @@ -0,0 +1,221 @@ +/* + * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. + * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Linq; +using System.Threading; +using QuantConnect.Logging; +using System.Threading.Tasks; +using System.Collections.Generic; + +namespace QuantConnect.Brokerages.TradeStation.Streaming; + +/// +/// Manages streaming tasks for a collection of items, allowing for subscription, unSubscription and restarting of streaming processes. +/// +public class StreamingTaskManager +{ + /// + /// Indicates whether there are any pending subscription processes. + /// + private bool _hasPendingSubscriptions; + + /// + /// Signals to a that it should be canceled. + /// + private CancellationTokenSource _cancellationTokenSource = new(); + + /// + /// The task representing the ongoing streaming operation. + /// + private Task _streamingTask; + + /// + /// The maximum number of items that can be subscribed to. + /// + private readonly int _maxSubscriptionLimit; + + /// + /// Synchronization object used to ensure thread safety when starting or restarting the streaming task. + /// + private readonly object _streamingTaskLock = new(); + + /// + /// Specifies the delay interval between subscription attempts. + /// + private readonly TimeSpan _subscribeDelay = TimeSpan.FromMilliseconds(1000); + + /// + /// The action to execute for streaming the subscribed items. + /// + private readonly Func, CancellationToken, Task> _streamAction; + + /// + /// Event used to signal the completion of the streaming task. + /// + private readonly AutoResetEvent autoResetEvent = new(false); + + /// + /// Gets the collection of subscribed items. + /// + public readonly HashSet subscriptionBrokerageTickers; + + /// + /// Initializes a new instance of the class. + /// + /// The action to execute for streaming the items. + /// Initial collection of items to subscribe to. + /// The maximum number of items that can be subscribed to. (Defaults to 100) + public StreamingTaskManager( + Func, CancellationToken, Task> streamingAction, + IEnumerable initialSubscribedItems, + int maxSubscriptionLimit = 100) + { + _streamAction = streamingAction ?? throw new ArgumentNullException(nameof(streamingAction), "Streaming action cannot be null."); + subscriptionBrokerageTickers = new(initialSubscribedItems ?? throw new ArgumentNullException(nameof(initialSubscribedItems), "Initial subscribed items cannot be null.")); + _maxSubscriptionLimit = maxSubscriptionLimit; + } + + /// + /// Starts the streaming task and executes the provided streaming action. + /// + public void StartStreaming() + { + lock (_streamingTaskLock) + { + if (_hasPendingSubscriptions) + { + // Avoid duplicate subscriptions by checking if a subscription is already in progress + return; + } + _hasPendingSubscriptions = true; + } + + _streamingTask = Task.Factory.StartNew(async () => + { + // Wait for a specified delay to batch multiple symbol subscriptions into a single request + await Task.Delay(_subscribeDelay).ConfigureAwait(false); + + List brokerageTickers; + lock (_streamingTaskLock) + { + _hasPendingSubscriptions = false; + brokerageTickers = subscriptionBrokerageTickers.ToList(); + if (brokerageTickers.Count == 0) + { + // If there are no symbols to subscribe to, exit the task + Log.Trace($"{nameof(StreamingTaskManager)}.{nameof(StartStreaming)}: No symbols to subscribe to at this time. Exiting subscription task."); + return; + } + } + + try + { + var result = await _streamAction(brokerageTickers, _cancellationTokenSource.Token); + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + Log.Error($"{nameof(StreamingTaskManager)}.Exception: {ex}"); + } + finally + { + // Signal: task is completed + autoResetEvent.Set(); + } + }); + } + + /// + /// Stops the currently running streaming task and cancels the current task. + /// + public void StopStreaming() + { + Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(StopStreaming)}: Stopping the current streaming task."); + + if (_streamingTask != null) + { + _cancellationTokenSource.Cancel(); + + if (!autoResetEvent.WaitOne(TimeSpan.FromSeconds(5))) + { + Log.Error($"{nameof(StreamingTaskManager)}.{nameof(StopStreaming)}: Timeout while waiting for the streaming task to complete."); + } + + try + { + _streamingTask.Wait(); + } + catch (Exception ex) + { + Log.Error($"{nameof(StreamingTaskManager)}.{nameof(StopStreaming)}: Error during task cancellation: {ex}"); + } + finally + { + _cancellationTokenSource.Dispose(); + _cancellationTokenSource = new CancellationTokenSource(); + } + } + } + + /// + /// Restarts the streaming task by stopping the current one and starting a new one. + /// This is useful for updating subscriptions without needing to manually stop and start. + /// + public void RestartStreaming() + { + StopStreaming(); + StartStreaming(); + } + + /// + /// Adds an item to the subscription list if the maximum limit is not reached. + /// If the item is already present, it will not be added, and the method will return false. + /// + /// The item to add to the subscription list. This should be a unique identifier + /// for the item being subscribed to. + /// true if the item was added successfully; otherwise, false. + public bool AddSubscriptionItem(string item) + { + if (subscriptionBrokerageTickers.Count >= _maxSubscriptionLimit) + { + Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(AddSubscriptionItem)}: Cannot add more items. Maximum limit reached."); + return false; + } + + if (!subscriptionBrokerageTickers.Add(item)) + { + Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(AddSubscriptionItem)}: Item already exists in the list."); + return false; + } + + return true; + } + + /// + /// Removes an item from the subscription list. + /// + /// The item to remove from the subscription list. + /// true if the item was removed successfully; otherwise, false. + public bool RemoveSubscriptionItem(string item) + { + if (subscriptionBrokerageTickers.Remove(item)) + { + return true; + } + + Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(RemoveSubscriptionItem)}: Cannot remove item: [{item}]. Item not found."); + return false; + } +} diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs index dacc2580..9f541658 100644 --- a/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.DataQueueHandler.cs @@ -14,19 +14,10 @@ */ using System; -using NodaTime; -using System.Linq; -using System.Threading; -using QuantConnect.Util; using QuantConnect.Data; using QuantConnect.Packets; -using QuantConnect.Logging; -using System.Threading.Tasks; using QuantConnect.Interfaces; -using QuantConnect.Data.Market; using System.Collections.Generic; -using System.Collections.Concurrent; -using QuantConnect.Brokerages.TradeStation.Models; namespace QuantConnect.Brokerages.TradeStation; @@ -36,79 +27,15 @@ namespace QuantConnect.Brokerages.TradeStation; public partial class TradeStationBrokerage : IDataQueueHandler { /// - /// The maximum number of symbols allowed per quote stream request. + /// Count number of subscribers for each channel (Symbol, Socket) pair /// - /// - private const int MaxSymbolsPerQuoteStreamRequest = 100; + protected TradeStationBrokerageMultiStreamSubscriptionManager SubscriptionManager { get; set; } /// /// Aggregates ticks and bars based on given subscriptions. /// protected IDataAggregator _aggregator; - /// - /// Count number of subscribers for each channel (Symbol, Socket) pair - /// - protected DataQueueHandlerSubscriptionManager SubscriptionManager { get; set; } - - /// - /// A thread-safe dictionary that stores the order books by brokerage symbols. - /// - private readonly ConcurrentDictionary _orderBooks = new(); - - /// - /// A thread-safe dictionary that maps a to a . - /// - /// - /// This dictionary is used to store the time zone information for each symbol in a concurrent environment, - /// ensuring thread safety when accessing or modifying the time zone data. - /// - private readonly ConcurrentDictionary _exchangeTimeZoneByLeanSymbol = new(); - - /// - /// Use like synchronization context for threads - /// - private readonly object _synchronizationContext = new(); - - /// - /// Synchronization object used to ensure thread safety when starting or restarting the streaming task. - /// - private readonly object _streamingTaskLock = new(); - - /// - /// Display that stream quote task was finished great - /// - private readonly ManualResetEvent _quoteStreamEndingManualResetEvent = new(false); - - /// - /// Maintains active stream quote tasks when there are more than 100 subscription symbols. - /// - private readonly List> _streamQuotesTasks = new(); - - /// - /// Indicates whether there are any pending subscription processes. - /// - private bool _subscriptionsPending; - - /// - /// Specifies the delay interval between subscription attempts. - /// - private readonly TimeSpan _subscribeDelay = TimeSpan.FromMilliseconds(1000); - - /// - /// Represents the currently running task responsible for streaming quotes from the TradeStation API. - /// - private Task _quoteStreamingTask; - - /// - /// Cancellation token source used to signal cancellation requests for the streaming quotes task. - /// - /// - /// This token source is used to cancel the streaming quotes task when it needs to be stopped or restarted. - /// A new instance is created whenever the streaming task is restarted. - /// - private CancellationTokenSource _streamQuoteCancellationTokenSource = new(); - /// /// Sets the job we're subscribing for /// @@ -162,286 +89,4 @@ public void Unsubscribe(SubscriptionDataConfig dataConfig) SubscriptionManager.Unsubscribe(dataConfig); _aggregator.Remove(dataConfig); } - - /// - /// Subscribes to updates for the specified collection of symbols. - /// - /// A collection of symbols to subscribe to. - /// Always, Returns true if the subscription was successful - private bool Subscribe(IEnumerable symbols) - { - foreach (var symbol in symbols) - { - AddOrderBook(symbol); - } - SubscribeOnTickUpdateEvents(); - return true; - } - - /// - /// Unsubscribes from updates for the specified collection of symbols. - /// - /// A collection of symbols to unsubscribe from. - /// Always, Returns true if the unSubscription was successful - private bool UnSubscribe(IEnumerable symbols) - { - foreach (var symbol in symbols) - { - RemoveOrderBook(symbol); - } - SubscribeOnTickUpdateEvents(); - return true; - } - - /// - /// Subscribes to tick update events and handles the streaming of quote updates. - /// - private void SubscribeOnTickUpdateEvents() - { - lock (_streamingTaskLock) - { - if (_subscriptionsPending) - { - // Avoid duplicate subscriptions by checking if a subscription is already in progress - return; - } - _subscriptionsPending = true; - } - StopQuoteStreamingTask(); - - _quoteStreamingTask = Task.Factory.StartNew(async () => - { - // Wait for a specified delay to batch multiple symbol subscriptions into a single request - await Task.Delay(_subscribeDelay).ConfigureAwait(false); - - List brokerageTickers; - lock (_streamingTaskLock) - { - _subscriptionsPending = false; - brokerageTickers = _orderBooks.Keys.ToList(); - if (brokerageTickers.Count == 0) - { - // If there are no symbols to subscribe to, exit the task - Log.Trace($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}: No symbols to subscribe to at this time. Exiting subscription task."); - return; - } - } - - _quoteStreamEndingManualResetEvent.Reset(); - while (!_streamQuoteCancellationTokenSource.IsCancellationRequested) - { - _streamQuotesTasks.Clear(); - Log.Trace($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}: Starting to listen for tick updates..."); - - var brokerageTickerChunks = brokerageTickers.Chunk(MaxSymbolsPerQuoteStreamRequest).ToList(); - for (var i = 0; i < brokerageTickerChunks.Count; i++) - { - var taskIndex = i; - - var streamQuotesTask = await Task.Factory.StartNew(async () => - { - Log.Debug($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}: Starting task for chunk {i + 1}/{brokerageTickerChunks.Count} with {brokerageTickerChunks[i].Length} tickers."); - try - { - // Stream quotes from the TradeStation API and handle each quote event - await foreach (var quote in _tradeStationApiClient.StreamQuotes(brokerageTickerChunks[taskIndex], _streamQuoteCancellationTokenSource.Token)) - { - HandleQuoteEvents(quote); - } - return false; - } - catch (OperationCanceledException operationEx) - { - Log.Debug($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}.OperationCanceledException: {operationEx}"); - return false; - } - catch (Exception ex) - { - Log.Error($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}.Exception: {ex}"); - return false; - } - }, _streamQuoteCancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); - - _streamQuotesTasks.Add(streamQuotesTask); - } - - // If the operation is canceled using a cancellation token, it ensures all tasks complete before returning false. - await Task.WhenAll(_streamQuotesTasks); - - Log.Trace($"{nameof(TradeStationBrokerage)}.{nameof(SubscribeOnTickUpdateEvents)}: Connection lost. Reconnecting in 10 seconds..."); - _streamQuoteCancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(10)); - } - // Signal that the quote streaming task is ending - _quoteStreamEndingManualResetEvent.Set(); - }, _streamQuoteCancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); - } - - /// - /// Handles incoming quote events and updates the order books accordingly. - /// - /// The incoming quote containing bid, ask, and trade information. - private void HandleQuoteEvents(Quote quote) - { - if (_orderBooks.TryGetValue(quote.Symbol, out var orderBook)) - { - if (quote.Ask > 0 && quote.AskSize > 0) - { - orderBook.UpdateAskRow(quote.Ask, quote.AskSize); - } - else if (quote.AskSize == 0 && quote.Ask != 0) - { - orderBook.RemoveAskRow(quote.Ask); - } - - if (quote.Bid > 0 && quote.BidSize > 0) - { - orderBook.UpdateBidRow(quote.Bid, quote.BidSize); - } - else if (quote.BidSize == 0 && quote.Bid != 0) - { - orderBook.RemoveBidRow(quote.Bid); - } - - if (quote.Last > 0 && quote.LastSize > 0) - { - EmitTradeTick(orderBook.Symbol, quote.Last, quote.LastSize, quote.TradeTime); - } - } - else - { - Log.Error($"{nameof(TradeStationBrokerage)}.{nameof(HandleQuoteEvents)}: Symbol {quote.Symbol} not found in order books. This could indicate an unexpected symbol or a missing initialization step."); - } - } - - /// - /// Emits a trade tick with the provided details and updates the aggregator. - /// - /// The symbol of the traded instrument. - /// The trade price. - /// The trade size. - /// The time of the trade. - private void EmitTradeTick(Symbol symbol, decimal price, decimal size, DateTime tradeTime) - { - if (!_exchangeTimeZoneByLeanSymbol.TryGetValue(symbol, out var exchangeTimeZone)) - { - return; - } - - var tradeTick = new Tick - { - Value = price, - Time = DateTime.UtcNow.ConvertFromUtc(exchangeTimeZone), - Symbol = symbol, - TickType = TickType.Trade, - Quantity = size - }; - - lock (_synchronizationContext) - { - _aggregator.Update(tradeTick); - } - } - - /// - /// Handles updates to the best bid and ask prices and updates the aggregator with a new quote tick. - /// - /// The source of the event. - /// The event arguments containing best bid and ask details. - private void OnBestBidAskUpdated(object sender, BestBidAskUpdatedEventArgs bestBidAskUpdatedEvent) - { - if (!_exchangeTimeZoneByLeanSymbol.TryGetValue(bestBidAskUpdatedEvent.Symbol, out var exchangeTimeZone)) - { - return; - } - - var tick = new Tick - { - AskPrice = bestBidAskUpdatedEvent.BestAskPrice, - BidPrice = bestBidAskUpdatedEvent.BestBidPrice, - Time = DateTime.UtcNow.ConvertFromUtc(exchangeTimeZone), - Symbol = bestBidAskUpdatedEvent.Symbol, - TickType = TickType.Quote, - AskSize = bestBidAskUpdatedEvent.BestAskSize, - BidSize = bestBidAskUpdatedEvent.BestBidSize - }; - tick.SetValue(); - - lock (_synchronizationContext) - { - _aggregator.Update(tick); - } - } - - /// - /// Adds an order book for the specified symbol if it does not already exist. - /// - /// The symbol for which the order book is to be added. - private void AddOrderBook(Symbol symbol) - { - var exchangeTimeZone = symbol.GetSymbolExchangeTimeZone(); - _exchangeTimeZoneByLeanSymbol[symbol] = exchangeTimeZone; - - var brokerageSymbol = _symbolMapper.GetBrokerageSymbol(symbol); - - if (!_orderBooks.TryGetValue(brokerageSymbol, out var orderBook)) - { - _orderBooks[brokerageSymbol] = new DefaultOrderBook(symbol); - _orderBooks[brokerageSymbol].BestBidAskUpdated += OnBestBidAskUpdated; - } - } - - /// - /// Removes the order book for the specified symbol if it exists. - /// - /// The symbol for which the order book is to be removed. - private void RemoveOrderBook(Symbol symbol) - { - _exchangeTimeZoneByLeanSymbol.Remove(symbol, out _); - - var brokerageSymbol = _symbolMapper.GetBrokerageSymbol(symbol); - - if (_orderBooks.TryRemove(brokerageSymbol, out var orderBook)) - { - orderBook.BestBidAskUpdated -= OnBestBidAskUpdated; - } - } - - /// - /// Stops the currently running streaming task, if any. - /// This method cancels the task, waits for it to complete, - /// handles any exceptions that occur during cancellation, - /// and disposes of the current cancellation token source. - /// - /// - /// This method ensures that any running streaming task is stopped cleanly. - /// A new cancellation token source is created for future tasks after the current one is disposed of. - /// - private void StopQuoteStreamingTask(bool updateCancellationToken = true) - { - Log.Debug($"{nameof(TradeStationBrokerage)}.{nameof(StopQuoteStreamingTask)}._quoteStreamingTask = {_quoteStreamingTask?.Status}"); - if (_quoteStreamingTask != null) - { - _streamQuoteCancellationTokenSource.Cancel(); - try - { - _quoteStreamingTask.Wait(); - if (!_quoteStreamEndingManualResetEvent.WaitOne(TimeSpan.FromSeconds(5))) - { - Log.Error($"{nameof(TradeStationBrokerage)}.{nameof(StopQuoteStreamingTask)}: TimeOut waiting for Quote Streaming Task to end."); - } - } - catch (Exception ex) - { - Log.Error($"{nameof(TradeStationBrokerage)}.{nameof(StopQuoteStreamingTask)}.Exception: {ex}"); - } - finally - { - _streamQuoteCancellationTokenSource.Dispose(); - if (updateCancellationToken) - { - _streamQuoteCancellationTokenSource = new CancellationTokenSource(); - } - } - } - } } diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.cs index ccbff849..879f39dc 100644 --- a/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.cs +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerage.cs @@ -268,12 +268,6 @@ protected void Initialize(string clientId, string clientSecret, string restApiUr _messageHandler = new(HandleTradeStationMessage); - SubscriptionManager = new EventBasedDataQueueHandlerSubscriptionManager() - { - SubscribeImpl = (symbols, _) => Subscribe(symbols), - UnsubscribeImpl = (symbols, _) => UnSubscribe(symbols) - }; - _aggregator = Composer.Instance.GetPart(); if (_aggregator == null) { @@ -283,6 +277,8 @@ protected void Initialize(string clientId, string clientSecret, string restApiUr _aggregator = Composer.Instance.GetExportedValueByTypeName(aggregatorName); } + SubscriptionManager = new TradeStationBrokerageMultiStreamSubscriptionManager(_tradeStationApiClient, _symbolMapper, _aggregator); + _routes = new Lazy>>(() => { return _tradeStationApiClient.GetRoutes().SynchronouslyAwaitTaskResult().Routes @@ -686,7 +682,7 @@ public override void Disconnect() { Log.Error($"{nameof(TradeStationBrokerage)}.{nameof(Disconnect)}: TimeOut waiting for stream order task to end."); } - StopQuoteStreamingTask(updateCancellationToken: false); + SubscriptionManager.Dispose(); } #endregion diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs new file mode 100644 index 00000000..8508c355 --- /dev/null +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs @@ -0,0 +1,344 @@ +/* + * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. + * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using NodaTime; +using System.Linq; +using QuantConnect.Data; +using QuantConnect.Logging; +using QuantConnect.Data.Market; +using System.Collections.Generic; +using System.Collections.Concurrent; +using QuantConnect.Brokerages.TradeStation.Api; +using QuantConnect.Brokerages.TradeStation.Models; +using QuantConnect.Brokerages.TradeStation.Streaming; + +namespace QuantConnect.Brokerages.TradeStation +{ + /// + /// Manages multiple streaming subscriptions for the TradeStation brokerage, handling updates received from the TradeStation API. + /// + public class TradeStationBrokerageMultiStreamSubscriptionManager : EventBasedDataQueueHandlerSubscriptionManager, IDisposable + { + /// + /// The maximum number of symbols allowed per quote stream request. + /// + /// + private const int MaxSymbolsPerQuoteStreamRequest = 100; + + /// + /// Manages the list of active quote stream managers. + /// + private List _quoteStreamManagers = new(); + + /// + /// A thread-safe dictionary that stores the order books by brokerage symbols. + /// + private readonly ConcurrentDictionary _orderBooks = new(); + + /// + /// TradeStation api client implementation + /// + private TradeStationApiClient _tradeStationApiClient; + + /// + /// Use like synchronization context for threads + /// + private readonly object _synchronizationContext = new(); + + /// + /// Aggregates ticks and bars based on given subscriptions. + /// + protected IDataAggregator _aggregator; + + /// + /// Provides the mapping between Lean symbols and brokerage specific symbols. + /// + private TradeStationSymbolMapper _symbolMapper; + + /// + /// A thread-safe dictionary that maps a to a . + /// + /// + /// This dictionary is used to store the time zone information for each symbol in a concurrent environment, + /// ensuring thread safety when accessing or modifying the time zone data. + /// + private readonly ConcurrentDictionary _exchangeTimeZoneByLeanSymbol = new(); + + /// + /// + /// + /// + /// + /// + public TradeStationBrokerageMultiStreamSubscriptionManager(TradeStationApiClient tradeStationApiClient, TradeStationSymbolMapper symbolMapper, IDataAggregator aggregator) + { + _aggregator = aggregator; + _symbolMapper = symbolMapper; + _tradeStationApiClient = tradeStationApiClient; + + SubscribeImpl = (symbols, _) => Subscribe(symbols); + UnsubscribeImpl = (symbols, _) => UnSubscribe(symbols); + } + + private bool Subscribe(IEnumerable symbols) + { + var subscribedSymbols = new List(); + foreach (var symbol in symbols) + { + subscribedSymbols.Add(AddOrderBook(symbol)); + } + + foreach (var quoteStream in _quoteStreamManagers) + { + if (subscribedSymbols.Count == 0) + { + break; + } + + var symbolAdded = default(bool); + + foreach (var symbol in subscribedSymbols) + { + if (quoteStream.AddSubscriptionItem(symbol)) + { + subscribedSymbols.Remove(symbol); + symbolAdded = true; + } + + if (subscribedSymbols.Count == 0) + { + break; + } + } + + if (symbolAdded) + { + quoteStream.RestartStreaming(); + } + } + + if (subscribedSymbols.Count > 0) + { + var brokerageSymbolsChunks = subscribedSymbols.Chunk(MaxSymbolsPerQuoteStreamRequest); + + foreach (var brokerageSymbolChunk in brokerageSymbolsChunks) + { + var streamQuoteTask = new StreamingTaskManager(async (brokerageTickers, cancellationToken) => + { + await foreach (var quote in _tradeStationApiClient.StreamQuotes(brokerageTickers, cancellationToken)) + { + HandleQuoteEvents(quote); + } + + return false; + + }, brokerageSymbolChunk, MaxSymbolsPerQuoteStreamRequest); + + _quoteStreamManagers.Add(streamQuoteTask); + + streamQuoteTask.StartStreaming(); + } + } + + return true; + } + + /// + /// Unsubscribes from updates for the specified collection of symbols. + /// + /// A collection of symbols to unsubscribe from. + /// Always, Returns true if the unSubscription was successful + private bool UnSubscribe(IEnumerable symbols) + { + var streamsToRemove = new List(); + + foreach (var symbol in symbols) + { + foreach (var streamQuoteTask in _quoteStreamManagers) + { + if (streamQuoteTask.RemoveSubscriptionItem(RemoveOrderBook(symbol))) + { + streamQuoteTask.RestartStreaming(); + } + + if (streamQuoteTask.subscriptionBrokerageTickers.Count == 0) + { + streamsToRemove.Add(streamQuoteTask); + _quoteStreamManagers.Remove(streamQuoteTask); + } + + if (_quoteStreamManagers.Count == 0) + { + break; + } + } + } + return true; + } + + /// + /// Handles incoming quote events and updates the order books accordingly. + /// + /// The incoming quote containing bid, ask, and trade information. + private void HandleQuoteEvents(Quote quote) + { + if (_orderBooks.TryGetValue(quote.Symbol, out var orderBook)) + { + if (quote.Ask > 0 && quote.AskSize > 0) + { + orderBook.UpdateAskRow(quote.Ask, quote.AskSize); + } + else if (quote.AskSize == 0 && quote.Ask != 0) + { + orderBook.RemoveAskRow(quote.Ask); + } + + if (quote.Bid > 0 && quote.BidSize > 0) + { + orderBook.UpdateBidRow(quote.Bid, quote.BidSize); + } + else if (quote.BidSize == 0 && quote.Bid != 0) + { + orderBook.RemoveBidRow(quote.Bid); + } + + if (quote.Last > 0 && quote.LastSize > 0) + { + EmitTradeTick(orderBook.Symbol, quote.Last, quote.LastSize, quote.TradeTime); + } + } + else + { + Log.Error($"{nameof(TradeStationBrokerage)}.{nameof(HandleQuoteEvents)}: Symbol {quote.Symbol} not found in order books. This could indicate an unexpected symbol or a missing initialization step."); + } + } + + /// + /// Emits a trade tick with the provided details and updates the aggregator. + /// + /// The symbol of the traded instrument. + /// The trade price. + /// The trade size. + /// The time of the trade. + private void EmitTradeTick(Symbol symbol, decimal price, decimal size, DateTime tradeTime) + { + if (!_exchangeTimeZoneByLeanSymbol.TryGetValue(symbol, out var exchangeTimeZone)) + { + return; + } + + var tradeTick = new Tick + { + Value = price, + Time = DateTime.UtcNow.ConvertFromUtc(exchangeTimeZone), + Symbol = symbol, + TickType = TickType.Trade, + Quantity = size + }; + + lock (_synchronizationContext) + { + _aggregator.Update(tradeTick); + } + } + + /// + /// Handles updates to the best bid and ask prices and updates the aggregator with a new quote tick. + /// + /// The source of the event. + /// The event arguments containing best bid and ask details. + private void OnBestBidAskUpdated(object sender, BestBidAskUpdatedEventArgs bestBidAskUpdatedEvent) + { + if (!_exchangeTimeZoneByLeanSymbol.TryGetValue(bestBidAskUpdatedEvent.Symbol, out var exchangeTimeZone)) + { + return; + } + + var tick = new Tick + { + AskPrice = bestBidAskUpdatedEvent.BestAskPrice, + BidPrice = bestBidAskUpdatedEvent.BestBidPrice, + Time = DateTime.UtcNow.ConvertFromUtc(exchangeTimeZone), + Symbol = bestBidAskUpdatedEvent.Symbol, + TickType = TickType.Quote, + AskSize = bestBidAskUpdatedEvent.BestAskSize, + BidSize = bestBidAskUpdatedEvent.BestBidSize + }; + tick.SetValue(); + + lock (_synchronizationContext) + { + _aggregator.Update(tick); + } + } + + /// + /// Adds an order book for the specified symbol if it does not already exist. + /// + /// The symbol for which the order book is to be added. + private string AddOrderBook(Symbol symbol) + { + var exchangeTimeZone = symbol.GetSymbolExchangeTimeZone(); + _exchangeTimeZoneByLeanSymbol[symbol] = exchangeTimeZone; + + var brokerageSymbol = _symbolMapper.GetBrokerageSymbol(symbol); + + if (!_orderBooks.TryGetValue(brokerageSymbol, out var orderBook)) + { + _orderBooks[brokerageSymbol] = new DefaultOrderBook(symbol); + _orderBooks[brokerageSymbol].BestBidAskUpdated += OnBestBidAskUpdated; + } + + return brokerageSymbol; + } + + /// + /// Removes the order book for the specified symbol if it exists. + /// + /// The symbol for which the order book is to be removed. + private string RemoveOrderBook(Symbol symbol) + { + _exchangeTimeZoneByLeanSymbol.Remove(symbol, out _); + + var brokerageSymbol = _symbolMapper.GetBrokerageSymbol(symbol); + + if (_orderBooks.TryRemove(brokerageSymbol, out var orderBook)) + { + orderBook.BestBidAskUpdated -= OnBestBidAskUpdated; + } + + return brokerageSymbol; + } + + public override void Dispose() + { + if (_quoteStreamManagers != null) + { + // Stop each stream in the manager + foreach (var streamQuote in _quoteStreamManagers) + { + streamQuote.StopStreaming(); + } + + // Clear the list to release resources + _quoteStreamManagers.Clear(); + _quoteStreamManagers = null; + } + + + } + } +} \ No newline at end of file From dbf3740ed63ad474c5cc88a6016775d64ca88495 Mon Sep 17 00:00:00 2001 From: Romazes Date: Wed, 16 Oct 2024 19:06:59 +0300 Subject: [PATCH 08/26] fix: OrderType and SecurityType table in Readme --- README.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index a8165c1e..dd401e65 100644 --- a/README.md +++ b/README.md @@ -128,12 +128,12 @@ TradeStation supports cash and margin accounts. The following table describes the available order types for each asset class that our TradeStation integration supports. -| Order Type | Equity | Equity Options | Futures -| ----------- | ----------- | ----------- | -| `MarketOrder` | Yes | Yes | Yes | -| `LimitOrder` | Yes | Yes | Yes | -| `StopMarketOrder` | Yes | Yes | Yes | -| `StopLimitOrder` | Yes | Yes | Yes | +| Order Type / Security Type | Equity | Equity Options | Futures | +|:--------------------------:|:------:|:--------------:|:-------:| +| MarketOrder | Yes | Yes | Yes | +| LimitOrder | Yes | Yes | Yes | +| StopMarketOrder | Yes | Yes | Yes | +| StopLimitOrder | Yes | Yes | Yes | ## Downloading Data From a9a4c310dd9d9fd3cd4aec64c9c791ad71adfd3d Mon Sep 17 00:00:00 2001 From: Romazes Date: Wed, 16 Oct 2024 19:49:57 +0300 Subject: [PATCH 09/26] fix: pending task before start new one in StreamingTaskManager --- .../Streaming/StreamingTaskManager.cs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs index 3bc8bc0e..8abfb615 100644 --- a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs +++ b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs @@ -142,6 +142,15 @@ public void StartStreaming() /// public void StopStreaming() { + lock (_streamingTaskLock) + { + if (_hasPendingSubscriptions) + { + // Avoid duplicate subscriptions by checking if a subscription is already in progress + return; + } + } + Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(StopStreaming)}: Stopping the current streaming task."); if (_streamingTask != null) From 67c6a6273db398a0fae73cdbe64c77025a55e28a Mon Sep 17 00:00:00 2001 From: Romazes Date: Wed, 16 Oct 2024 19:51:43 +0300 Subject: [PATCH 10/26] refactor: increase some iteration process in TSMultiSubscription --- ...BrokerageMultiStreamSubscriptionManager.cs | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs index 8508c355..12607f95 100644 --- a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs @@ -117,6 +117,11 @@ private bool Subscribe(IEnumerable symbols) subscribedSymbols.Remove(symbol); symbolAdded = true; } + else + { + // Exit the loop if the subscription limit is reached and no more items can be added. + break; + } if (subscribedSymbols.Count == 0) { @@ -173,19 +178,25 @@ private bool UnSubscribe(IEnumerable symbols) { streamQuoteTask.RestartStreaming(); } - - if (streamQuoteTask.subscriptionBrokerageTickers.Count == 0) + else { - streamsToRemove.Add(streamQuoteTask); - _quoteStreamManagers.Remove(streamQuoteTask); + // Exit the loop if the symbol is not found or cannot be unsubscribed. + break; } - if (_quoteStreamManagers.Count == 0) + if (streamQuoteTask.subscriptionBrokerageTickers.Count == 0) { - break; + streamsToRemove.Add(streamQuoteTask); } } } + + // Remove the streams that have no remaining subscriptions + foreach (var streamToRemove in streamsToRemove) + { + _quoteStreamManagers.Remove(streamToRemove); + } + return true; } From 03e7fbf8da76f92e8c19a7f905748b4591683977 Mon Sep 17 00:00:00 2001 From: Romazes Date: Wed, 16 Oct 2024 19:52:10 +0300 Subject: [PATCH 11/26] test:feat: add description to MultipleSubscription Test --- .../TradeStationBrokerageDataQueueHandlerTests.cs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/QuantConnect.TradeStationBrokerage.Tests/TradeStationBrokerageDataQueueHandlerTests.cs b/QuantConnect.TradeStationBrokerage.Tests/TradeStationBrokerageDataQueueHandlerTests.cs index dde6fb45..1df188f4 100644 --- a/QuantConnect.TradeStationBrokerage.Tests/TradeStationBrokerageDataQueueHandlerTests.cs +++ b/QuantConnect.TradeStationBrokerage.Tests/TradeStationBrokerageDataQueueHandlerTests.cs @@ -116,6 +116,12 @@ public void MultipleSubscription() var amountDataBySymbol = new ConcurrentDictionary(); var configBySymbol = new Dictionary Configs, CancellationTokenSource CancellationTokenSource)>(); + var equitySymbols = _equitySymbols.Value.Count(); + + Log.Trace(""); + Log.Trace($"MULTIPLE SUSBSRIPTION TEST on [{equitySymbols}] symbols."); + Log.Trace(""); + foreach (var symbol in _equitySymbols.Value) { foreach (var config in GetSubscriptionDataConfigsBySymbolResolution(symbol, Resolution.Tick)) @@ -153,7 +159,7 @@ public void MultipleSubscription() resetEvent.WaitOne(TimeSpan.FromSeconds(30), cancelationToken.Token); - foreach (var configs in configBySymbol.Values.Take(_equitySymbols.Value.Count() / 2)) + foreach (var configs in configBySymbol.Values.Take(equitySymbols / 2)) { foreach (var config in configs.Configs) { From 314523cd5741245361e491fa376572a5458998bc Mon Sep 17 00:00:00 2001 From: Romazes Date: Thu, 17 Oct 2024 14:53:13 +0300 Subject: [PATCH 12/26] feat: implement IDisposable in TradeStationApiClient --- .../Api/TradeStationApiClient.cs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/QuantConnect.TradeStationBrokerage/Api/TradeStationApiClient.cs b/QuantConnect.TradeStationBrokerage/Api/TradeStationApiClient.cs index d49cd93f..4c0c3eee 100644 --- a/QuantConnect.TradeStationBrokerage/Api/TradeStationApiClient.cs +++ b/QuantConnect.TradeStationBrokerage/Api/TradeStationApiClient.cs @@ -36,7 +36,7 @@ namespace QuantConnect.Brokerages.TradeStation.Api; /// /// TradeStation api client implementation /// -public class TradeStationApiClient +public class TradeStationApiClient : IDisposable { /// /// Maximum number of bars that can be requested in a single call to . @@ -716,4 +716,12 @@ private async IAsyncEnumerable StreamRequestAsyncEnumerable(string reque } } } + + /// + /// Releases the resources used by the current instance. + /// + public void Dispose() + { + _httpClient.DisposeSafely(); + } } \ No newline at end of file From b0735ff6ccfc87886a07bd08a95e5c5ad4eb1ee9 Mon Sep 17 00:00:00 2001 From: Romazes Date: Thu, 17 Oct 2024 14:59:57 +0300 Subject: [PATCH 13/26] refactor: RestartStreaming internally in StreamingTaskManager feat: add Dispose to classes --- .../Streaming/StreamingTaskManager.cs | 167 ++++++++++-------- ...BrokerageMultiStreamSubscriptionManager.cs | 114 ++++++------ 2 files changed, 144 insertions(+), 137 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs index 8abfb615..7a07b140 100644 --- a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs +++ b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs @@ -16,6 +16,7 @@ using System; using System.Linq; using System.Threading; +using QuantConnect.Util; using QuantConnect.Logging; using System.Threading.Tasks; using System.Collections.Generic; @@ -25,8 +26,14 @@ namespace QuantConnect.Brokerages.TradeStation.Streaming; /// /// Manages streaming tasks for a collection of items, allowing for subscription, unSubscription and restarting of streaming processes. /// -public class StreamingTaskManager +public class StreamingTaskManager : IDisposable { + /// + /// The maximum number of symbols allowed per quote stream request. + /// + /// + private const int MaxSymbolsPerQuoteStreamRequest = 100; + /// /// Indicates whether there are any pending subscription processes. /// @@ -43,14 +50,14 @@ public class StreamingTaskManager private Task _streamingTask; /// - /// The maximum number of items that can be subscribed to. + /// Synchronization object used to ensure thread safety when starting or restarting the streaming task. /// - private readonly int _maxSubscriptionLimit; + private readonly object _streamingTaskLock = new(); /// - /// Synchronization object used to ensure thread safety when starting or restarting the streaming task. + /// Synchronization object used to ensure thread safety when Add or Remove item in . /// - private readonly object _streamingTaskLock = new(); + private readonly object _brokerageTickerLock = new(); /// /// Specifies the delay interval between subscription attempts. @@ -63,35 +70,91 @@ public class StreamingTaskManager private readonly Func, CancellationToken, Task> _streamAction; /// - /// Event used to signal the completion of the streaming task. + /// Gets the collection of subscribed items. /// - private readonly AutoResetEvent autoResetEvent = new(false); + private readonly HashSet _subscriptionBrokerageTickers = new(); /// - /// Gets the collection of subscribed items. + /// Indicates whether there are no subscribed brokerage tickers. + /// + public bool IsSubscriptionBrokerageTickerEmpty { get => _subscriptionBrokerageTickers.Count == 0; } + + /// + /// Indicates whether the maximum number of subscribed brokerage tickers has been reached. /// - public readonly HashSet subscriptionBrokerageTickers; + public bool IsSubscriptionFilled { get => _subscriptionBrokerageTickers.Count == MaxSymbolsPerQuoteStreamRequest; } /// /// Initializes a new instance of the class. /// /// The action to execute for streaming the items. - /// Initial collection of items to subscribe to. - /// The maximum number of items that can be subscribed to. (Defaults to 100) - public StreamingTaskManager( - Func, CancellationToken, Task> streamingAction, - IEnumerable initialSubscribedItems, - int maxSubscriptionLimit = 100) + public StreamingTaskManager(Func, CancellationToken, Task> streamingAction) { _streamAction = streamingAction ?? throw new ArgumentNullException(nameof(streamingAction), "Streaming action cannot be null."); - subscriptionBrokerageTickers = new(initialSubscribedItems ?? throw new ArgumentNullException(nameof(initialSubscribedItems), "Initial subscribed items cannot be null.")); - _maxSubscriptionLimit = maxSubscriptionLimit; + } + + /// + /// Adds an item to the subscription list if the maximum limit is not reached. + /// If the item is already present, it will not be added, and the method will return false. + /// + /// The item to add to the subscription list. This should be a unique identifier + /// for the item being subscribed to. + /// true if the item was added successfully; otherwise, false. + public bool AddSubscriptionItem(string item) + { + lock (_brokerageTickerLock) + { + if (_subscriptionBrokerageTickers.Count >= MaxSymbolsPerQuoteStreamRequest) + { + Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(AddSubscriptionItem)}: Cannot add more items. Maximum limit reached."); + return false; + } + + if (!_subscriptionBrokerageTickers.Add(item)) + { + Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(AddSubscriptionItem)}: Item already exists in the list."); + return false; + } + } + + RestartStreaming(); + + return true; + } + + /// + /// Removes an item from the subscription list. + /// + /// The item to remove from the subscription list. + /// true if the item was removed successfully; otherwise, false. + public bool RemoveSubscriptionItem(string item) + { + lock (_brokerageTickerLock) + { + if (_subscriptionBrokerageTickers.Remove(item)) + { + RestartStreaming(); + return true; + } + } + Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(RemoveSubscriptionItem)}: Cannot remove item: [{item}]. Item not found."); + return false; + } + + /// + /// Restarts the streaming task by stopping the current one and starting a new one. + /// This is useful for updating subscriptions without needing to manually stop and start. + /// + private void RestartStreaming() + { + StopStreaming(); + StartStreaming(); } /// /// Starts the streaming task and executes the provided streaming action. /// - public void StartStreaming() + private void StartStreaming() { lock (_streamingTaskLock) { @@ -112,7 +175,7 @@ public void StartStreaming() lock (_streamingTaskLock) { _hasPendingSubscriptions = false; - brokerageTickers = subscriptionBrokerageTickers.ToList(); + brokerageTickers = _subscriptionBrokerageTickers.ToList(); if (brokerageTickers.Count == 0) { // If there are no symbols to subscribe to, exit the task @@ -129,18 +192,13 @@ public void StartStreaming() { Log.Error($"{nameof(StreamingTaskManager)}.Exception: {ex}"); } - finally - { - // Signal: task is completed - autoResetEvent.Set(); - } }); } /// /// Stops the currently running streaming task and cancels the current task. /// - public void StopStreaming() + private void StopStreaming() { lock (_streamingTaskLock) { @@ -157,14 +215,12 @@ public void StopStreaming() { _cancellationTokenSource.Cancel(); - if (!autoResetEvent.WaitOne(TimeSpan.FromSeconds(5))) - { - Log.Error($"{nameof(StreamingTaskManager)}.{nameof(StopStreaming)}: Timeout while waiting for the streaming task to complete."); - } - try { - _streamingTask.Wait(); + if (!_streamingTask.Wait(TimeSpan.FromSeconds(5))) + { + Log.Error($"{nameof(StreamingTaskManager)}.{nameof(StopStreaming)}: Timeout while waiting for the streaming task to complete."); + } } catch (Exception ex) { @@ -179,52 +235,11 @@ public void StopStreaming() } /// - /// Restarts the streaming task by stopping the current one and starting a new one. - /// This is useful for updating subscriptions without needing to manually stop and start. - /// - public void RestartStreaming() - { - StopStreaming(); - StartStreaming(); - } - - /// - /// Adds an item to the subscription list if the maximum limit is not reached. - /// If the item is already present, it will not be added, and the method will return false. - /// - /// The item to add to the subscription list. This should be a unique identifier - /// for the item being subscribed to. - /// true if the item was added successfully; otherwise, false. - public bool AddSubscriptionItem(string item) - { - if (subscriptionBrokerageTickers.Count >= _maxSubscriptionLimit) - { - Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(AddSubscriptionItem)}: Cannot add more items. Maximum limit reached."); - return false; - } - - if (!subscriptionBrokerageTickers.Add(item)) - { - Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(AddSubscriptionItem)}: Item already exists in the list."); - return false; - } - - return true; - } - - /// - /// Removes an item from the subscription list. + /// Releases the resources used by the current instance. /// - /// The item to remove from the subscription list. - /// true if the item was removed successfully; otherwise, false. - public bool RemoveSubscriptionItem(string item) + public void Dispose() { - if (subscriptionBrokerageTickers.Remove(item)) - { - return true; - } - - Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(RemoveSubscriptionItem)}: Cannot remove item: [{item}]. Item not found."); - return false; + _streamingTask?.DisposeSafely(); + _cancellationTokenSource?.DisposeSafely(); } } diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs index 12607f95..7a8c72a1 100644 --- a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs @@ -15,9 +15,11 @@ using System; using NodaTime; -using System.Linq; +using System.Threading; using QuantConnect.Data; +using QuantConnect.Util; using QuantConnect.Logging; +using System.Threading.Tasks; using QuantConnect.Data.Market; using System.Collections.Generic; using System.Collections.Concurrent; @@ -32,12 +34,6 @@ namespace QuantConnect.Brokerages.TradeStation /// public class TradeStationBrokerageMultiStreamSubscriptionManager : EventBasedDataQueueHandlerSubscriptionManager, IDisposable { - /// - /// The maximum number of symbols allowed per quote stream request. - /// - /// - private const int MaxSymbolsPerQuoteStreamRequest = 100; - /// /// Manages the list of active quote stream managers. /// @@ -93,69 +89,56 @@ public TradeStationBrokerageMultiStreamSubscriptionManager(TradeStationApiClient UnsubscribeImpl = (symbols, _) => UnSubscribe(symbols); } + /// + /// Subscribes to updates for the specified collection of symbols. + /// + /// A collection of symbols to subscribe to. + /// Always, Returns true if the subscription was successful private bool Subscribe(IEnumerable symbols) { - var subscribedSymbols = new List(); + var subscribedBrokerageSymbolsQueue = new Queue(); foreach (var symbol in symbols) { - subscribedSymbols.Add(AddOrderBook(symbol)); + subscribedBrokerageSymbolsQueue.Enqueue(AddOrderBook(symbol)); } foreach (var quoteStream in _quoteStreamManagers) { - if (subscribedSymbols.Count == 0) + if (quoteStream.IsSubscriptionFilled) { - break; + // Skip this quote stream as its subscription is full + continue; } - var symbolAdded = default(bool); - - foreach (var symbol in subscribedSymbols) + do { - if (quoteStream.AddSubscriptionItem(symbol)) - { - subscribedSymbols.Remove(symbol); - symbolAdded = true; - } - else - { - // Exit the loop if the subscription limit is reached and no more items can be added. - break; - } + var brokerageSymbol = subscribedBrokerageSymbolsQueue.Dequeue(); - if (subscribedSymbols.Count == 0) + if (!quoteStream.AddSubscriptionItem(brokerageSymbol)) { + // Re-enqueue the symbol since adding it to the subscription failed + subscribedBrokerageSymbolsQueue.Enqueue(brokerageSymbol); + // Exit the loop if the subscription limit is reached and no more items can be added. break; } - } - - if (symbolAdded) - { - quoteStream.RestartStreaming(); - } + } while (subscribedBrokerageSymbolsQueue.Count > 0); } - if (subscribedSymbols.Count > 0) + while (subscribedBrokerageSymbolsQueue.Count > 0) { - var brokerageSymbolsChunks = subscribedSymbols.Chunk(MaxSymbolsPerQuoteStreamRequest); + var streamQuoteTask = new StreamingTaskManager(StreamHandleQuoteEvents); + _quoteStreamManagers.Add(streamQuoteTask); - foreach (var brokerageSymbolChunk in brokerageSymbolsChunks) + do { - var streamQuoteTask = new StreamingTaskManager(async (brokerageTickers, cancellationToken) => - { - await foreach (var quote in _tradeStationApiClient.StreamQuotes(brokerageTickers, cancellationToken)) - { - HandleQuoteEvents(quote); - } - - return false; - - }, brokerageSymbolChunk, MaxSymbolsPerQuoteStreamRequest); + var brokerageSymbol = subscribedBrokerageSymbolsQueue.Dequeue(); - _quoteStreamManagers.Add(streamQuoteTask); - - streamQuoteTask.StartStreaming(); - } + if (!streamQuoteTask.AddSubscriptionItem(brokerageSymbol)) + { + // The subscription limit is reached and no more items can be added. + break; + } + } while (subscribedBrokerageSymbolsQueue.Count > 0); } return true; @@ -174,17 +157,13 @@ private bool UnSubscribe(IEnumerable symbols) { foreach (var streamQuoteTask in _quoteStreamManagers) { - if (streamQuoteTask.RemoveSubscriptionItem(RemoveOrderBook(symbol))) - { - streamQuoteTask.RestartStreaming(); - } - else + if (!streamQuoteTask.RemoveSubscriptionItem(RemoveOrderBook(symbol))) { // Exit the loop if the symbol is not found or cannot be unsubscribed. break; } - if (streamQuoteTask.subscriptionBrokerageTickers.Count == 0) + if (streamQuoteTask.IsSubscriptionBrokerageTickerEmpty) { streamsToRemove.Add(streamQuoteTask); } @@ -200,6 +179,21 @@ private bool UnSubscribe(IEnumerable symbols) return true; } + /// + /// Handles streaming quote events for the specified brokerage tickers. + /// + /// A read-only collection of brokerage tickers to subscribe to for streaming quotes. + /// A cancellation token that can be used to cancel the streaming operation. + /// A task that represents the asynchronous operation, returning false upon completion. + private async Task StreamHandleQuoteEvents(IReadOnlyCollection brokerageTickers, CancellationToken cancellationToken) + { + await foreach (var quote in _tradeStationApiClient.StreamQuotes(brokerageTickers, cancellationToken)) + { + HandleQuoteEvents(quote); + } + return false; + } + /// /// Handles incoming quote events and updates the order books accordingly. /// @@ -334,22 +328,20 @@ private string RemoveOrderBook(Symbol symbol) return brokerageSymbol; } + /// + /// Releases the resources used by the current instance. + /// public override void Dispose() { if (_quoteStreamManagers != null) { - // Stop each stream in the manager - foreach (var streamQuote in _quoteStreamManagers) - { - streamQuote.StopStreaming(); - } - // Clear the list to release resources _quoteStreamManagers.Clear(); _quoteStreamManagers = null; } - + _aggregator.DisposeSafely(); + _tradeStationApiClient.DisposeSafely(); } } } \ No newline at end of file From 9b8ea7666b1602a097b999a8b8be930a350e191a Mon Sep 17 00:00:00 2001 From: Romazes Date: Thu, 17 Oct 2024 15:29:36 +0300 Subject: [PATCH 14/26] refactor: Unsubscribe process in MultiSubscriptionManager --- ...BrokerageMultiStreamSubscriptionManager.cs | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs index 7a8c72a1..12747440 100644 --- a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs @@ -153,20 +153,36 @@ private bool UnSubscribe(IEnumerable symbols) { var streamsToRemove = new List(); + var unSubscribeBrokerageSymbolsQueue = new Queue(); foreach (var symbol in symbols) { - foreach (var streamQuoteTask in _quoteStreamManagers) + unSubscribeBrokerageSymbolsQueue.Enqueue(RemoveOrderBook(symbol)); + } + + foreach (var streamQuoteTask in _quoteStreamManagers) + { + do { - if (!streamQuoteTask.RemoveSubscriptionItem(RemoveOrderBook(symbol))) + var brokerageSymbol = unSubscribeBrokerageSymbolsQueue.Dequeue(); + + if (!streamQuoteTask.RemoveSubscriptionItem(brokerageSymbol)) { + // Re-enqueue the symbol since adding it to the subscription failed + unSubscribeBrokerageSymbolsQueue.Enqueue(brokerageSymbol); // Exit the loop if the symbol is not found or cannot be unsubscribed. break; } - if (streamQuoteTask.IsSubscriptionBrokerageTickerEmpty) - { - streamsToRemove.Add(streamQuoteTask); - } + } while (unSubscribeBrokerageSymbolsQueue.Count > 0); + + if (streamQuoteTask.IsSubscriptionBrokerageTickerEmpty) + { + streamsToRemove.Add(streamQuoteTask); + } + + if (unSubscribeBrokerageSymbolsQueue.Count == 0) + { + break; } } From 5d8f57d6a7c85f5920c123d8e061a32ba0b480b7 Mon Sep 17 00:00:00 2001 From: Romazes Date: Thu, 17 Oct 2024 16:50:30 +0300 Subject: [PATCH 15/26] feat: reconnection of StreamingTaskManager --- .../Streaming/StreamingTaskManager.cs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs index 7a07b140..f34345c3 100644 --- a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs +++ b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs @@ -184,13 +184,20 @@ private void StartStreaming() } } - try - { - var result = await _streamAction(brokerageTickers, _cancellationTokenSource.Token); - } - catch (Exception ex) when (ex is not OperationCanceledException) + while (!_cancellationTokenSource.IsCancellationRequested) { - Log.Error($"{nameof(StreamingTaskManager)}.Exception: {ex}"); + try + { + var result = await _streamAction(brokerageTickers, _cancellationTokenSource.Token); + } + catch (OperationCanceledException oex) + { + // Ski + } + catch (Exception ex) + { + Log.Error($"{nameof(StreamingTaskManager)}.Exception: {ex}"); + } } }); } From 26281cf512f1507726f2fc39ccbd94d2f8a99c0c Mon Sep 17 00:00:00 2001 From: Romazes Date: Thu, 17 Oct 2024 17:22:03 +0300 Subject: [PATCH 16/26] refactor: disconnect process from StreamTask feat: dev friendly comment why skip OperationCanceledException --- .../Streaming/StreamingTaskManager.cs | 6 ++++-- .../TradeStationBrokerageMultiStreamSubscriptionManager.cs | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs index f34345c3..b525c207 100644 --- a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs +++ b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs @@ -190,9 +190,10 @@ private void StartStreaming() { var result = await _streamAction(brokerageTickers, _cancellationTokenSource.Token); } - catch (OperationCanceledException oex) + catch (OperationCanceledException) { - // Ski + // Safely skipping: + // Task was cancelled, likely due to token cancellation (e.g., retry attempts or HttpClient.Timeout of 100 seconds). } catch (Exception ex) { @@ -247,6 +248,7 @@ private void StopStreaming() public void Dispose() { _streamingTask?.DisposeSafely(); + _cancellationTokenSource?.Cancel(); _cancellationTokenSource?.DisposeSafely(); } } diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs index 12747440..470d4add 100644 --- a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs @@ -189,6 +189,7 @@ private bool UnSubscribe(IEnumerable symbols) // Remove the streams that have no remaining subscriptions foreach (var streamToRemove in streamsToRemove) { + streamToRemove.DisposeSafely(); _quoteStreamManagers.Remove(streamToRemove); } From c57e69e9f04879f1355395b6ce0d61921bc63fe1 Mon Sep 17 00:00:00 2001 From: Romazes Date: Thu, 17 Oct 2024 18:17:14 +0300 Subject: [PATCH 17/26] refactor: reconnection and delay in StreamingTaskManager --- .../Streaming/StreamingTaskManager.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs index b525c207..6100cd31 100644 --- a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs +++ b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs @@ -184,7 +184,7 @@ private void StartStreaming() } } - while (!_cancellationTokenSource.IsCancellationRequested) + do { try { @@ -199,7 +199,7 @@ private void StartStreaming() { Log.Error($"{nameof(StreamingTaskManager)}.Exception: {ex}"); } - } + } while (!_cancellationTokenSource.IsCancellationRequested && !_cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))); }); } From 8a88f5f40caa13aac4c680a62cb816bcaef7af8b Mon Sep 17 00:00:00 2001 From: Romazes Date: Thu, 17 Oct 2024 18:17:40 +0300 Subject: [PATCH 18/26] test:feat: multiple subscription on different amount of Symbols --- .../TradeStationBrokerageDataQueueHandlerTests.cs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage.Tests/TradeStationBrokerageDataQueueHandlerTests.cs b/QuantConnect.TradeStationBrokerage.Tests/TradeStationBrokerageDataQueueHandlerTests.cs index 1df188f4..5f05d870 100644 --- a/QuantConnect.TradeStationBrokerage.Tests/TradeStationBrokerageDataQueueHandlerTests.cs +++ b/QuantConnect.TradeStationBrokerage.Tests/TradeStationBrokerageDataQueueHandlerTests.cs @@ -107,8 +107,9 @@ public void StreamsData(Symbol symbol, Resolution resolution) cancelationToken.Cancel(); } - [Test] - public void MultipleSubscription() + [TestCase(105, 2)] + [TestCase(300, 100)] + public void MultipleSubscription(int initSubscribeAmount, int unSubscribeAmount) { var lockObject = new object(); var resetEvent = new ManualResetEvent(false); @@ -116,13 +117,13 @@ public void MultipleSubscription() var amountDataBySymbol = new ConcurrentDictionary(); var configBySymbol = new Dictionary Configs, CancellationTokenSource CancellationTokenSource)>(); - var equitySymbols = _equitySymbols.Value.Count(); + var equitySymbols = _equitySymbols.Value.Take(initSubscribeAmount); Log.Trace(""); - Log.Trace($"MULTIPLE SUSBSRIPTION TEST on [{equitySymbols}] symbols."); + Log.Trace($"SUBSCRIBE 105 UNSUBSCRIBE 2 THEN 100 TEST"); Log.Trace(""); - foreach (var symbol in _equitySymbols.Value) + foreach (var symbol in equitySymbols) { foreach (var config in GetSubscriptionDataConfigsBySymbolResolution(symbol, Resolution.Tick)) { @@ -159,7 +160,7 @@ public void MultipleSubscription() resetEvent.WaitOne(TimeSpan.FromSeconds(30), cancelationToken.Token); - foreach (var configs in configBySymbol.Values.Take(equitySymbols / 2)) + foreach (var configs in configBySymbol.Values.TakeLast(unSubscribeAmount)) { foreach (var config in configs.Configs) { From f186153293cac0655c24190c23391b8686dcc4a4 Mon Sep 17 00:00:00 2001 From: Romazes Date: Thu, 17 Oct 2024 23:10:17 +0300 Subject: [PATCH 19/26] refactor: not start task if subscription is empty in RemoveSubscriptionItem refactor: dispose in StreamTaskManager feat: new log about remove subscriptionTask in TradeStationBrokerageMultiStreamSubscriptionManager --- .../Streaming/StreamingTaskManager.cs | 8 ++++++-- ...TradeStationBrokerageMultiStreamSubscriptionManager.cs | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs index 6100cd31..f6217755 100644 --- a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs +++ b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs @@ -133,7 +133,11 @@ public bool RemoveSubscriptionItem(string item) { if (_subscriptionBrokerageTickers.Remove(item)) { - RestartStreaming(); + // Restart streaming if the subscription collection is not empty + if (_subscriptionBrokerageTickers.Count != 0) + { + RestartStreaming(); + } return true; } } @@ -247,8 +251,8 @@ private void StopStreaming() /// public void Dispose() { - _streamingTask?.DisposeSafely(); _cancellationTokenSource?.Cancel(); _cancellationTokenSource?.DisposeSafely(); + _streamingTask?.DisposeSafely(); } } diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs index 470d4add..c80225b2 100644 --- a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs @@ -191,6 +191,7 @@ private bool UnSubscribe(IEnumerable symbols) { streamToRemove.DisposeSafely(); _quoteStreamManagers.Remove(streamToRemove); + Log.Debug($"{nameof(TradeStationBrokerageMultiStreamSubscriptionManager)}.{nameof(UnSubscribe)}: Stream removed. Remaining active streams: {_quoteStreamManagers.Count}"); } return true; From 239392a2eacdcc9c143fa1299370fdbcf8d717a4 Mon Sep 17 00:00:00 2001 From: Romazes Date: Fri, 18 Oct 2024 19:41:15 +0300 Subject: [PATCH 20/26] remove: extra lock object in StreamingTaskManager feat: add lock to all operation with hash collection in StreamingTaskManager remove: internall list in task --- .../Streaming/StreamingTaskManager.cs | 44 ++++++++++++------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs index f6217755..08239703 100644 --- a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs +++ b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs @@ -54,11 +54,6 @@ public class StreamingTaskManager : IDisposable /// private readonly object _streamingTaskLock = new(); - /// - /// Synchronization object used to ensure thread safety when Add or Remove item in . - /// - private readonly object _brokerageTickerLock = new(); - /// /// Specifies the delay interval between subscription attempts. /// @@ -77,12 +72,30 @@ public class StreamingTaskManager : IDisposable /// /// Indicates whether there are no subscribed brokerage tickers. /// - public bool IsSubscriptionBrokerageTickerEmpty { get => _subscriptionBrokerageTickers.Count == 0; } + public bool IsSubscriptionBrokerageTickerEmpty + { + get + { + lock (_streamingTaskLock) + { + return _subscriptionBrokerageTickers.Count == 0; + } + } + } /// /// Indicates whether the maximum number of subscribed brokerage tickers has been reached. /// - public bool IsSubscriptionFilled { get => _subscriptionBrokerageTickers.Count == MaxSymbolsPerQuoteStreamRequest; } + public bool IsSubscriptionFilled + { + get + { + lock(_streamingTaskLock) + { + return _subscriptionBrokerageTickers.Count == MaxSymbolsPerQuoteStreamRequest; + } + } + } /// /// Initializes a new instance of the class. @@ -102,7 +115,7 @@ public StreamingTaskManager(Func, CancellationToken, /// true if the item was added successfully; otherwise, false. public bool AddSubscriptionItem(string item) { - lock (_brokerageTickerLock) + lock (_streamingTaskLock) { if (_subscriptionBrokerageTickers.Count >= MaxSymbolsPerQuoteStreamRequest) { @@ -129,12 +142,15 @@ public bool AddSubscriptionItem(string item) /// true if the item was removed successfully; otherwise, false. public bool RemoveSubscriptionItem(string item) { - lock (_brokerageTickerLock) + lock (_streamingTaskLock) { if (_subscriptionBrokerageTickers.Remove(item)) { - // Restart streaming if the subscription collection is not empty - if (_subscriptionBrokerageTickers.Count != 0) + if (IsSubscriptionBrokerageTickerEmpty) + { + StopStreaming(); + } + else { RestartStreaming(); } @@ -175,12 +191,10 @@ private void StartStreaming() // Wait for a specified delay to batch multiple symbol subscriptions into a single request await Task.Delay(_subscribeDelay).ConfigureAwait(false); - List brokerageTickers; lock (_streamingTaskLock) { _hasPendingSubscriptions = false; - brokerageTickers = _subscriptionBrokerageTickers.ToList(); - if (brokerageTickers.Count == 0) + if (IsSubscriptionBrokerageTickerEmpty) { // If there are no symbols to subscribe to, exit the task Log.Trace($"{nameof(StreamingTaskManager)}.{nameof(StartStreaming)}: No symbols to subscribe to at this time. Exiting subscription task."); @@ -192,7 +206,7 @@ private void StartStreaming() { try { - var result = await _streamAction(brokerageTickers, _cancellationTokenSource.Token); + var result = await _streamAction(_subscriptionBrokerageTickers.ToList(), _cancellationTokenSource.Token); } catch (OperationCanceledException) { From 4af4b17a150b65bef0e6244de2c74a7b77d27a04 Mon Sep 17 00:00:00 2001 From: Romazes Date: Fri, 18 Oct 2024 20:08:15 +0300 Subject: [PATCH 21/26] remove: duplication create method --- ...BrokerageMultiStreamSubscriptionManager.cs | 56 ++++++++++--------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs index c80225b2..7be47bf5 100644 --- a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs @@ -104,41 +104,18 @@ private bool Subscribe(IEnumerable symbols) foreach (var quoteStream in _quoteStreamManagers) { - if (quoteStream.IsSubscriptionFilled) + // Skip this quote stream as its subscription is full + if (!quoteStream.IsSubscriptionFilled) { - // Skip this quote stream as its subscription is full - continue; + ProcessSubscriptions(quoteStream, subscribedBrokerageSymbolsQueue); } - - do - { - var brokerageSymbol = subscribedBrokerageSymbolsQueue.Dequeue(); - - if (!quoteStream.AddSubscriptionItem(brokerageSymbol)) - { - // Re-enqueue the symbol since adding it to the subscription failed - subscribedBrokerageSymbolsQueue.Enqueue(brokerageSymbol); - // Exit the loop if the subscription limit is reached and no more items can be added. - break; - } - } while (subscribedBrokerageSymbolsQueue.Count > 0); } while (subscribedBrokerageSymbolsQueue.Count > 0) { var streamQuoteTask = new StreamingTaskManager(StreamHandleQuoteEvents); _quoteStreamManagers.Add(streamQuoteTask); - - do - { - var brokerageSymbol = subscribedBrokerageSymbolsQueue.Dequeue(); - - if (!streamQuoteTask.AddSubscriptionItem(brokerageSymbol)) - { - // The subscription limit is reached and no more items can be added. - break; - } - } while (subscribedBrokerageSymbolsQueue.Count > 0); + ProcessSubscriptions(streamQuoteTask, subscribedBrokerageSymbolsQueue); } return true; @@ -346,6 +323,31 @@ private string RemoveOrderBook(Symbol symbol) return brokerageSymbol; } + + /// + /// Processes subscription items from the queue and adds them to the quote stream manager. + /// + /// The quote stream manager responsible for handling the subscription items. + /// + /// A queue of symbols representing the brokerage symbols to be subscribed. + /// Items that cannot be added to the subscription are re-enqueued, and the process stops when the subscription limit is reached. + /// + private void ProcessSubscriptions(StreamingTaskManager quoteStream, Queue symbolsQueue) + { + while (symbolsQueue.Count > 0) + { + var brokerageSymbol = symbolsQueue.Dequeue(); + + if (!quoteStream.AddSubscriptionItem(brokerageSymbol)) + { + // Re-enqueue the symbol since adding it to the subscription failed + symbolsQueue.Enqueue(brokerageSymbol); + // The subscription limit is reached and no more items can be added. + break; + } + } + } + /// /// Releases the resources used by the current instance. /// From 3ae27d53b8536cfc1b957222e6f7e31e7c294c4c Mon Sep 17 00:00:00 2001 From: Romazes Date: Fri, 18 Oct 2024 20:09:38 +0300 Subject: [PATCH 22/26] remove: extra disposing of list --- .../TradeStationBrokerageMultiStreamSubscriptionManager.cs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs index 7be47bf5..c2d9a3ed 100644 --- a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs @@ -353,13 +353,6 @@ private void ProcessSubscriptions(StreamingTaskManager quoteStream, Queue public override void Dispose() { - if (_quoteStreamManagers != null) - { - // Clear the list to release resources - _quoteStreamManagers.Clear(); - _quoteStreamManagers = null; - } - _aggregator.DisposeSafely(); _tradeStationApiClient.DisposeSafely(); } From 2fb7dd5360cd1839246a8719b9c2061f5c378c4a Mon Sep 17 00:00:00 2001 From: Romazes Date: Fri, 18 Oct 2024 21:29:24 +0300 Subject: [PATCH 23/26] refactor: RestartStreaming in StreamingTaskManager refactor: RemoveSubscriptionItem in StreamingTaskManager refactor: UnSubscribe in TradeStationBrokerageMultiStreamSubscriptionManager --- .../Streaming/StreamingTaskManager.cs | 141 +++++++----------- ...BrokerageMultiStreamSubscriptionManager.cs | 33 +--- 2 files changed, 59 insertions(+), 115 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs index 08239703..87de7605 100644 --- a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs +++ b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs @@ -78,7 +78,7 @@ public bool IsSubscriptionBrokerageTickerEmpty { lock (_streamingTaskLock) { - return _subscriptionBrokerageTickers.Count == 0; + return _subscriptionBrokerageTickers.Count == 0; } } } @@ -90,9 +90,9 @@ public bool IsSubscriptionFilled { get { - lock(_streamingTaskLock) + lock (_streamingTaskLock) { - return _subscriptionBrokerageTickers.Count == MaxSymbolsPerQuoteStreamRequest; + return _subscriptionBrokerageTickers.Count >= MaxSymbolsPerQuoteStreamRequest; } } } @@ -126,7 +126,7 @@ public bool AddSubscriptionItem(string item) if (!_subscriptionBrokerageTickers.Add(item)) { Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(AddSubscriptionItem)}: Item already exists in the list."); - return false; + return true; } } @@ -146,14 +146,7 @@ public bool RemoveSubscriptionItem(string item) { if (_subscriptionBrokerageTickers.Remove(item)) { - if (IsSubscriptionBrokerageTickerEmpty) - { - StopStreaming(); - } - else - { - RestartStreaming(); - } + RestartStreaming(); return true; } } @@ -161,102 +154,74 @@ public bool RemoveSubscriptionItem(string item) return false; } - /// - /// Restarts the streaming task by stopping the current one and starting a new one. - /// This is useful for updating subscriptions without needing to manually stop and start. - /// - private void RestartStreaming() - { - StopStreaming(); - StartStreaming(); - } - /// /// Starts the streaming task and executes the provided streaming action. /// - private void StartStreaming() + private void RestartStreaming() { - lock (_streamingTaskLock) + try { - if (_hasPendingSubscriptions) - { - // Avoid duplicate subscriptions by checking if a subscription is already in progress - return; - } - _hasPendingSubscriptions = true; - } - - _streamingTask = Task.Factory.StartNew(async () => - { - // Wait for a specified delay to batch multiple symbol subscriptions into a single request - await Task.Delay(_subscribeDelay).ConfigureAwait(false); - lock (_streamingTaskLock) { - _hasPendingSubscriptions = false; - if (IsSubscriptionBrokerageTickerEmpty) + if (_hasPendingSubscriptions) { - // If there are no symbols to subscribe to, exit the task - Log.Trace($"{nameof(StreamingTaskManager)}.{nameof(StartStreaming)}: No symbols to subscribe to at this time. Exiting subscription task."); + // Avoid duplicate subscriptions by checking if a subscription is already in progress return; } + _hasPendingSubscriptions = true; } - do + if (_streamingTask != null) { - try - { - var result = await _streamAction(_subscriptionBrokerageTickers.ToList(), _cancellationTokenSource.Token); - } - catch (OperationCanceledException) - { - // Safely skipping: - // Task was cancelled, likely due to token cancellation (e.g., retry attempts or HttpClient.Timeout of 100 seconds). - } - catch (Exception ex) + _cancellationTokenSource.Cancel(); + if (!_streamingTask.Wait(TimeSpan.FromSeconds(5))) { - Log.Error($"{nameof(StreamingTaskManager)}.Exception: {ex}"); + Log.Error($"{nameof(StreamingTaskManager)}.{nameof(RestartStreaming)}: Timeout while waiting for the streaming task to complete."); } - } while (!_cancellationTokenSource.IsCancellationRequested && !_cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))); - }); - } - - /// - /// Stops the currently running streaming task and cancels the current task. - /// - private void StopStreaming() - { - lock (_streamingTaskLock) - { - if (_hasPendingSubscriptions) - { - // Avoid duplicate subscriptions by checking if a subscription is already in progress - return; + _streamingTask = null; + _cancellationTokenSource.Dispose(); + _cancellationTokenSource = new CancellationTokenSource(); } - } - - Log.Debug($"{nameof(StreamingTaskManager)}.{nameof(StopStreaming)}: Stopping the current streaming task."); - - if (_streamingTask != null) - { - _cancellationTokenSource.Cancel(); - try + _streamingTask = Task.Factory.StartNew(async () => { - if (!_streamingTask.Wait(TimeSpan.FromSeconds(5))) + // Wait for a specified delay to batch multiple symbol subscriptions into a single request + await Task.Delay(_subscribeDelay).ConfigureAwait(false); + + var brokerageTickers = default(List); + lock (_streamingTaskLock) { - Log.Error($"{nameof(StreamingTaskManager)}.{nameof(StopStreaming)}: Timeout while waiting for the streaming task to complete."); + _hasPendingSubscriptions = false; + if (IsSubscriptionBrokerageTickerEmpty) + { + // If there are no symbols to subscribe to, exit the task + Log.Trace($"{nameof(StreamingTaskManager)}.{nameof(RestartStreaming)}: No symbols to subscribe to at this time. Exiting subscription task."); + return; + } + brokerageTickers = _subscriptionBrokerageTickers.ToList(); } - } - catch (Exception ex) - { - Log.Error($"{nameof(StreamingTaskManager)}.{nameof(StopStreaming)}: Error during task cancellation: {ex}"); - } - finally - { - _cancellationTokenSource.Dispose(); - _cancellationTokenSource = new CancellationTokenSource(); - } + + do + { + try + { + var result = await _streamAction(brokerageTickers, _cancellationTokenSource.Token); + } + catch (OperationCanceledException) + { + // Safely skipping: + // Task was cancelled, likely due to token cancellation (e.g., retry attempts or HttpClient.Timeout of 100 seconds). + } + catch (Exception ex) + { + Log.Error($"{nameof(StreamingTaskManager)}.Exception stream action: {ex}"); + } + } while (!_cancellationTokenSource.IsCancellationRequested && !_cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))); + }); + } + catch (Exception ex) + { + Log.Error($"{nameof(StreamingTaskManager)}.Exception warpper: {ex}"); } } diff --git a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs index c2d9a3ed..75b249b1 100644 --- a/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs +++ b/QuantConnect.TradeStationBrokerage/TradeStationBrokerageMultiStreamSubscriptionManager.cs @@ -1,4 +1,4 @@ -/* +/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * @@ -15,6 +15,7 @@ using System; using NodaTime; +using System.Linq; using System.Threading; using QuantConnect.Data; using QuantConnect.Util; @@ -130,36 +131,14 @@ private bool UnSubscribe(IEnumerable symbols) { var streamsToRemove = new List(); - var unSubscribeBrokerageSymbolsQueue = new Queue(); - foreach (var symbol in symbols) - { - unSubscribeBrokerageSymbolsQueue.Enqueue(RemoveOrderBook(symbol)); - } - - foreach (var streamQuoteTask in _quoteStreamManagers) + foreach (var brokerageSymbol in symbols.Select(symbol => RemoveOrderBook(symbol))) { - do + foreach (var streamQuoteTask in _quoteStreamManagers.Where(x => x.RemoveSubscriptionItem(brokerageSymbol))) { - var brokerageSymbol = unSubscribeBrokerageSymbolsQueue.Dequeue(); - - if (!streamQuoteTask.RemoveSubscriptionItem(brokerageSymbol)) + if (streamQuoteTask.IsSubscriptionBrokerageTickerEmpty) { - // Re-enqueue the symbol since adding it to the subscription failed - unSubscribeBrokerageSymbolsQueue.Enqueue(brokerageSymbol); - // Exit the loop if the symbol is not found or cannot be unsubscribed. - break; + streamsToRemove.Add(streamQuoteTask); } - - } while (unSubscribeBrokerageSymbolsQueue.Count > 0); - - if (streamQuoteTask.IsSubscriptionBrokerageTickerEmpty) - { - streamsToRemove.Add(streamQuoteTask); - } - - if (unSubscribeBrokerageSymbolsQueue.Count == 0) - { - break; } } From 2102764a8cd1e33188917d0cbde71833c7c41c74 Mon Sep 17 00:00:00 2001 From: Romazes Date: Fri, 18 Oct 2024 22:06:06 +0300 Subject: [PATCH 24/26] feat: missed cancellation token for task in StreamingTaskManager --- .../Streaming/StreamingTaskManager.cs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs index 87de7605..341393e3 100644 --- a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs +++ b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs @@ -186,7 +186,10 @@ private void RestartStreaming() _streamingTask = Task.Factory.StartNew(async () => { // Wait for a specified delay to batch multiple symbol subscriptions into a single request - await Task.Delay(_subscribeDelay).ConfigureAwait(false); + if (_cancellationTokenSource.Token.WaitHandle.WaitOne(_subscribeDelay)) + { + return; + } var brokerageTickers = default(List); lock (_streamingTaskLock) @@ -217,7 +220,7 @@ private void RestartStreaming() Log.Error($"{nameof(StreamingTaskManager)}.Exception stream action: {ex}"); } } while (!_cancellationTokenSource.IsCancellationRequested && !_cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))); - }); + }, _cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); } catch (Exception ex) { @@ -232,6 +235,9 @@ public void Dispose() { _cancellationTokenSource?.Cancel(); _cancellationTokenSource?.DisposeSafely(); - _streamingTask?.DisposeSafely(); + if (_streamingTask != null && _streamingTask.Status == TaskStatus.RanToCompletion) + { + _streamingTask?.DisposeSafely(); + } } } From aece5dc8ac48036301c66dbb4585deddbaa71bc5 Mon Sep 17 00:00:00 2001 From: Romazes Date: Fri, 18 Oct 2024 22:34:42 +0300 Subject: [PATCH 25/26] fix: typo in ex msg --- .../Streaming/StreamingTaskManager.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs index 341393e3..24e33a2e 100644 --- a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs +++ b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs @@ -224,7 +224,7 @@ private void RestartStreaming() } catch (Exception ex) { - Log.Error($"{nameof(StreamingTaskManager)}.Exception warpper: {ex}"); + Log.Error($"{nameof(StreamingTaskManager)}.Exception wrapper: {ex}"); } } From d68bb6fad22449eb9389e3a8e1f7becdbf56b808 Mon Sep 17 00:00:00 2001 From: Romazes Date: Sat, 19 Oct 2024 00:17:46 +0300 Subject: [PATCH 26/26] refactor: CancellationToken in StreamingTaskManager --- .../Streaming/StreamingTaskManager.cs | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs index 24e33a2e..1faa3e10 100644 --- a/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs +++ b/QuantConnect.TradeStationBrokerage/Streaming/StreamingTaskManager.cs @@ -179,14 +179,13 @@ private void RestartStreaming() Log.Error($"{nameof(StreamingTaskManager)}.{nameof(RestartStreaming)}: Timeout while waiting for the streaming task to complete."); } _streamingTask = null; - _cancellationTokenSource.Dispose(); - _cancellationTokenSource = new CancellationTokenSource(); } + var newCancellationTokenSource = _cancellationTokenSource = new CancellationTokenSource(); _streamingTask = Task.Factory.StartNew(async () => { // Wait for a specified delay to batch multiple symbol subscriptions into a single request - if (_cancellationTokenSource.Token.WaitHandle.WaitOne(_subscribeDelay)) + if (newCancellationTokenSource.Token.WaitHandle.WaitOne(_subscribeDelay)) { return; } @@ -208,7 +207,7 @@ private void RestartStreaming() { try { - var result = await _streamAction(brokerageTickers, _cancellationTokenSource.Token); + var result = await _streamAction(brokerageTickers, newCancellationTokenSource.Token); } catch (OperationCanceledException) { @@ -219,8 +218,11 @@ private void RestartStreaming() { Log.Error($"{nameof(StreamingTaskManager)}.Exception stream action: {ex}"); } - } while (!_cancellationTokenSource.IsCancellationRequested && !_cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))); - }, _cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); + } while (!newCancellationTokenSource.IsCancellationRequested && !newCancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))); + + + newCancellationTokenSource.DisposeSafely(); + }, newCancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); } catch (Exception ex) { @@ -233,8 +235,11 @@ private void RestartStreaming() /// public void Dispose() { - _cancellationTokenSource?.Cancel(); - _cancellationTokenSource?.DisposeSafely(); + if (_cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested) + { + _cancellationTokenSource.Cancel(); + } + if (_streamingTask != null && _streamingTask.Status == TaskStatus.RanToCompletion) { _streamingTask?.DisposeSafely();