Skip to content

Commit

Permalink
Further improve perf of DotNet impl
Browse files Browse the repository at this point in the history
* Put send loop back on the main thread since it was eating thread pool resources unnecessarily
  • Loading branch information
mikerochip committed Nov 1, 2024
1 parent fcdbfe1 commit d489b93
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 65 deletions.
61 changes: 29 additions & 32 deletions Runtime/Internal/DotNetWebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,7 @@ public void ProcessIncomingMessages()

public void AddOutgoingMessage(WebSocketMessage message)
{
lock (_outgoingMessages)
{
_outgoingMessages.Enqueue(message);
}
_outgoingMessages.Enqueue(message);
}

public async Task ConnectAsync()
Expand All @@ -156,7 +153,7 @@ public async Task ConnectAsync()
await _socket.ConnectAsync(_uri, _cancellationToken);
Opened?.Invoke();

await RunAsync();
await PumpMessagesAsync();
}
catch (Exception e)
{
Expand Down Expand Up @@ -204,13 +201,20 @@ public void Cancel()
#endregion

#region Internal Methods
private async Task RunAsync()
private async Task PumpMessagesAsync()
{
await Task.WhenAll(ReceiveOnThreadAsync(), SendLoopAsync());
}

private async Task ReceiveOnThreadAsync()
{
// don't block the main thread while pumping messages
await new WaitForBackgroundThreadStart();
// _socket.ReceiveAsync() is a blocking call, and we don't want to block the main
// thread so we put that on a thread pool thread
await new WaitForThreadPoolThreadStart();

try
{
await Task.WhenAll(ReceiveAsync(), SendAsync());
await ReceiveLoopAsync();
}
finally
{
Expand All @@ -219,7 +223,7 @@ private async Task RunAsync()
}
}

private async Task ReceiveAsync()
private async Task ReceiveLoopAsync()
{
var buffer = new ArraySegment<byte>(new byte[_maxReceiveBytes]);
while (_socket.State == System.Net.WebSockets.WebSocketState.Open)
Expand Down Expand Up @@ -277,29 +281,22 @@ private async Task ReceiveAsync()
}
}

private async Task SendAsync()
private async Task SendLoopAsync()
{
while (_socket.State == System.Net.WebSockets.WebSocketState.Open)
{
WebSocketMessage message = null;

lock (_outgoingMessages)
while (_outgoingMessages.Count > 0)
{
if (_outgoingMessages.Count > 0)
message = _outgoingMessages.Dequeue();
}
var message = _outgoingMessages.Dequeue();

if (message == null)
{
await Task.Delay(10, _cancellationToken);
continue;
var segment = new ArraySegment<byte>(message.Bytes);
var type = message.Type == WebSocketDataType.Binary
? WebSocketMessageType.Binary
: WebSocketMessageType.Text;
await _socket.SendAsync(segment, type, endOfMessage: true, _cancellationToken);
}

var segment = new ArraySegment<byte>(message.Bytes);
var type = message.Type == WebSocketDataType.Binary
? WebSocketMessageType.Binary
: WebSocketMessageType.Text;
await _socket.SendAsync(segment, type, endOfMessage: true, _cancellationToken);
await Task.Yield();
}
}
#endregion
Expand Down Expand Up @@ -336,19 +333,19 @@ internal class WaitForMainThreadUpdate
// this completes as soon as we can return to the main thread
public TaskAwaiter<bool> GetAwaiter()
{
var tcs = new TaskCompletionSource<bool>();
MainThreadAsyncAwaitRunner.Run(Wait(tcs));
return tcs.Task.GetAwaiter();
var source = new TaskCompletionSource<bool>();
MainThreadAsyncAwaitRunner.Run(Wait(source));
return source.Task.GetAwaiter();
}

private static IEnumerator Wait(TaskCompletionSource<bool> tcs)
private static IEnumerator Wait(TaskCompletionSource<bool> source)
{
yield return null;
tcs.SetResult(true);
source.SetResult(true);
}
}

internal class WaitForBackgroundThreadStart
internal class WaitForThreadPoolThreadStart
{
// this completes as soon as we can start a ThreadPool thread
public ConfiguredTaskAwaitable.ConfiguredTaskAwaiter GetAwaiter()
Expand Down
54 changes: 22 additions & 32 deletions Runtime/WebSocketConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,13 @@ public static string BytesToString(byte[] bytes) =>
private async void Awake()
{
_cts = new CancellationTokenSource();
await Task.WhenAll(ManageStateAsync(), ConnectAsync(), ReceiveAsync(), SendAsync());
await Task.WhenAll(ManageStateAsync(), ConnectAsync());
}

private void Update()
{
ReceiveIncomingMessages();
SendOutgoingMessages();
}

private void OnDestroy()
Expand Down Expand Up @@ -226,45 +232,29 @@ private async Task ConnectAsync()
}
}

private async Task ReceiveAsync()
private void ReceiveIncomingMessages()
{
while (true)
{
if (_cts.IsCancellationRequested)
break;

if (_webSocket?.State == Internal.WebSocketState.Open)
_webSocket.ProcessIncomingMessages();

await Task.Yield();
}
if (_webSocket?.State == Internal.WebSocketState.Open)
_webSocket.ProcessIncomingMessages();
}

private async Task SendAsync()
private void SendOutgoingMessages()
{
while (true)
if (_webSocket?.State == Internal.WebSocketState.Open && IsPinging)
{
if (_cts.IsCancellationRequested)
break;

if (_webSocket?.State == Internal.WebSocketState.Open && IsPinging)
var now = DateTime.Now;
var timeSinceLastPing = now - _lastPingTimestamp;
if (timeSinceLastPing >= Config.PingInterval)
{
var now = DateTime.Now;
var timeSinceLastPing = now - _lastPingTimestamp;
if (timeSinceLastPing >= Config.PingInterval)
{
_webSocket.AddOutgoingMessage(Config.PingMessage);
_lastPingTimestamp = now;
}
}

while (_webSocket?.State == Internal.WebSocketState.Open && _outgoingMessages.Count > 0)
{
var message = _outgoingMessages.Dequeue();
_webSocket.AddOutgoingMessage(message);
_webSocket.AddOutgoingMessage(Config.PingMessage);
_lastPingTimestamp = now;
}
}

await Task.Yield();
while (_webSocket?.State == Internal.WebSocketState.Open && _outgoingMessages.Count > 0)
{
var message = _outgoingMessages.Dequeue();
_webSocket.AddOutgoingMessage(message);
}
}
#endregion
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "com.mikeschweitzer.websocket",
"version": "2.0.1",
"version": "2.0.2",
"displayName": "WebSocket Client",
"description": "Simple, flexible WebSocket client. Add the WebSocketConnection MonoBehaviour anywhere in your scene.",
"license": "Apache 2.0",
Expand Down

0 comments on commit d489b93

Please sign in to comment.