diff --git a/README.md b/README.md index c73cbff7..4941fce0 100644 --- a/README.md +++ b/README.md @@ -68,8 +68,6 @@ Akkatecture is still in development. The goal of this projects first version is akkatecture is currently missing these crucial features: - aggregate state snapshotting. -- persisting event metadata. -- typed actor references. - schedueled jobs. ### Contributing diff --git a/examples/walkthrough/Akkatecture.Walkthrough.Domain/Sagas/MoneyTransfer/MoneyTransferSaga.cs b/examples/walkthrough/Akkatecture.Walkthrough.Domain/Sagas/MoneyTransfer/MoneyTransferSaga.cs index e207d072..0d690c28 100644 --- a/examples/walkthrough/Akkatecture.Walkthrough.Domain/Sagas/MoneyTransfer/MoneyTransferSaga.cs +++ b/examples/walkthrough/Akkatecture.Walkthrough.Domain/Sagas/MoneyTransfer/MoneyTransferSaga.cs @@ -44,7 +44,7 @@ public MoneyTransferSaga(IActorRef accountAggregateManager) { AccountAggregateManager = accountAggregateManager; } - public Task Handle(IDomainEvent domainEvent) + public bool Handle(IDomainEvent domainEvent) { var isNewSpec = new AggregateIsNewSpecification(); if (isNewSpec.IsSatisfiedBy(this)) @@ -57,19 +57,19 @@ public Task Handle(IDomainEvent domainEvent) Emit(new MoneyTransferStartedEvent(domainEvent.AggregateEvent.Transaction)); } - - return Task.CompletedTask; + + return true; } - public Task Handle(IDomainEvent domainEvent) + public bool Handle(IDomainEvent domainEvent) { var spec = new AggregateIsNewSpecification().Not(); if (spec.IsSatisfiedBy(this)) { Emit(new MoneyTransferCompletedEvent(domainEvent.AggregateEvent.Transaction)); } - - return Task.CompletedTask; + + return true; } } } \ No newline at end of file diff --git a/examples/web/Akkatecture.Examples.Api/Domain/Sagas/ResourceCreationSaga.cs b/examples/web/Akkatecture.Examples.Api/Domain/Sagas/ResourceCreationSaga.cs index 09f01579..0a3598d9 100644 --- a/examples/web/Akkatecture.Examples.Api/Domain/Sagas/ResourceCreationSaga.cs +++ b/examples/web/Akkatecture.Examples.Api/Domain/Sagas/ResourceCreationSaga.cs @@ -33,7 +33,7 @@ namespace Akkatecture.Examples.Api.Domain.Sagas { public class ResourceCreationSaga : AggregateSaga, - ISagaIsStartedBy + ISagaIsStartedByAsync { public async Task Handle(IDomainEvent domainEvent) { diff --git a/src/Akkatecture.Clustering/Extentions/TypeExtensions.cs b/src/Akkatecture.Clustering/Extentions/TypeExtensions.cs index 449f9aed..dd40ac48 100644 --- a/src/Akkatecture.Clustering/Extentions/TypeExtensions.cs +++ b/src/Akkatecture.Clustering/Extentions/TypeExtensions.cs @@ -46,13 +46,13 @@ internal static IReadOnlyList GetSagaEventSubscriptionTypes(this Type type .ToList(); var handleEventTypes = interfaces - .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISagaHandles<,,>)) + .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISagaHandlesAsync<,,>)) .Select(t => typeof(IDomainEvent<,,>).MakeGenericType(t.GetGenericArguments()[0], t.GetGenericArguments()[1], t.GetGenericArguments()[2])) .ToList(); var startedByEventTypes = interfaces - .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISagaIsStartedBy<,,>)) + .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISagaIsStartedByAsync<,,>)) .Select(t => typeof(IDomainEvent<,,>).MakeGenericType(t.GetGenericArguments()[0], t.GetGenericArguments()[1], t.GetGenericArguments()[2])) .ToList(); diff --git a/src/Akkatecture/Aggregates/AggregateRoot.cs b/src/Akkatecture/Aggregates/AggregateRoot.cs index 91fcf891..ccf51181 100644 --- a/src/Akkatecture/Aggregates/AggregateRoot.cs +++ b/src/Akkatecture/Aggregates/AggregateRoot.cs @@ -143,7 +143,6 @@ public virtual void Emit(TAggregateEvent aggregateEvent, IMetad eventMetadata.AddRange(metadata); } - var committedEvent = new CommittedEvent(Id,aggregateEvent,eventMetadata); Persist(committedEvent, ApplyCommittedEvents); diff --git a/src/Akkatecture/Akkatecture.csproj b/src/Akkatecture/Akkatecture.csproj index 4c867d2e..e1d37812 100644 --- a/src/Akkatecture/Akkatecture.csproj +++ b/src/Akkatecture/Akkatecture.csproj @@ -19,12 +19,16 @@ en-GB Externally Updated https://akkatecture.net/logos/logo-512.png + true + true + $(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb + diff --git a/src/Akkatecture/Extensions/TypeExtensions.cs b/src/Akkatecture/Extensions/TypeExtensions.cs index c4e525d9..7d333d97 100644 --- a/src/Akkatecture/Extensions/TypeExtensions.cs +++ b/src/Akkatecture/Extensions/TypeExtensions.cs @@ -164,7 +164,7 @@ internal static IReadOnlyDictionary ReflectionHelper.CompileMethodInvocation>(type, "Apply", mi.GetParameters()[0].ParameterType)); } - internal static IReadOnlyList GetDomainEventSubscriberSubscriptionTypes(this Type type) + internal static IReadOnlyList GetAsyncDomainEventSubscriberSubscriptionTypes(this Type type) { //TODO //Check generic arguments for sanity @@ -185,6 +185,30 @@ internal static IReadOnlyList GetDomainEventSubscriberSubscriptionTypes(th return domainEventTypes; } + + internal static IReadOnlyList GetDomainEventSubscriberSubscriptionTypes(this Type type) + { + //TODO + //Check generic arguments for sanity + //add checks for iaggregateroot + //add checks for iidentity + //add checks for iaggregatevent + + var interfaces = type + .GetTypeInfo() + .GetInterfaces() + .Select(i => i.GetTypeInfo()) + .ToList(); + var domainEventTypes = interfaces + .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISubscribeTo<,,>)) + .Select(i => typeof(IDomainEvent<,,>).MakeGenericType(i.GetGenericArguments()[0],i.GetGenericArguments()[1],i.GetGenericArguments()[2])) + .ToList(); + + + return domainEventTypes; + } + + private static readonly ConcurrentDictionary AggregateNameCache = new ConcurrentDictionary(); internal static AggregateName GetCommittedEventAggregateRootName(this Type type) { @@ -210,6 +234,36 @@ internal static AggregateName GetCommittedEventAggregateRootName(this Type type) }); } + internal static IReadOnlyList GetAsyncSagaEventSubscriptionTypes(this Type type) + { + //TODO + //add checks for iaggregateroot + //add checks for iidentity + //add checks for iaggregatevent + + var interfaces = type + .GetTypeInfo() + .GetInterfaces() + .Select(i => i.GetTypeInfo()) + .ToList(); + + var handleEventTypes = interfaces + .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISagaHandlesAsync<,,>)) + .Select(t => typeof(IDomainEvent<,,>).MakeGenericType(t.GetGenericArguments()[0], + t.GetGenericArguments()[1], t.GetGenericArguments()[2])) + .ToList(); + + var startedByEventTypes = interfaces + .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISagaIsStartedByAsync<,,>)) + .Select(t => typeof(IDomainEvent<,,>).MakeGenericType(t.GetGenericArguments()[0], + t.GetGenericArguments()[1], t.GetGenericArguments()[2])) + .ToList(); + + startedByEventTypes.AddRange(handleEventTypes); + + return startedByEventTypes; + } + internal static IReadOnlyList GetSagaEventSubscriptionTypes(this Type type) { //TODO diff --git a/src/Akkatecture/Sagas/AggregateSaga/AggregateSaga.cs b/src/Akkatecture/Sagas/AggregateSaga/AggregateSaga.cs index 6d1cada2..159abd47 100644 --- a/src/Akkatecture/Sagas/AggregateSaga/AggregateSaga.cs +++ b/src/Akkatecture/Sagas/AggregateSaga/AggregateSaga.cs @@ -98,51 +98,8 @@ protected AggregateSaga() if (Settings.AutoReceive) { - var type = GetType(); - - var subscriptionTypes = - type - .GetSagaEventSubscriptionTypes(); - - var methods = type - .GetTypeInfo() - .GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) - .Where(mi => - { - if (mi.Name != "Handle") - return false; - - var parameters = mi.GetParameters(); - - return - parameters.Length == 1; - }) - .ToDictionary( - mi => mi.GetParameters()[0].ParameterType, - mi => mi); - - - var method = type - .GetBaseType("ReceivePersistentActor") - .GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) - .Where(mi => - { - if (mi.Name != "CommandAsync") return false; - var parameters = mi.GetParameters(); - return - parameters.Length == 2 - && parameters[0].ParameterType.Name.Contains("Func"); - }) - .First(); - - foreach (var subscriptionType in subscriptionTypes) - { - var funcType = typeof(Func<,>).MakeGenericType(subscriptionType, typeof(Task)); - var subscriptionFunction = Delegate.CreateDelegate(funcType, this, methods[subscriptionType]); - var actorReceiveMethod = method.MakeGenericMethod(subscriptionType); - - actorReceiveMethod.Invoke(this, new[] { subscriptionFunction, null }); - } + InitReceives(); + InitAsyncReceives(); } Register(State); @@ -160,6 +117,105 @@ protected AggregateSaga() } + public void InitReceives() + { + var type = GetType(); + + var subscriptionTypes = + type + .GetSagaEventSubscriptionTypes(); + + var methods = type + .GetTypeInfo() + .GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) + .Where(mi => + { + if (mi.Name != "Handle") + return false; + + var parameters = mi.GetParameters(); + + return + parameters.Length == 1; + }) + .ToDictionary( + mi => mi.GetParameters()[0].ParameterType, + mi => mi); + + + var method = type + .GetBaseType("ReceivePersistentActor") + .GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) + .Where(mi => + { + if (mi.Name != "Command") return false; + var parameters = mi.GetParameters(); + return + parameters.Length == 1 + && parameters[0].ParameterType.Name.Contains("Func"); + }) + .First(); + + foreach (var subscriptionType in subscriptionTypes) + { + var funcType = typeof(Func<,>).MakeGenericType(subscriptionType, typeof(bool)); + var subscriptionFunction = Delegate.CreateDelegate(funcType, this, methods[subscriptionType]); + var actorReceiveMethod = method.MakeGenericMethod(subscriptionType); + + actorReceiveMethod.Invoke(this, new[] { subscriptionFunction }); + } + } + + public void InitAsyncReceives() + { + var type = GetType(); + + var subscriptionTypes = + type + .GetAsyncSagaEventSubscriptionTypes(); + + var methods = type + .GetTypeInfo() + .GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) + .Where(mi => + { + if (mi.Name != "Handle") + return false; + + var parameters = mi.GetParameters(); + + return + parameters.Length == 1; + }) + .ToDictionary( + mi => mi.GetParameters()[0].ParameterType, + mi => mi); + + + var method = type + .GetBaseType("ReceivePersistentActor") + .GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) + .Where(mi => + { + if (mi.Name != "CommandAsync") return false; + var parameters = mi.GetParameters(); + return + parameters.Length == 2 + && parameters[0].ParameterType.Name.Contains("Func"); + }) + .First(); + + foreach (var subscriptionType in subscriptionTypes) + { + var funcType = typeof(Func<,>).MakeGenericType(subscriptionType, typeof(Task)); + var subscriptionFunction = Delegate.CreateDelegate(funcType, this, methods[subscriptionType]); + var actorReceiveMethod = method.MakeGenericMethod(subscriptionType); + + actorReceiveMethod.Invoke(this, new[] { subscriptionFunction, null }); + } + } + + protected virtual void Emit(TAggregateEvent aggregateEvent, IMetadata metadata = null) where TAggregateEvent : IAggregateEvent { diff --git a/src/Akkatecture/Sagas/AggregateSaga/AggregateSagaManager.cs b/src/Akkatecture/Sagas/AggregateSaga/AggregateSagaManager.cs index 310380e9..0d63939e 100644 --- a/src/Akkatecture/Sagas/AggregateSaga/AggregateSagaManager.cs +++ b/src/Akkatecture/Sagas/AggregateSaga/AggregateSagaManager.cs @@ -56,11 +56,20 @@ protected AggregateSagaManager(Expression> sagaFactory, boo if (autoSubscribe && Settings.AutoSubscribe) { - var sagaHandlesSubscriptionTypes = + var sagaEventSubscriptionTypes = sagaType .GetSagaEventSubscriptionTypes(); - foreach (var type in sagaHandlesSubscriptionTypes) + foreach (var type in sagaEventSubscriptionTypes) + { + Context.System.EventStream.Subscribe(Self, type); + } + + var asyncSagaEventSubscriptionTypes = + sagaType + .GetAsyncSagaEventSubscriptionTypes(); + + foreach (var type in asyncSagaEventSubscriptionTypes) { Context.System.EventStream.Subscribe(Self, type); } @@ -68,17 +77,17 @@ protected AggregateSagaManager(Expression> sagaFactory, boo if (Settings.AutoSpawnOnReceive) { - ReceiveAsync(Handle); + Receive(Handle); } } - protected virtual Task Handle(IDomainEvent domainEvent) + protected virtual bool Handle(IDomainEvent domainEvent) { var sagaId = SagaLocator.LocateSaga(domainEvent); var saga = FindOrSpawn(sagaId); saga.Tell(domainEvent,Sender); - return Task.CompletedTask; + return true; } protected virtual bool Terminate(Terminated message) diff --git a/src/Akkatecture/Sagas/ISagaHandles.cs b/src/Akkatecture/Sagas/ISagaHandles.cs index fa4f96d0..fa6af19a 100644 --- a/src/Akkatecture/Sagas/ISagaHandles.cs +++ b/src/Akkatecture/Sagas/ISagaHandles.cs @@ -37,6 +37,13 @@ public interface ISagaHandles : IS where TAggregateEvent : IAggregateEvent where TAggregate : IAggregateRoot where TIdentity : IIdentity + { + bool Handle(IDomainEvent domainEvent); + } + public interface ISagaHandlesAsync : ISaga + where TAggregateEvent : IAggregateEvent + where TAggregate : IAggregateRoot + where TIdentity : IIdentity { Task Handle(IDomainEvent domainEvent); } diff --git a/src/Akkatecture/Sagas/ISagaIsStartedBy.cs b/src/Akkatecture/Sagas/ISagaIsStartedBy.cs index 9c3266c4..f49189bf 100644 --- a/src/Akkatecture/Sagas/ISagaIsStartedBy.cs +++ b/src/Akkatecture/Sagas/ISagaIsStartedBy.cs @@ -30,6 +30,13 @@ namespace Akkatecture.Sagas { + public interface ISagaIsStartedByAsync : ISagaHandlesAsync + where TAggregateEvent : IAggregateEvent + where TAggregate : IAggregateRoot + where TIdentity : IIdentity + { + } + public interface ISagaIsStartedBy : ISagaHandles where TAggregateEvent : IAggregateEvent where TAggregate : IAggregateRoot diff --git a/src/Akkatecture/Subscribers/DomainEventSubscriber.cs b/src/Akkatecture/Subscribers/DomainEventSubscriber.cs index e0cd52ef..f3c5a179 100644 --- a/src/Akkatecture/Subscribers/DomainEventSubscriber.cs +++ b/src/Akkatecture/Subscribers/DomainEventSubscriber.cs @@ -34,19 +34,27 @@ public abstract class DomainEventSubscriber : ReceiveActor { public DomainEventSubscriberSettings Settings { get; } - protected DomainEventSubscriber() { Settings = new DomainEventSubscriberSettings(Context.System.Settings.Config); - var type = GetType(); - if (Settings.AutoSubscribe) { + var type = GetType(); + + var asyncSubscriptionTypes = + type + .GetAsyncDomainEventSubscriberSubscriptionTypes(); + var subscriptionTypes = type .GetDomainEventSubscriberSubscriptionTypes(); + foreach (var subscriptionType in asyncSubscriptionTypes) + { + Context.System.EventStream.Subscribe(Self, subscriptionType); + } + foreach (var subscriptionType in subscriptionTypes) { Context.System.EventStream.Subscribe(Self, subscriptionType); @@ -55,49 +63,100 @@ protected DomainEventSubscriber() if (Settings.AutoReceive) { - var subscriptionTypes = - type - .GetDomainEventSubscriberSubscriptionTypes(); + InitReceives(); + InitAsyncReceives(); + } + } - var methods = type - .GetTypeInfo() - .GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) - .Where(mi => - { - if (mi.Name != "HandleAsync") return false; - var parameters = mi.GetParameters(); - return - parameters.Length == 1; - }) - .ToDictionary( - mi => mi.GetParameters()[0].ParameterType, - mi => mi); + public void InitReceives() + { + var type = GetType(); + + var subscriptionTypes = + type + .GetDomainEventSubscriberSubscriptionTypes(); + var methods = type + .GetTypeInfo() + .GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) + .Where(mi => + { + if (mi.Name != "Handle") return false; + var parameters = mi.GetParameters(); + return + parameters.Length == 1; + }) + .ToDictionary( + mi => mi.GetParameters()[0].ParameterType, + mi => mi); + + var method = type + .GetBaseType("ReceiveActor") + .GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) + .Where(mi => + { + if (mi.Name != "Receive") return false; + var parameters = mi.GetParameters(); + return + parameters.Length == 1 + && parameters[0].ParameterType.Name.Contains("Func"); + }) + .First(); + + foreach (var subscriptionType in subscriptionTypes) + { + var funcType = typeof(Func<,>).MakeGenericType(subscriptionType, typeof(bool)); + var subscriptionFunction = Delegate.CreateDelegate(funcType, this, methods[subscriptionType]); + var actorReceiveMethod = method.MakeGenericMethod(subscriptionType); - var method = type - .GetBaseType("ReceiveActor") - .GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) - .Where(mi => - { - if (mi.Name != "ReceiveAsync") return false; - var parameters = mi.GetParameters(); - return - parameters.Length == 2 - && parameters[0].ParameterType.Name.Contains("Func"); - }) - .First(); + actorReceiveMethod.Invoke(this, new []{subscriptionFunction}); + } + } - foreach (var subscriptionType in subscriptionTypes) + public void InitAsyncReceives() + { + var type = GetType(); + + var subscriptionTypes = + type + .GetAsyncDomainEventSubscriberSubscriptionTypes(); + + var methods = type + .GetTypeInfo() + .GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) + .Where(mi => { - var funcType = typeof(Func<,>).MakeGenericType(subscriptionType, typeof(Task)); - var subscriptionFunction = Delegate.CreateDelegate(funcType, this, methods[subscriptionType]); - var actorReceiveMethod = method.MakeGenericMethod(subscriptionType); + if (mi.Name != "HandleAsync") return false; + var parameters = mi.GetParameters(); + return + parameters.Length == 1; + }) + .ToDictionary( + mi => mi.GetParameters()[0].ParameterType, + mi => mi); - actorReceiveMethod.Invoke(this, new []{subscriptionFunction,null}); - } + var method = type + .GetBaseType("ReceiveActor") + .GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) + .Where(mi => + { + if (mi.Name != "ReceiveAsync") return false; + var parameters = mi.GetParameters(); + return + parameters.Length == 2 + && parameters[0].ParameterType.Name.Contains("Func"); + }) + .First(); + + foreach (var subscriptionType in subscriptionTypes) + { + var funcType = typeof(Func<,>).MakeGenericType(subscriptionType, typeof(Task)); + var subscriptionFunction = Delegate.CreateDelegate(funcType, this, methods[subscriptionType]); + var actorReceiveMethod = method.MakeGenericMethod(subscriptionType); + + actorReceiveMethod.Invoke(this, new []{subscriptionFunction,null}); } - } } } \ No newline at end of file diff --git a/test/Akkatecture.TestHelpers/Aggregates/Sagas/TestSaga.cs b/test/Akkatecture.TestHelpers/Aggregates/Sagas/TestSaga.cs index c463cd8b..227e21fe 100644 --- a/test/Akkatecture.TestHelpers/Aggregates/Sagas/TestSaga.cs +++ b/test/Akkatecture.TestHelpers/Aggregates/Sagas/TestSaga.cs @@ -45,7 +45,7 @@ public TestSaga(IActorRef testAggregateManager) Command(Handle); } - public Task Handle(IDomainEvent domainEvent) + public bool Handle(IDomainEvent domainEvent) { if (IsNew) { @@ -57,10 +57,11 @@ public Task Handle(IDomainEvent d TestAggregateManager.Tell(command); } - return Task.CompletedTask; + + return true; } - public Task Handle(IDomainEvent domainEvent) + public bool Handle(IDomainEvent domainEvent) { if (!IsNew) { @@ -68,7 +69,7 @@ public Task Handle(IDomainEvent, - ISubscribeToAsync + ISubscribeTo, + ISubscribeTo { - public Task HandleAsync(IDomainEvent domainEvent) + public bool Handle(IDomainEvent domainEvent) { var handled = new TestSubscribedEventHandled(domainEvent.AggregateEvent); Context.System.EventStream.Publish(handled); - return Task.CompletedTask; + return true; } - public Task HandleAsync(IDomainEvent domainEvent) + public bool Handle(IDomainEvent domainEvent) { - return Task.CompletedTask; + var handled = new TestSubscribedEventHandled(domainEvent.AggregateEvent); + Context.System.EventStream.Publish(handled); + + return true; } } + public class TestSubscribedEventHandled { public TAggregateEvent AggregateEvent { get;} diff --git a/test/Akkatecture.TestHelpers/Subscribers/TestAsyncAggregateSubscriber.cs b/test/Akkatecture.TestHelpers/Subscribers/TestAsyncAggregateSubscriber.cs new file mode 100644 index 00000000..d82f4d87 --- /dev/null +++ b/test/Akkatecture.TestHelpers/Subscribers/TestAsyncAggregateSubscriber.cs @@ -0,0 +1,38 @@ +using System.Threading.Tasks; +using Akkatecture.Aggregates; +using Akkatecture.Subscribers; +using Akkatecture.TestHelpers.Aggregates; +using Akkatecture.TestHelpers.Aggregates.Events; + +namespace Akkatecture.TestHelpers.Subscribers +{ + public class TestAsyncAggregateSubscriber: DomainEventSubscriber, + ISubscribeToAsync, + ISubscribeToAsync + { + public Task HandleAsync(IDomainEvent domainEvent) + { + var handled = new TestAsyncSubscribedEventHandled(domainEvent.AggregateEvent); + Context.System.EventStream.Publish(handled); + return Task.CompletedTask; + } + + public Task HandleAsync(IDomainEvent domainEvent) + { + var handled = new TestAsyncSubscribedEventHandled(domainEvent.AggregateEvent); + Context.System.EventStream.Publish(handled); + + return Task.CompletedTask; + } + } + + public class TestAsyncSubscribedEventHandled + { + public TAggregateEvent AggregateEvent { get;} + + public TestAsyncSubscribedEventHandled(TAggregateEvent aggregateEvent) + { + AggregateEvent = aggregateEvent; + } + } +} \ No newline at end of file diff --git a/test/Akkatecture.Tests/IntegrationTests/Aggregates/Sagas/AggregateSagaTests.cs b/test/Akkatecture.Tests/IntegrationTests/Aggregates/Sagas/AggregateSagaTests.cs index 7e1a9e9b..9a0de506 100644 --- a/test/Akkatecture.Tests/IntegrationTests/Aggregates/Sagas/AggregateSagaTests.cs +++ b/test/Akkatecture.Tests/IntegrationTests/Aggregates/Sagas/AggregateSagaTests.cs @@ -77,7 +77,6 @@ public void SendingTest_FromTestAggregate_CompletesSaga() x => x.AggregateEvent.Sender.Equals(senderAggregateId) && x.AggregateEvent.Receiver.Equals(receiverAggregateId) && x.AggregateEvent.SentTest.Equals(senderTest)); - } } } diff --git a/test/Akkatecture.Tests/UnitTests/Subscribers/SubscriberTests.cs b/test/Akkatecture.Tests/UnitTests/Subscribers/SubscriberTests.cs index c0bb1831..1263b8bb 100644 --- a/test/Akkatecture.Tests/UnitTests/Subscribers/SubscriberTests.cs +++ b/test/Akkatecture.Tests/UnitTests/Subscribers/SubscriberTests.cs @@ -26,6 +26,7 @@ using Akka.TestKit.Xunit2; using Akkatecture.TestHelpers.Aggregates; using Akkatecture.TestHelpers.Aggregates.Commands; +using Akkatecture.TestHelpers.Aggregates.Entities; using Akkatecture.TestHelpers.Aggregates.Events; using Akkatecture.TestHelpers.Subscribers; using Xunit; @@ -58,8 +59,24 @@ public void Subscriber_ReceivedEvent_FromAggregatesEmit() ExpectMsg>(x => x.AggregateEvent.TestAggregateId == command.AggregateId); + + } + + [Fact] + [Category(Category)] + public void Subscriber_ReceivedAsyncEvent_FromAggregatesEmit() + { + var probe = CreateTestActor("probeActor"); + Sys.EventStream.Subscribe(probe, typeof(TestAsyncSubscribedEventHandled)); + Sys.ActorOf(Props.Create(() => new TestAsyncAggregateSubscriber()), "test-subscriber"); + var aggregateManager = Sys.ActorOf(Props.Create(() => new TestAggregateManager()), "test-aggregatemanager"); + + var aggregateId = TestAggregateId.New; + var command = new CreateTestCommand(aggregateId); + aggregateManager.Tell(command); - + ExpectMsg>(x => + x.AggregateEvent.TestAggregateId == command.AggregateId); } } } \ No newline at end of file