Skip to content

Commit

Permalink
Merge pull request #43 from Lutando/dev
Browse files Browse the repository at this point in the history
Async and Sync Handle Methods for Subscribers and Sagas
  • Loading branch information
Lutando authored Nov 11, 2018
2 parents 81171df + 4740546 commit 5965d9a
Show file tree
Hide file tree
Showing 17 changed files with 367 additions and 113 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ Akkatecture is still in development. The goal of this projects first version is

akkatecture is currently missing these crucial features:
- aggregate state snapshotting.
- persisting event metadata.
- typed actor references.
- schedueled jobs.

### Contributing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public MoneyTransferSaga(IActorRef accountAggregateManager)
{
AccountAggregateManager = accountAggregateManager;
}
public Task Handle(IDomainEvent<Account, AccountId, MoneySentEvent> domainEvent)
public bool Handle(IDomainEvent<Account, AccountId, MoneySentEvent> domainEvent)
{
var isNewSpec = new AggregateIsNewSpecification();
if (isNewSpec.IsSatisfiedBy(this))
Expand All @@ -57,19 +57,19 @@ public Task Handle(IDomainEvent<Account, AccountId, MoneySentEvent> domainEvent)

Emit(new MoneyTransferStartedEvent(domainEvent.AggregateEvent.Transaction));
}
return Task.CompletedTask;

return true;
}

public Task Handle(IDomainEvent<Account, AccountId, MoneyReceivedEvent> domainEvent)
public bool Handle(IDomainEvent<Account, AccountId, MoneyReceivedEvent> domainEvent)
{
var spec = new AggregateIsNewSpecification().Not();
if (spec.IsSatisfiedBy(this))
{
Emit(new MoneyTransferCompletedEvent(domainEvent.AggregateEvent.Transaction));
}
return Task.CompletedTask;

return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
namespace Akkatecture.Examples.Api.Domain.Sagas
{
public class ResourceCreationSaga : AggregateSaga<ResourceCreationSaga, ResourceCreationSagaId, ResourceCreationSagaState>,
ISagaIsStartedBy<Resource, ResourceId, ResourceCreatedEvent>
ISagaIsStartedByAsync<Resource, ResourceId, ResourceCreatedEvent>
{
public async Task Handle(IDomainEvent<Resource, ResourceId, ResourceCreatedEvent> domainEvent)
{
Expand Down
4 changes: 2 additions & 2 deletions src/Akkatecture.Clustering/Extentions/TypeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ internal static IReadOnlyList<Type> GetSagaEventSubscriptionTypes(this Type type
.ToList();

var handleEventTypes = interfaces
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISagaHandles<,,>))
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISagaHandlesAsync<,,>))
.Select(t => typeof(IDomainEvent<,,>).MakeGenericType(t.GetGenericArguments()[0],
t.GetGenericArguments()[1], t.GetGenericArguments()[2]))
.ToList();

var startedByEventTypes = interfaces
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISagaIsStartedBy<,,>))
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISagaIsStartedByAsync<,,>))
.Select(t => typeof(IDomainEvent<,,>).MakeGenericType(t.GetGenericArguments()[0],
t.GetGenericArguments()[1], t.GetGenericArguments()[2]))
.ToList();
Expand Down
1 change: 0 additions & 1 deletion src/Akkatecture/Aggregates/AggregateRoot.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ public virtual void Emit<TAggregateEvent>(TAggregateEvent aggregateEvent, IMetad
eventMetadata.AddRange(metadata);
}


var committedEvent = new CommittedEvent<TAggregate, TIdentity, TAggregateEvent>(Id,aggregateEvent,eventMetadata);

Persist(committedEvent, ApplyCommittedEvents);
Expand Down
4 changes: 4 additions & 0 deletions src/Akkatecture/Akkatecture.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
<NeutralLanguage>en-GB</NeutralLanguage>
<PackageReleaseNotes>Externally Updated</PackageReleaseNotes>
<PackageIconUrl>https://akkatecture.net/logos/logo-512.png</PackageIconUrl>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<AllowedOutputExtensionsInPackageBuildOutputFolder>$(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb</AllowedOutputExtensionsInPackageBuildOutputFolder>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka" Version="1.3.9" />
<PackageReference Include="Akka.Persistence" Version="1.3.9" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.1.1" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta-63127-02" PrivateAssets="All"/>
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="Configuration\reference.conf" />
Expand Down
56 changes: 55 additions & 1 deletion src/Akkatecture/Extensions/TypeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ internal static IReadOnlyDictionary<Type, Action<TAggregateState, IAggregateEven
mi => ReflectionHelper.CompileMethodInvocation<Action<TAggregateState, IAggregateEvent>>(type, "Apply", mi.GetParameters()[0].ParameterType));
}

internal static IReadOnlyList<Type> GetDomainEventSubscriberSubscriptionTypes(this Type type)
internal static IReadOnlyList<Type> GetAsyncDomainEventSubscriberSubscriptionTypes(this Type type)
{
//TODO
//Check generic arguments for sanity
Expand All @@ -185,6 +185,30 @@ internal static IReadOnlyList<Type> GetDomainEventSubscriberSubscriptionTypes(th

return domainEventTypes;
}

internal static IReadOnlyList<Type> GetDomainEventSubscriberSubscriptionTypes(this Type type)
{
//TODO
//Check generic arguments for sanity
//add checks for iaggregateroot
//add checks for iidentity
//add checks for iaggregatevent

var interfaces = type
.GetTypeInfo()
.GetInterfaces()
.Select(i => i.GetTypeInfo())
.ToList();
var domainEventTypes = interfaces
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISubscribeTo<,,>))
.Select(i => typeof(IDomainEvent<,,>).MakeGenericType(i.GetGenericArguments()[0],i.GetGenericArguments()[1],i.GetGenericArguments()[2]))
.ToList();


return domainEventTypes;
}


private static readonly ConcurrentDictionary<Type, AggregateName> AggregateNameCache = new ConcurrentDictionary<Type, AggregateName>();
internal static AggregateName GetCommittedEventAggregateRootName(this Type type)
{
Expand All @@ -210,6 +234,36 @@ internal static AggregateName GetCommittedEventAggregateRootName(this Type type)
});
}

internal static IReadOnlyList<Type> GetAsyncSagaEventSubscriptionTypes(this Type type)
{
//TODO
//add checks for iaggregateroot
//add checks for iidentity
//add checks for iaggregatevent

var interfaces = type
.GetTypeInfo()
.GetInterfaces()
.Select(i => i.GetTypeInfo())
.ToList();

var handleEventTypes = interfaces
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISagaHandlesAsync<,,>))
.Select(t => typeof(IDomainEvent<,,>).MakeGenericType(t.GetGenericArguments()[0],
t.GetGenericArguments()[1], t.GetGenericArguments()[2]))
.ToList();

var startedByEventTypes = interfaces
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISagaIsStartedByAsync<,,>))
.Select(t => typeof(IDomainEvent<,,>).MakeGenericType(t.GetGenericArguments()[0],
t.GetGenericArguments()[1], t.GetGenericArguments()[2]))
.ToList();

startedByEventTypes.AddRange(handleEventTypes);

return startedByEventTypes;
}

internal static IReadOnlyList<Type> GetSagaEventSubscriptionTypes(this Type type)
{
//TODO
Expand Down
146 changes: 101 additions & 45 deletions src/Akkatecture/Sagas/AggregateSaga/AggregateSaga.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,51 +98,8 @@ protected AggregateSaga()

if (Settings.AutoReceive)
{
var type = GetType();

var subscriptionTypes =
type
.GetSagaEventSubscriptionTypes();

var methods = type
.GetTypeInfo()
.GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance)
.Where(mi =>
{
if (mi.Name != "Handle")
return false;
var parameters = mi.GetParameters();
return
parameters.Length == 1;
})
.ToDictionary(
mi => mi.GetParameters()[0].ParameterType,
mi => mi);


var method = type
.GetBaseType("ReceivePersistentActor")
.GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance)
.Where(mi =>
{
if (mi.Name != "CommandAsync") return false;
var parameters = mi.GetParameters();
return
parameters.Length == 2
&& parameters[0].ParameterType.Name.Contains("Func");
})
.First();

foreach (var subscriptionType in subscriptionTypes)
{
var funcType = typeof(Func<,>).MakeGenericType(subscriptionType, typeof(Task));
var subscriptionFunction = Delegate.CreateDelegate(funcType, this, methods[subscriptionType]);
var actorReceiveMethod = method.MakeGenericMethod(subscriptionType);

actorReceiveMethod.Invoke(this, new[] { subscriptionFunction, null });
}
InitReceives();
InitAsyncReceives();
}

Register(State);
Expand All @@ -160,6 +117,105 @@ protected AggregateSaga()

}

public void InitReceives()
{
var type = GetType();

var subscriptionTypes =
type
.GetSagaEventSubscriptionTypes();

var methods = type
.GetTypeInfo()
.GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance)
.Where(mi =>
{
if (mi.Name != "Handle")
return false;
var parameters = mi.GetParameters();
return
parameters.Length == 1;
})
.ToDictionary(
mi => mi.GetParameters()[0].ParameterType,
mi => mi);


var method = type
.GetBaseType("ReceivePersistentActor")
.GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance)
.Where(mi =>
{
if (mi.Name != "Command") return false;
var parameters = mi.GetParameters();
return
parameters.Length == 1
&& parameters[0].ParameterType.Name.Contains("Func");
})
.First();

foreach (var subscriptionType in subscriptionTypes)
{
var funcType = typeof(Func<,>).MakeGenericType(subscriptionType, typeof(bool));
var subscriptionFunction = Delegate.CreateDelegate(funcType, this, methods[subscriptionType]);
var actorReceiveMethod = method.MakeGenericMethod(subscriptionType);

actorReceiveMethod.Invoke(this, new[] { subscriptionFunction });
}
}

public void InitAsyncReceives()
{
var type = GetType();

var subscriptionTypes =
type
.GetAsyncSagaEventSubscriptionTypes();

var methods = type
.GetTypeInfo()
.GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance)
.Where(mi =>
{
if (mi.Name != "Handle")
return false;
var parameters = mi.GetParameters();
return
parameters.Length == 1;
})
.ToDictionary(
mi => mi.GetParameters()[0].ParameterType,
mi => mi);


var method = type
.GetBaseType("ReceivePersistentActor")
.GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance)
.Where(mi =>
{
if (mi.Name != "CommandAsync") return false;
var parameters = mi.GetParameters();
return
parameters.Length == 2
&& parameters[0].ParameterType.Name.Contains("Func");
})
.First();

foreach (var subscriptionType in subscriptionTypes)
{
var funcType = typeof(Func<,>).MakeGenericType(subscriptionType, typeof(Task));
var subscriptionFunction = Delegate.CreateDelegate(funcType, this, methods[subscriptionType]);
var actorReceiveMethod = method.MakeGenericMethod(subscriptionType);

actorReceiveMethod.Invoke(this, new[] { subscriptionFunction, null });
}
}


protected virtual void Emit<TAggregateEvent>(TAggregateEvent aggregateEvent, IMetadata metadata = null)
where TAggregateEvent : IAggregateEvent<TAggregateSaga, TIdentity>
{
Expand Down
19 changes: 14 additions & 5 deletions src/Akkatecture/Sagas/AggregateSaga/AggregateSagaManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,38 @@ protected AggregateSagaManager(Expression<Func<TAggregateSaga>> sagaFactory, boo

if (autoSubscribe && Settings.AutoSubscribe)
{
var sagaHandlesSubscriptionTypes =
var sagaEventSubscriptionTypes =
sagaType
.GetSagaEventSubscriptionTypes();

foreach (var type in sagaHandlesSubscriptionTypes)
foreach (var type in sagaEventSubscriptionTypes)
{
Context.System.EventStream.Subscribe(Self, type);
}

var asyncSagaEventSubscriptionTypes =
sagaType
.GetAsyncSagaEventSubscriptionTypes();

foreach (var type in asyncSagaEventSubscriptionTypes)
{
Context.System.EventStream.Subscribe(Self, type);
}
}

if (Settings.AutoSpawnOnReceive)
{
ReceiveAsync<IDomainEvent>(Handle);
Receive<IDomainEvent>(Handle);
}

}

protected virtual Task Handle(IDomainEvent domainEvent)
protected virtual bool Handle(IDomainEvent domainEvent)
{
var sagaId = SagaLocator.LocateSaga(domainEvent);
var saga = FindOrSpawn(sagaId);
saga.Tell(domainEvent,Sender);
return Task.CompletedTask;
return true;
}

protected virtual bool Terminate(Terminated message)
Expand Down
7 changes: 7 additions & 0 deletions src/Akkatecture/Sagas/ISagaHandles.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public interface ISagaHandles<TAggregate, in TIdentity, in TAggregateEvent> : IS
where TAggregateEvent : IAggregateEvent<TAggregate, TIdentity>
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity
{
bool Handle(IDomainEvent<TAggregate, TIdentity, TAggregateEvent> domainEvent);
}
public interface ISagaHandlesAsync<TAggregate, in TIdentity, in TAggregateEvent> : ISaga
where TAggregateEvent : IAggregateEvent<TAggregate, TIdentity>
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity
{
Task Handle(IDomainEvent<TAggregate, TIdentity, TAggregateEvent> domainEvent);
}
Expand Down
Loading

0 comments on commit 5965d9a

Please sign in to comment.