Skip to content

Commit

Permalink
Merge pull request #285 from maxekman/281/saga-errors
Browse files Browse the repository at this point in the history
281 / Saga command handling
  • Loading branch information
maxekman authored Jan 8, 2021
2 parents 1ec8957 + efedfc1 commit 84f91e9
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 45 deletions.
48 changes: 28 additions & 20 deletions eventhandler/projector/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,39 +35,40 @@ var _ = eh.EventHandler(&EventHandler{})

// Projector is a projector of events onto models.
type Projector interface {
// ProjectorType returns the type of the projector.
ProjectorType() Type

// Project projects an event onto a model and returns the updated model or
// an error.
Project(context.Context, eh.Event, eh.Entity) (eh.Entity, error)

// ProjectorType returns the type of the projector.
ProjectorType() Type
}

// Type is the type of a projector, used as its unique identifier.
type Type string

// String returns the string representation of a projector type.
func (t Type) String() string {
return string(t)
}

// Error is an error in the projector, with the namespace.
type Error struct {
// Err is the error that happened when projecting the event.
Err error
// Projector is the projector where the error happened.
Projector string
// Namespace is the namespace for the error.
Namespace string
// EventVersion is the version of the event.
EventVersion int
// EntityVersion is the version of the entity.
EntityVersion int
// BaseErr is an optional underlying error, for example from the DB driver.
BaseErr error
// Namespace is the namespace for the error.
Namespace string
}

// Error implements the Error method of the errors.Error interface.
func (e Error) Error() string {
errStr := e.Err.Error()
if e.BaseErr != nil {
errStr += ": " + e.BaseErr.Error()
}
return fmt.Sprintf("projector: %s, event: v%d, entity: v%d (%s)",
errStr, e.EventVersion, e.EntityVersion, e.Namespace)
return fmt.Sprintf("%s: %s, event: v%d, entity: v%d (%s)",
e.Projector, e.Err, e.EventVersion, e.EntityVersion, e.Namespace)
}

// Unwrap implements the errors.Unwrap method.
Expand Down Expand Up @@ -127,16 +128,18 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
if h.factoryFn == nil {
return Error{
Err: ErrModelNotSet,
EventVersion: event.Version(),
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
}
}
entity = h.factoryFn()
} else if err != nil {
return Error{
Err: err,
EventVersion: event.Version(),
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
}
}

Expand All @@ -153,9 +156,10 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
if entity.AggregateVersion()+1 != event.Version() {
return Error{
Err: eh.ErrIncorrectEntityVersion,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
EntityVersion: entityVersion,
Namespace: eh.NamespaceFromContext(ctx),
}
}
}
Expand All @@ -165,9 +169,10 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
if err != nil {
return Error{
Err: err,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
EntityVersion: entityVersion,
Namespace: eh.NamespaceFromContext(ctx),
}
}

Expand All @@ -177,9 +182,10 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
if newEntity.AggregateVersion() != event.Version() {
return Error{
Err: eh.ErrIncorrectEntityVersion,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
EntityVersion: entityVersion,
Namespace: eh.NamespaceFromContext(ctx),
}
}
}
Expand All @@ -189,18 +195,20 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
if err := h.repo.Save(ctx, newEntity); err != nil {
return Error{
Err: err,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
EntityVersion: entityVersion,
Namespace: eh.NamespaceFromContext(ctx),
}
}
} else {
if err := h.repo.Remove(ctx, event.AggregateID()); err != nil {
return Error{
Err: err,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
EntityVersion: entityVersion,
Namespace: eh.NamespaceFromContext(ctx),
}
}
}
Expand Down
20 changes: 13 additions & 7 deletions eventhandler/projector/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestEventHandler_UpdateModelWithVersion(t *testing.T) {
Content: "version 1",
}
if err := handler.HandleEvent(ctx, event); err != nil {
t.Error("there shoud be no error:", err)
t.Error("there should be no error:", err)
}
if projector.event != event {
t.Error("the handled event should be correct:", projector.event)
Expand Down Expand Up @@ -256,10 +256,12 @@ func TestEventHandler_LoadError(t *testing.T) {
repo.LoadErr = loadErr
expectedErr := Error{
Err: loadErr,
EventVersion: 1,
Projector: TestProjectorType.String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: 1,
}
if err := handler.HandleEvent(ctx, event); !reflect.DeepEqual(err, expectedErr) {
err := handler.HandleEvent(ctx, event)
if !errors.Is(err, expectedErr) {
t.Error("there shoud be an error:", err)
}
}
Expand All @@ -284,10 +286,12 @@ func TestEventHandler_SaveError(t *testing.T) {
repo.SaveErr = saveErr
expectedErr := Error{
Err: saveErr,
EventVersion: 1,
Projector: TestProjectorType.String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: 1,
}
if err := handler.HandleEvent(ctx, event); !reflect.DeepEqual(err, expectedErr) {
err := handler.HandleEvent(ctx, event)
if !errors.Is(err, expectedErr) {
t.Error("there shoud be an error:", err)
}
}
Expand All @@ -312,10 +316,12 @@ func TestEventHandler_ProjectError(t *testing.T) {
projector.err = projectErr
expectedErr := Error{
Err: projectErr,
EventVersion: 1,
Projector: TestProjectorType.String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: 1,
}
if err := handler.HandleEvent(ctx, event); !reflect.DeepEqual(err, expectedErr) {
err := handler.HandleEvent(ctx, event)
if !errors.Is(err, expectedErr) {
t.Error("there shoud be an error:", err)
}
}
Expand Down
46 changes: 35 additions & 11 deletions eventhandler/saga/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package saga

import (
"context"
"errors"
"fmt"

eh "github.com/looplab/eventhorizon"
)
Expand All @@ -37,7 +37,8 @@ type Saga interface {
SagaType() Type

// RunSaga handles an event in the saga that can return commands.
RunSaga(context.Context, eh.Event) []eh.Command
// If an error is returned from the saga, the event will be run again.
RunSaga(context.Context, eh.Event, eh.CommandHandler) error
}

// Type is the type of a saga, used as its unique identifier.
Expand All @@ -48,6 +49,32 @@ func (t Type) String() string {
return string(t)
}

// Error is an error in the projector, with the namespace.
type Error struct {
// Err is the error that happened when projecting the event.
Err error
// Saga is the saga where the error happened.
Saga string
// Namespace is the namespace for the error.
Namespace string
}

// Error implements the Error method of the errors.Error interface.
func (e Error) Error() string {
return fmt.Sprintf("%s: %s (%s)",
e.Saga, e.Err, e.Namespace)
}

// Unwrap implements the errors.Unwrap method.
func (e Error) Unwrap() error {
return e.Err
}

// Cause implements the github.com/pkg/errors Unwrap method.
func (e Error) Cause() error {
return e.Unwrap()
}

// NewEventHandler creates a new EventHandler.
func NewEventHandler(saga Saga, commandHandler eh.CommandHandler) *EventHandler {
return &EventHandler{
Expand All @@ -63,15 +90,12 @@ func (h *EventHandler) HandlerType() eh.EventHandlerType {

// HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
// Run the saga and collect commands.
cmds := h.saga.RunSaga(ctx, event)

// Dispatch commands back on the command bus.
for _, cmd := range cmds {
if err := h.commandHandler.HandleCommand(ctx, cmd); err != nil {
return errors.New("could not handle command '" +
cmd.CommandType().String() + "' from saga '" +
h.saga.SagaType().String() + "': " + err.Error())
// Run the saga which can issue commands on the provided command handler.
if err := h.saga.RunSaga(ctx, event, h.commandHandler); err != nil {
return Error{
Err: err,
Saga: h.saga.SagaType().String(),
Namespace: eh.NamespaceFromContext(ctx),
}
}

Expand Down
7 changes: 5 additions & 2 deletions eventhandler/saga/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ func (m *TestSaga) SagaType() Type {
return TestSagaType
}

func (m *TestSaga) RunSaga(ctx context.Context, event eh.Event) []eh.Command {
func (m *TestSaga) RunSaga(ctx context.Context, event eh.Event, h eh.CommandHandler) error {
m.event = event
m.context = ctx
return m.commands
for _, cmd := range m.commands {
return h.HandleCommand(ctx, cmd)
}
return nil
}
10 changes: 5 additions & 5 deletions examples/guestlist/domains/guestlist/saga.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *ResponseSaga) SagaType() saga.Type {
}

// RunSaga implements the Run saga method of the Saga interface.
func (s *ResponseSaga) RunSaga(ctx context.Context, event eh.Event) []eh.Command {
func (s *ResponseSaga) RunSaga(ctx context.Context, event eh.Event, h eh.CommandHandler) error {
switch event.EventType() {
case InviteAcceptedEvent:
// Do nothing for already accepted guests.
Expand All @@ -61,19 +61,19 @@ func (s *ResponseSaga) RunSaga(ctx context.Context, event eh.Event) []eh.Command

// Deny the invite if the guest list is full.
if len(s.acceptedGuests) >= s.guestLimit {
return []eh.Command{
return h.HandleCommand(ctx,
&DenyInvite{ID: event.AggregateID()},
}
)
}

// Confirm the invite when there is space left.
s.acceptedGuestsMu.Lock()
s.acceptedGuests[event.AggregateID()] = true
s.acceptedGuestsMu.Unlock()

return []eh.Command{
return h.HandleCommand(ctx,
&ConfirmInvite{ID: event.AggregateID()},
}
)
}

return nil
Expand Down

0 comments on commit 84f91e9

Please sign in to comment.