Skip to content

Commit

Permalink
fix: background handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeDobrzan committed Jul 20, 2023
1 parent 4bff839 commit b2fdbe3
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 9 deletions.
9 changes: 7 additions & 2 deletions src/Api/PubnubApi/EventEngine/Core/EffectDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ public async Task Dispatch<T>(T invocation) where T : IEffectInvocation {

if (invocation is IEffectCancelInvocation) {
await effectInvocationHandlerMap[invocation.GetType()].Cancel();
} else {
await ((IEffectHandler<T>)effectInvocationHandlerMap[invocation.GetType()]).Run(invocation);
} else
{
var handler = ((IEffectHandler<T>)effectInvocationHandlerMap[invocation.GetType()]);
if (handler.IsBackground(invocation))
handler.Run(invocation).Start();
else
await handler.Run(invocation);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal interface IEffectHandler {
/// <typeparam name="T">Associated invocation</typeparam>
internal interface IEffectHandler<in T> : IEffectHandler where T : IEffectInvocation {
Task Run(T invocation);
bool IsBackground(T invocation);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public HandshakeEffectHandler(SubscribeManager2 manager, EventQueue eventQueue)
this.eventQueue = eventQueue;
}

public Task Run(HandshakeReconnectInvocation invocation)
public async Task Run(HandshakeReconnectInvocation invocation)
{
if (!ReconnectionDelayUtil.shouldRetry(invocation.Policy, invocation.AttemptedRetries, invocation.MaxConnectionRetry))
{
Expand All @@ -38,11 +38,15 @@ public Task Run(HandshakeReconnectInvocation invocation)
else
{
retryDelay = new Delay(ReconnectionDelayUtil.CalculateDelay(invocation.Policy, invocation.AttemptedRetries));
// Run in the background
retryDelay.Start().ContinueWith((_) => this.Run((HandshakeInvocation)invocation));
await retryDelay.Start();
if (!retryDelay.Cancelled)
await Run((HandshakeInvocation)invocation);
}
}

return Utils.EmptyTask;
public bool IsBackground(HandshakeReconnectInvocation invocation)
{
return true;
}

public async Task Run(HandshakeInvocation invocation)
Expand All @@ -66,8 +70,13 @@ public async Task Run(HandshakeInvocation invocation)

}
}



public bool IsBackground(HandshakeInvocation invocation)
{
return false;
}


private async Task<System.Tuple<SubscriptionCursor, PNStatus>> MakeHandshakeRequest(HandshakeInvocation invocation)
{
var resp = await manager.HandshakeRequest<string>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public Task Run(ReceiveReconnectInvocation invocation)
return Utils.EmptyTask;
}

public bool IsBackground(ReceiveReconnectInvocation invocation)
{
return true;
}

public async Task Run(ReceiveMessagesInvocation invocation)
{
var response = await MakeReceiveMessagesRequest(invocation);
Expand All @@ -65,10 +70,14 @@ public async Task Run(ReceiveMessagesInvocation invocation)
List<PNMessageResult<object>> listOfMessages = null;
eventQueue.Enqueue(new Events.ReceiveSuccessEvent() { Cursor = response.Item1, Messages= listOfMessages, Status = response.Item2 });
break;

}
}

public bool IsBackground(ReceiveMessagesInvocation invocation)
{
return true;
}

private async Task<System.Tuple<SubscriptionCursor, PNStatus>> MakeReceiveMessagesRequest(ReceiveMessagesInvocation invocation)
{
var resp = await manager.ReceiveRequest<string>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public SubscribeEventEngine(SubscribeManager2 subscribeManager) {
// initialize the handler, pass dependencies
var handshakeHandler = new Effects.HandshakeEffectHandler(subscribeManager, eventQueue);
dispatcher.Register<Invocations.HandshakeInvocation, Effects.HandshakeEffectHandler>(handshakeHandler);
dispatcher.Register<Invocations.HandshakeReconnectInvocation, Effects.HandshakeEffectHandler>(handshakeHandler);
dispatcher.Register<Invocations.CancelHandshakeInvocation, Effects.HandshakeEffectHandler>(handshakeHandler);

currentState = new UnsubscribedState();
Expand Down

0 comments on commit b2fdbe3

Please sign in to comment.