Skip to content

Commit

Permalink
Switch to the official mongo driver
Browse files Browse the repository at this point in the history
Also fixes #249 by adding a type encoder/decoder for uuid.UUID
  • Loading branch information
maxekman committed Mar 31, 2020
1 parent 44258bd commit 87ee002
Show file tree
Hide file tree
Showing 16 changed files with 466 additions and 276 deletions.
2 changes: 1 addition & 1 deletion eventbus/acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus, timeout time.Duration)
ctx := mocks.WithContextOne(context.Background(), "testval")

// Without handler.
id, _ := uuid.Parse("c1138e5f-f6fb-4dd0-8e79-255c6c8d3756")
id := uuid.New()
timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
event1 := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event1"}, timestamp,
mocks.AggregateType, id, 1)
Expand Down
56 changes: 27 additions & 29 deletions eventbus/gcp/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (
"time"

"cloud.google.com/go/pubsub"
"github.com/globalsign/mgo/bson"
"github.com/google/uuid"
"go.mongodb.org/mongo-driver/bson"
"google.golang.org/api/option"

// Register uuid.UUID as BSON type.
_ "github.com/looplab/eventhorizon/types/mongodb"

eh "github.com/looplab/eventhorizon"
)

Expand Down Expand Up @@ -84,11 +87,10 @@ func (b *EventBus) PublishEvent(ctx context.Context, event eh.Event) error {

// Marshal event data if there is any.
if event.Data() != nil {
rawData, err := bson.Marshal(event.Data())
if err != nil {
var err error
if e.RawData, err = bson.Marshal(event.Data()); err != nil {
return errors.New("could not marshal event data: " + err.Error())
}
e.RawData = bson.Raw{Kind: 3, Data: rawData}
}

// Marshal the event (using BSON for now).
Expand All @@ -97,13 +99,10 @@ func (b *EventBus) PublishEvent(ctx context.Context, event eh.Event) error {
return errors.New("could not marshal event: " + err.Error())
}

// NOTE: Using a new context here.
// TODO: Why?
publishCtx := context.Background()
res := b.topic.Publish(publishCtx, &pubsub.Message{
res := b.topic.Publish(ctx, &pubsub.Message{
Data: data,
})
if _, err := res.Get(publishCtx); err != nil {
if _, err := res.Get(ctx); err != nil {
return errors.New("could not publish event: " + err.Error())
}

Expand Down Expand Up @@ -185,13 +184,9 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, sub *pubsub.Subs

func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx context.Context, msg *pubsub.Message) {
return func(ctx context.Context, msg *pubsub.Message) {
// Manually decode the raw BSON event.
data := bson.Raw{
Kind: 3,
Data: msg.Data,
}
// Decode the raw BSON event data.
var e evt
if err := data.Unmarshal(&e); err != nil {
if err := bson.Unmarshal(msg.Data, &e); err != nil {
select {
case b.errCh <- eh.EventBusError{Err: errors.New("could not unmarshal event: " + err.Error()), Ctx: ctx}:
default:
Expand All @@ -200,22 +195,25 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex
return
}

// Create an event of the correct type.
if data, err := eh.CreateEventData(e.EventType); err == nil {
// Manually decode the raw BSON event.
if err := e.RawData.Unmarshal(data); err != nil {
select {
case b.errCh <- eh.EventBusError{Err: errors.New("could not unmarshal event data: " + err.Error()), Ctx: ctx}:
default:
}
msg.Nack()
return
// Create an event of the correct type and decode from raw BSON.
var err error
if e.data, err = eh.CreateEventData(e.EventType); err != nil {
select {
case b.errCh <- eh.EventBusError{Err: errors.New("could not create event data: " + err.Error()), Ctx: ctx}:
default:
}

// Set concrete event and zero out the decoded event.
e.data = data
e.RawData = bson.Raw{}
msg.Nack()
return
}
if err := bson.Unmarshal(e.RawData, e.data); err != nil {
select {
case b.errCh <- eh.EventBusError{Err: errors.New("could not unmarshal event data: " + err.Error()), Ctx: ctx}:
default:
}
msg.Nack()
return
}
e.RawData = nil

event := event{evt: e}
ctx = eh.UnmarshalContext(e.Context)
Expand Down
38 changes: 19 additions & 19 deletions eventstore/acceptanece_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh

ctx = context.WithValue(ctx, "testkey", "testval")

t.Log("save no events")
// Save no events.
err := store.Save(ctx, []eh.Event{}, 0)
if esErr, ok := err.(eh.EventStoreError); !ok || esErr.Err != eh.ErrNoEventsToAppend {
t.Error("there shoud be a ErrNoEventsToAppend error:", err)
}

t.Log("save event, version 1")
id, _ := uuid.Parse("c1138e5f-f6fb-4dd0-8e79-255c6c8d3756")
// Save event, version 1.
id := uuid.New()
timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
event1 := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event1"},
timestamp, mocks.AggregateType, id, 1)
Expand All @@ -60,13 +60,13 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh
// t.Error("the context should be correct:", agg.Context)
// }

t.Log("try to save same event twice")
// Try to save same event twice.
err = store.Save(ctx, []eh.Event{event1}, 1)
if esErr, ok := err.(eh.EventStoreError); !ok || esErr.Err != eh.ErrIncorrectEventVersion {
t.Error("there should be a ErrIncerrectEventVersion error:", err)
}

t.Log("save event, version 2")
// Save event, version 2.
event2 := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event2"},
timestamp, mocks.AggregateType, id, 2)
err = store.Save(ctx, []eh.Event{event2}, 1)
Expand All @@ -75,7 +75,7 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh
}
savedEvents = append(savedEvents, event2)

t.Log("save event without data, version 3")
// Save event without data, version 3.
event3 := eh.NewEventForAggregate(mocks.EventOtherType, nil, timestamp,
mocks.AggregateType, id, 3)
err = store.Save(ctx, []eh.Event{event3}, 2)
Expand All @@ -84,7 +84,7 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh
}
savedEvents = append(savedEvents, event3)

t.Log("save multiple events, version 4, 5 and 6")
// Save multiple events, version 4,5 and 6.
event4 := eh.NewEventForAggregate(mocks.EventOtherType, nil, timestamp,
mocks.AggregateType, id, 4)
event5 := eh.NewEventForAggregate(mocks.EventOtherType, nil, timestamp,
Expand All @@ -97,8 +97,8 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh
}
savedEvents = append(savedEvents, event4, event5, event6)

t.Log("save event for another aggregate")
id2, _ := uuid.Parse("c1138e5e-f6fb-4dd0-8e79-255c6c8d3756")
// Save event for another aggregate.
id2 := uuid.New()
event7 := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event7"},
timestamp, mocks.AggregateType, id2, 1)
err = store.Save(ctx, []eh.Event{event7}, 0)
Expand All @@ -107,7 +107,7 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh
}
savedEvents = append(savedEvents, event7)

t.Log("load events for non-existing aggregate")
// Load events for non-existing aggregate.
events, err := store.Load(ctx, uuid.New())
if err != nil {
t.Error("there should be no error:", err)
Expand All @@ -116,7 +116,7 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh
t.Error("there should be no loaded events:", eventsToString(events))
}

t.Log("load events")
// Load events.
events, err = store.Load(ctx, id)
if err != nil {
t.Error("there should be no error:", err)
Expand All @@ -136,7 +136,7 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh
}
}

t.Log("load events for another aggregate")
// Load events for another aggregate.
events, err = store.Load(ctx, id2)
if err != nil {
t.Error("there should be no error:", err)
Expand Down Expand Up @@ -167,8 +167,8 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh
func MaintainerAcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStoreMaintainer) {
ctx = context.WithValue(ctx, "testkey", "testval")

t.Log("save some events")
id, _ := uuid.Parse("c1138e5f-f6fb-4dd0-8e79-255c6c8d3757")
// Save some events.
id := uuid.New()
timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
event1 := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event1"},
timestamp, mocks.AggregateType, id, 1)
Expand All @@ -180,21 +180,21 @@ func MaintainerAcceptanceTest(t *testing.T, ctx context.Context, store eh.EventS
t.Error("there should be no error:", err)
}

t.Log("replace event, no aggregate")
// Replace event, no aggregate.
eventWithoutAggregate := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event"},
timestamp, mocks.AggregateType, uuid.New(), 1)
if err := store.Replace(ctx, eventWithoutAggregate); err != eh.ErrAggregateNotFound {
t.Error("there should be an aggregate not found error:", err)
}

t.Log("replace event, no event version")
// Replace event, no event version.
eventWithoutVersion := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event20"},
timestamp, mocks.AggregateType, id, 20)
if err := store.Replace(ctx, eventWithoutVersion); err != eh.ErrInvalidEvent {
t.Error("there should be an invalid event error:", err)
}

t.Log("replace event")
// Replace event.
event2Mod := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event2_mod"},
timestamp, mocks.AggregateType, id, 2)
if err := store.Replace(ctx, event2Mod); err != nil {
Expand All @@ -218,7 +218,7 @@ func MaintainerAcceptanceTest(t *testing.T, ctx context.Context, store eh.EventS
}
}

t.Log("save events of the old type")
// Save events of the old type.
oldEventType := eh.EventType("old_event_type")
id1 := uuid.New()
oldEvent1 := eh.NewEventForAggregate(oldEventType, nil, timestamp,
Expand All @@ -233,7 +233,7 @@ func MaintainerAcceptanceTest(t *testing.T, ctx context.Context, store eh.EventS
t.Error("there should be no error:", err)
}

t.Log("rename events to the new type")
// Rename events to the new type.
newEventType := eh.EventType("new_event_type")
if err := store.RenameEvent(ctx, oldEventType, newEventType); err != nil {
t.Error("there should be no error:", err)
Expand Down
8 changes: 1 addition & 7 deletions eventstore/memory/eventstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,9 @@ func TestEventStore(t *testing.T) {
t.Fatal("there should be a store")
}

// Run the actual test suite.

t.Log("event store with default namespace")
// Run the actual test suite, both for default and custom namespace.
eventstore.AcceptanceTest(t, context.Background(), store)

t.Log("event store with other namespace")
ctx := eh.NewContextWithNamespace(context.Background(), "ns")
eventstore.AcceptanceTest(t, ctx, store)

t.Log("event store maintainer")
eventstore.MaintainerAcceptanceTest(t, context.Background(), store)
}
Loading

0 comments on commit 87ee002

Please sign in to comment.