From 86ec94c59ca3b0ae9f50884748e3451b2fc384a4 Mon Sep 17 00:00:00 2001 From: Max Ekman Date: Fri, 8 Jan 2021 14:21:22 +0100 Subject: [PATCH 1/2] Let sagas execute commands directly --- eventhandler/saga/eventhandler.go | 46 +++++++++++++++----- eventhandler/saga/eventhandler_test.go | 7 ++- examples/guestlist/domains/guestlist/saga.go | 10 ++--- 3 files changed, 45 insertions(+), 18 deletions(-) diff --git a/eventhandler/saga/eventhandler.go b/eventhandler/saga/eventhandler.go index bd43b1af..e129ecee 100644 --- a/eventhandler/saga/eventhandler.go +++ b/eventhandler/saga/eventhandler.go @@ -16,7 +16,7 @@ package saga import ( "context" - "errors" + "fmt" eh "github.com/looplab/eventhorizon" ) @@ -37,7 +37,8 @@ type Saga interface { SagaType() Type // RunSaga handles an event in the saga that can return commands. - RunSaga(context.Context, eh.Event) []eh.Command + // If an error is returned from the saga, the event will be run again. + RunSaga(context.Context, eh.Event, eh.CommandHandler) error } // Type is the type of a saga, used as its unique identifier. @@ -48,6 +49,32 @@ func (t Type) String() string { return string(t) } +// Error is an error in the projector, with the namespace. +type Error struct { + // Err is the error that happened when projecting the event. + Err error + // Saga is the saga where the error happened. + Saga string + // Namespace is the namespace for the error. + Namespace string +} + +// Error implements the Error method of the errors.Error interface. +func (e Error) Error() string { + return fmt.Sprintf("%s: %s (%s)", + e.Saga, e.Err, e.Namespace) +} + +// Unwrap implements the errors.Unwrap method. +func (e Error) Unwrap() error { + return e.Err +} + +// Cause implements the github.com/pkg/errors Unwrap method. +func (e Error) Cause() error { + return e.Unwrap() +} + // NewEventHandler creates a new EventHandler. func NewEventHandler(saga Saga, commandHandler eh.CommandHandler) *EventHandler { return &EventHandler{ @@ -63,15 +90,12 @@ func (h *EventHandler) HandlerType() eh.EventHandlerType { // HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface. func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error { - // Run the saga and collect commands. - cmds := h.saga.RunSaga(ctx, event) - - // Dispatch commands back on the command bus. - for _, cmd := range cmds { - if err := h.commandHandler.HandleCommand(ctx, cmd); err != nil { - return errors.New("could not handle command '" + - cmd.CommandType().String() + "' from saga '" + - h.saga.SagaType().String() + "': " + err.Error()) + // Run the saga which can issue commands on the provided command handler. + if err := h.saga.RunSaga(ctx, event, h.commandHandler); err != nil { + return Error{ + Err: err, + Saga: h.saga.SagaType().String(), + Namespace: eh.NamespaceFromContext(ctx), } } diff --git a/eventhandler/saga/eventhandler_test.go b/eventhandler/saga/eventhandler_test.go index 7d3126b9..f4deff4a 100644 --- a/eventhandler/saga/eventhandler_test.go +++ b/eventhandler/saga/eventhandler_test.go @@ -63,8 +63,11 @@ func (m *TestSaga) SagaType() Type { return TestSagaType } -func (m *TestSaga) RunSaga(ctx context.Context, event eh.Event) []eh.Command { +func (m *TestSaga) RunSaga(ctx context.Context, event eh.Event, h eh.CommandHandler) error { m.event = event m.context = ctx - return m.commands + for _, cmd := range m.commands { + return h.HandleCommand(ctx, cmd) + } + return nil } diff --git a/examples/guestlist/domains/guestlist/saga.go b/examples/guestlist/domains/guestlist/saga.go index 1f1a15d3..fd5dda0d 100644 --- a/examples/guestlist/domains/guestlist/saga.go +++ b/examples/guestlist/domains/guestlist/saga.go @@ -48,7 +48,7 @@ func (s *ResponseSaga) SagaType() saga.Type { } // RunSaga implements the Run saga method of the Saga interface. -func (s *ResponseSaga) RunSaga(ctx context.Context, event eh.Event) []eh.Command { +func (s *ResponseSaga) RunSaga(ctx context.Context, event eh.Event, h eh.CommandHandler) error { switch event.EventType() { case InviteAcceptedEvent: // Do nothing for already accepted guests. @@ -61,9 +61,9 @@ func (s *ResponseSaga) RunSaga(ctx context.Context, event eh.Event) []eh.Command // Deny the invite if the guest list is full. if len(s.acceptedGuests) >= s.guestLimit { - return []eh.Command{ + return h.HandleCommand(ctx, &DenyInvite{ID: event.AggregateID()}, - } + ) } // Confirm the invite when there is space left. @@ -71,9 +71,9 @@ func (s *ResponseSaga) RunSaga(ctx context.Context, event eh.Event) []eh.Command s.acceptedGuests[event.AggregateID()] = true s.acceptedGuestsMu.Unlock() - return []eh.Command{ + return h.HandleCommand(ctx, &ConfirmInvite{ID: event.AggregateID()}, - } + ) } return nil From efedfc116d657785ffc52d9cd651bde67bd7de9d Mon Sep 17 00:00:00 2001 From: Max Ekman Date: Fri, 8 Jan 2021 14:22:07 +0100 Subject: [PATCH 2/2] Provide of projector in errors --- eventhandler/projector/eventhandler.go | 48 ++++++++++++--------- eventhandler/projector/eventhandler_test.go | 20 ++++++--- 2 files changed, 41 insertions(+), 27 deletions(-) diff --git a/eventhandler/projector/eventhandler.go b/eventhandler/projector/eventhandler.go index 96f5e0ee..9a86e89e 100644 --- a/eventhandler/projector/eventhandler.go +++ b/eventhandler/projector/eventhandler.go @@ -35,39 +35,40 @@ var _ = eh.EventHandler(&EventHandler{}) // Projector is a projector of events onto models. type Projector interface { + // ProjectorType returns the type of the projector. + ProjectorType() Type + // Project projects an event onto a model and returns the updated model or // an error. Project(context.Context, eh.Event, eh.Entity) (eh.Entity, error) - - // ProjectorType returns the type of the projector. - ProjectorType() Type } // Type is the type of a projector, used as its unique identifier. type Type string +// String returns the string representation of a projector type. +func (t Type) String() string { + return string(t) +} + // Error is an error in the projector, with the namespace. type Error struct { // Err is the error that happened when projecting the event. Err error + // Projector is the projector where the error happened. + Projector string + // Namespace is the namespace for the error. + Namespace string // EventVersion is the version of the event. EventVersion int // EntityVersion is the version of the entity. EntityVersion int - // BaseErr is an optional underlying error, for example from the DB driver. - BaseErr error - // Namespace is the namespace for the error. - Namespace string } // Error implements the Error method of the errors.Error interface. func (e Error) Error() string { - errStr := e.Err.Error() - if e.BaseErr != nil { - errStr += ": " + e.BaseErr.Error() - } - return fmt.Sprintf("projector: %s, event: v%d, entity: v%d (%s)", - errStr, e.EventVersion, e.EntityVersion, e.Namespace) + return fmt.Sprintf("%s: %s, event: v%d, entity: v%d (%s)", + e.Projector, e.Err, e.EventVersion, e.EntityVersion, e.Namespace) } // Unwrap implements the errors.Unwrap method. @@ -127,16 +128,18 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error { if h.factoryFn == nil { return Error{ Err: ErrModelNotSet, - EventVersion: event.Version(), + Projector: h.projector.ProjectorType().String(), Namespace: eh.NamespaceFromContext(ctx), + EventVersion: event.Version(), } } entity = h.factoryFn() } else if err != nil { return Error{ Err: err, - EventVersion: event.Version(), + Projector: h.projector.ProjectorType().String(), Namespace: eh.NamespaceFromContext(ctx), + EventVersion: event.Version(), } } @@ -153,9 +156,10 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error { if entity.AggregateVersion()+1 != event.Version() { return Error{ Err: eh.ErrIncorrectEntityVersion, + Projector: h.projector.ProjectorType().String(), + Namespace: eh.NamespaceFromContext(ctx), EventVersion: event.Version(), EntityVersion: entityVersion, - Namespace: eh.NamespaceFromContext(ctx), } } } @@ -165,9 +169,10 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error { if err != nil { return Error{ Err: err, + Projector: h.projector.ProjectorType().String(), + Namespace: eh.NamespaceFromContext(ctx), EventVersion: event.Version(), EntityVersion: entityVersion, - Namespace: eh.NamespaceFromContext(ctx), } } @@ -177,9 +182,10 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error { if newEntity.AggregateVersion() != event.Version() { return Error{ Err: eh.ErrIncorrectEntityVersion, + Projector: h.projector.ProjectorType().String(), + Namespace: eh.NamespaceFromContext(ctx), EventVersion: event.Version(), EntityVersion: entityVersion, - Namespace: eh.NamespaceFromContext(ctx), } } } @@ -189,18 +195,20 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error { if err := h.repo.Save(ctx, newEntity); err != nil { return Error{ Err: err, + Projector: h.projector.ProjectorType().String(), + Namespace: eh.NamespaceFromContext(ctx), EventVersion: event.Version(), EntityVersion: entityVersion, - Namespace: eh.NamespaceFromContext(ctx), } } } else { if err := h.repo.Remove(ctx, event.AggregateID()); err != nil { return Error{ Err: err, + Projector: h.projector.ProjectorType().String(), + Namespace: eh.NamespaceFromContext(ctx), EventVersion: event.Version(), EntityVersion: entityVersion, - Namespace: eh.NamespaceFromContext(ctx), } } } diff --git a/eventhandler/projector/eventhandler_test.go b/eventhandler/projector/eventhandler_test.go index eea0303e..4bbaf82f 100644 --- a/eventhandler/projector/eventhandler_test.go +++ b/eventhandler/projector/eventhandler_test.go @@ -131,7 +131,7 @@ func TestEventHandler_UpdateModelWithVersion(t *testing.T) { Content: "version 1", } if err := handler.HandleEvent(ctx, event); err != nil { - t.Error("there shoud be no error:", err) + t.Error("there should be no error:", err) } if projector.event != event { t.Error("the handled event should be correct:", projector.event) @@ -256,10 +256,12 @@ func TestEventHandler_LoadError(t *testing.T) { repo.LoadErr = loadErr expectedErr := Error{ Err: loadErr, - EventVersion: 1, + Projector: TestProjectorType.String(), Namespace: eh.NamespaceFromContext(ctx), + EventVersion: 1, } - if err := handler.HandleEvent(ctx, event); !reflect.DeepEqual(err, expectedErr) { + err := handler.HandleEvent(ctx, event) + if !errors.Is(err, expectedErr) { t.Error("there shoud be an error:", err) } } @@ -284,10 +286,12 @@ func TestEventHandler_SaveError(t *testing.T) { repo.SaveErr = saveErr expectedErr := Error{ Err: saveErr, - EventVersion: 1, + Projector: TestProjectorType.String(), Namespace: eh.NamespaceFromContext(ctx), + EventVersion: 1, } - if err := handler.HandleEvent(ctx, event); !reflect.DeepEqual(err, expectedErr) { + err := handler.HandleEvent(ctx, event) + if !errors.Is(err, expectedErr) { t.Error("there shoud be an error:", err) } } @@ -312,10 +316,12 @@ func TestEventHandler_ProjectError(t *testing.T) { projector.err = projectErr expectedErr := Error{ Err: projectErr, - EventVersion: 1, + Projector: TestProjectorType.String(), Namespace: eh.NamespaceFromContext(ctx), + EventVersion: 1, } - if err := handler.HandleEvent(ctx, event); !reflect.DeepEqual(err, expectedErr) { + err := handler.HandleEvent(ctx, event) + if !errors.Is(err, expectedErr) { t.Error("there shoud be an error:", err) } }