Skip to content

Commit

Permalink
Merge pull request #42 from Lutando/dev
Browse files Browse the repository at this point in the history
Fixed ComittedEvent Deserialization
  • Loading branch information
Lutando authored Nov 10, 2018
2 parents e4230f0 + cb997cc commit 81171df
Show file tree
Hide file tree
Showing 13 changed files with 103 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public static void CreateActorSystem()

public static void Main(string[] args)
{

//initialize actor system
CreateActorSystem();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
namespace Akkatecture.Walkthrough.Domain.Subscribers.Revenue
{
public class RevenueSubscriber : DomainEventSubscriber,
ISubscribeTo<Account,AccountId,FeesDeductedEvent>
ISubscribeToAsync<Account,AccountId,FeesDeductedEvent>
{
public IActorRef RevenueRepository { get; }

Expand All @@ -41,7 +41,7 @@ public RevenueSubscriber(IActorRef revenueRepository)
RevenueRepository = revenueRepository;
}

public Task Handle(IDomainEvent<Account, AccountId, FeesDeductedEvent> domainEvent)
public Task HandleAsync(IDomainEvent<Account, AccountId, FeesDeductedEvent> domainEvent)
{
var command = new AddRevenueCommand(domainEvent.AggregateEvent.Amount);
RevenueRepository.Tell(command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
namespace Akkatecture.Examples.Api.Domain.Repositories.Operations
{
public class OperationsStorageHandler : DomainEventSubscriber,
ISubscribeTo<ResourceCreationSaga,ResourceCreationSagaId,ResourceCreationStartedEvent>,
ISubscribeTo<ResourceCreationSaga,ResourceCreationSagaId,ResourceCreationProgressEvent>,
ISubscribeTo<ResourceCreationSaga,ResourceCreationSagaId,ResourceCreationEndedEvent>
ISubscribeToAsync<ResourceCreationSaga,ResourceCreationSagaId,ResourceCreationStartedEvent>,
ISubscribeToAsync<ResourceCreationSaga,ResourceCreationSagaId,ResourceCreationProgressEvent>,
ISubscribeToAsync<ResourceCreationSaga,ResourceCreationSagaId,ResourceCreationEndedEvent>
{
public List<OperationsReadModel> Operations = new List<OperationsReadModel>();

Expand All @@ -43,15 +43,15 @@ public OperationsStorageHandler()
Receive<GetOperationsQuery>(Handle);
}

public Task Handle(IDomainEvent<ResourceCreationSaga, ResourceCreationSagaId, ResourceCreationStartedEvent> domainEvent)
public Task HandleAsync(IDomainEvent<ResourceCreationSaga, ResourceCreationSagaId, ResourceCreationStartedEvent> domainEvent)
{
var operation = new OperationsReadModel(domainEvent.AggregateEvent.ResourceId.GetGuid(),0,0, domainEvent.AggregateEvent.StartedAt);

Operations.Add(operation);
return Task.CompletedTask;
}

public Task Handle(IDomainEvent<ResourceCreationSaga, ResourceCreationSagaId, ResourceCreationProgressEvent> domainEvent)
public Task HandleAsync(IDomainEvent<ResourceCreationSaga, ResourceCreationSagaId, ResourceCreationProgressEvent> domainEvent)
{
var oldOperation = Operations.Single(x => x.Id == domainEvent.AggregateEvent.ResourceId.GetGuid());
var operation = new OperationsReadModel(domainEvent.AggregateEvent.ResourceId.GetGuid(),domainEvent.AggregateEvent.Progress,domainEvent.AggregateEvent.Elapsed, oldOperation.StartedAt);
Expand All @@ -61,7 +61,7 @@ public Task Handle(IDomainEvent<ResourceCreationSaga, ResourceCreationSagaId, Re
return Task.CompletedTask;
}

public Task Handle(IDomainEvent<ResourceCreationSaga, ResourceCreationSagaId, ResourceCreationEndedEvent> domainEvent)
public Task HandleAsync(IDomainEvent<ResourceCreationSaga, ResourceCreationSagaId, ResourceCreationEndedEvent> domainEvent)
{
var oldOperation = Operations.Single(x => x.Id == domainEvent.AggregateEvent.ResourceId.GetGuid());
var operation = new OperationsReadModel(domainEvent.AggregateEvent.ResourceId.GetGuid(),domainEvent.AggregateEvent.Progress,domainEvent.AggregateEvent.Elapsed, oldOperation.StartedAt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
namespace Akkatecture.Examples.Api.Domain.Repositories.Resources
{
public class ResourcesStorageHandler : DomainEventSubscriber,
ISubscribeTo<ResourceCreationSaga,ResourceCreationSagaId,ResourceCreationEndedEvent>
ISubscribeToAsync<ResourceCreationSaga,ResourceCreationSagaId,ResourceCreationEndedEvent>
{
public List<ResourcesReadModel> Resources = new List<ResourcesReadModel>();

Expand All @@ -40,7 +40,7 @@ public ResourcesStorageHandler()
Receive<GetResourcesQuery>(Handle);
}

public Task Handle(IDomainEvent<ResourceCreationSaga, ResourceCreationSagaId, ResourceCreationEndedEvent> domainEvent)
public Task HandleAsync(IDomainEvent<ResourceCreationSaga, ResourceCreationSagaId, ResourceCreationEndedEvent> domainEvent)
{
var readModel = new ResourcesReadModel(domainEvent.AggregateEvent.ResourceId.GetGuid(),domainEvent.AggregateEvent.Elapsed,domainEvent.AggregateEvent.EndedAt);

Expand Down
18 changes: 4 additions & 14 deletions src/Akkatecture/Aggregates/AggregateRoot.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,11 @@ public bool HasSourceId(ISourceId sourceId)
return !sourceId.IsNone() && _previousSourceIds.Any(s => s.Value == sourceId.Value);
}

public IIdentity GetIdentity()
{
return Id;
}


public virtual void Emit<TAggregateEvent>(TAggregateEvent aggregateEvent, IMetadata metadata = null)
where TAggregateEvent : IAggregateEvent<TAggregate, TIdentity>
{
Expand Down Expand Up @@ -224,11 +227,6 @@ public void ApplyEvents(IReadOnlyCollection<IDomainEvent> domainEvents)
Version = domainEvents.Max(e => e.AggregateSequenceNumber);
}

public IIdentity GetIdentity()
{
return Id;
}

public void ApplyEvents(IEnumerable<IAggregateEvent> aggregateEvents)
{
if (Version > 0)
Expand Down Expand Up @@ -271,14 +269,6 @@ protected Action<IAggregateEvent> GetEventApplyMethods<TAggregateEvent>(TAggrega
return aggregateApplyMethod;
}


protected Action<IAggregateEvent> GetDomainEventApplyMethods<TDomainEvent, TAggregateEvent>(TDomainEvent domainEvent)
where TDomainEvent : IDomainEvent<TAggregate,TIdentity,TAggregateEvent>
where TAggregateEvent : IAggregateEvent<TAggregate, TIdentity>
{
return GetEventApplyMethods(domainEvent.AggregateEvent);
}

protected virtual void ApplyEvent(IAggregateEvent<TAggregate, TIdentity> aggregateEvent)
{
var eventType = aggregateEvent.GetType();
Expand Down
9 changes: 7 additions & 2 deletions src/Akkatecture/Aggregates/CommittedEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System;
using Akkatecture.Core;

namespace Akkatecture.Aggregates
Expand All @@ -36,13 +37,17 @@ public class CommittedEvent<TAggregate, TIdentity, TAggregateEvent> : ICommitted
{
public TAggregateEvent AggregateEvent { get; }
public TIdentity AggregateIdentity { get; }
public IMetadata Metadata { get; }
public Metadata Metadata { get; }

public CommittedEvent(
TIdentity aggregateIdentity,
TAggregateEvent aggregateEvent,
IMetadata metadata)
Metadata metadata)
{
if (aggregateEvent == null) throw new ArgumentNullException(nameof(aggregateEvent));
if (aggregateIdentity == null || string.IsNullOrEmpty(aggregateIdentity.Value)) throw new ArgumentNullException(nameof(aggregateIdentity));


AggregateIdentity = aggregateIdentity;
AggregateEvent = aggregateEvent;
Metadata = metadata;
Expand Down
26 changes: 1 addition & 25 deletions src/Akkatecture/Extensions/TypeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,30 +164,6 @@ internal static IReadOnlyDictionary<Type, Action<TAggregateState, IAggregateEven
mi => ReflectionHelper.CompileMethodInvocation<Action<TAggregateState, IAggregateEvent>>(type, "Apply", mi.GetParameters()[0].ParameterType));
}


internal static IReadOnlyList<Type> GetDomainEventSubscriberSubscriptionTypes<TAggregate, TIdentity>(this Type type)
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity
{
var aggregateEventType = typeof(IAggregateEvent<TAggregate, TIdentity>);

var interfaces = type
.GetTypeInfo()
.GetInterfaces()
.Select(i => i.GetTypeInfo())
.ToList();
var aggregateEventSubscriptionTypes = interfaces
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISubscribeTo<,,>))
.Where(i => aggregateEventType.GetTypeInfo().IsAssignableFrom(i.GenericTypeArguments[2]))
.Select(i => i.GetGenericArguments()[2])
.ToList();
var domainEventTypes = aggregateEventSubscriptionTypes
.Select(t => typeof(IDomainEvent<,,>).MakeGenericType(typeof(TAggregate), typeof(TIdentity), t))
.ToList();

return domainEventTypes;
}

internal static IReadOnlyList<Type> GetDomainEventSubscriberSubscriptionTypes(this Type type)
{
//TODO
Expand All @@ -202,7 +178,7 @@ internal static IReadOnlyList<Type> GetDomainEventSubscriberSubscriptionTypes(th
.Select(i => i.GetTypeInfo())
.ToList();
var domainEventTypes = interfaces
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISubscribeTo<,,>))
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISubscribeToAsync<,,>))
.Select(i => typeof(IDomainEvent<,,>).MakeGenericType(i.GetGenericArguments()[0],i.GetGenericArguments()[1],i.GetGenericArguments()[2]))
.ToList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ protected AggregateSagaManager(Expression<Func<TAggregateSaga>> sagaFactory, boo
{
Logger = Context.GetLogger();

Context.System.EventStream.Subscribe(Self, typeof(DeadLetter));

SagaLocator = (TSagaLocator)Activator.CreateInstance(typeof(TSagaLocator));

Expand Down
3 changes: 1 addition & 2 deletions src/Akkatecture/Subscribers/DomainEventSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected DomainEventSubscriber()
.GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance)
.Where(mi =>
{
if (mi.Name != "Handle") return false;
if (mi.Name != "HandleAsync") return false;
var parameters = mi.GetParameters();
return
parameters.Length == 1;
Expand Down Expand Up @@ -99,6 +99,5 @@ protected DomainEventSubscriber()
}

}

}
}
10 changes: 9 additions & 1 deletion src/Akkatecture/Subscribers/ISubscribeTo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,19 @@

namespace Akkatecture.Subscribers
{
public interface ISubscribeToAsync<TAggregate, in TIdentity, in TEvent>
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity
where TEvent : IAggregateEvent<TAggregate, TIdentity>
{
Task HandleAsync(IDomainEvent<TAggregate, TIdentity, TEvent> domainEvent);
}

public interface ISubscribeTo<TAggregate, in TIdentity, in TEvent>
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity
where TEvent : IAggregateEvent<TAggregate, TIdentity>
{
Task Handle(IDomainEvent<TAggregate, TIdentity, TEvent> domainEvent);
bool Handle(IDomainEvent<TAggregate, TIdentity, TEvent> domainEvent);
}
}
14 changes: 14 additions & 0 deletions test/Akkatecture.TestHelpers/SerializationHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using Newtonsoft.Json;

namespace Akkatecture.TestHelpers
{
public static class SerializationHelpers
{
public static T SerializeDeserialize<T>(this T message)
{
var json = JsonConvert.SerializeObject(message);
var obj = JsonConvert.DeserializeObject<T>(json);
return obj;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@
namespace Akkatecture.TestHelpers.Subscribers
{
public class TestAggregateSubscriber : DomainEventSubscriber,
ISubscribeTo<TestAggregate,TestAggregateId,TestCreatedEvent>,
ISubscribeTo<TestAggregate, TestAggregateId, TestAddedEvent>
ISubscribeToAsync<TestAggregate,TestAggregateId,TestCreatedEvent>,
ISubscribeToAsync<TestAggregate, TestAggregateId, TestAddedEvent>
{
public Task Handle(IDomainEvent<TestAggregate, TestAggregateId, TestCreatedEvent> domainEvent)
public Task HandleAsync(IDomainEvent<TestAggregate, TestAggregateId, TestCreatedEvent> domainEvent)
{
var handled = new TestSubscribedEventHandled<TestCreatedEvent>(domainEvent.AggregateEvent);
Context.System.EventStream.Publish(handled);
return Task.CompletedTask;
}

public Task Handle(IDomainEvent<TestAggregate, TestAggregateId, TestAddedEvent> domainEvent)
public Task HandleAsync(IDomainEvent<TestAggregate, TestAggregateId, TestAddedEvent> domainEvent)
{
return Task.CompletedTask;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System;
using System.ComponentModel;
using Akkatecture.Aggregates;
using Akkatecture.Core;
using Akkatecture.Extensions;
using Akkatecture.TestHelpers;
using Akkatecture.TestHelpers.Aggregates;
using Akkatecture.TestHelpers.Aggregates.Entities;
using Akkatecture.TestHelpers.Aggregates.Events;
using FluentAssertions;
using Newtonsoft.Json;
using Xunit;

namespace Akkatecture.Tests.UnitTests.Serialization
{
public class SerializationTests
{
private const string Category = "Serialization";

[Fact]
[Category(Category)]
public void CommittedEvent_AfterSerialization_IsValidAfterDeserialization()
{

var aggregateId = TestAggregateId.New;
var entityId = TestId.New;
var entity = new Test(entityId);
var aggregateEvent = new TestAddedEvent(entity);
var now = DateTimeOffset.UtcNow;
var eventId = EventId.NewDeterministic(
GuidFactories.Deterministic.Namespaces.Events,
$"{aggregateId.Value}-v{3}");
var eventMetadata = new Metadata
{
Timestamp = now,
AggregateSequenceNumber = 3,
AggregateName = typeof(TestAggregate).GetAggregateName().Value,
AggregateId = aggregateId.Value,
EventId = eventId
};
var committedEvent =
new CommittedEvent<TestAggregate, TestAggregateId, TestAddedEvent>(
aggregateId,
aggregateEvent,
eventMetadata);

committedEvent.SerializeDeserialize().Should().BeEquivalentTo(committedEvent);
}

}

}

0 comments on commit 81171df

Please sign in to comment.