diff --git a/eventbus/acceptance_testing.go b/eventbus/acceptance_testing.go index ee30d807..9deb0f1e 100644 --- a/eventbus/acceptance_testing.go +++ b/eventbus/acceptance_testing.go @@ -72,7 +72,7 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus, timeout time.Duration) ctx := mocks.WithContextOne(context.Background(), "testval") // Without handler. - id, _ := uuid.Parse("c1138e5f-f6fb-4dd0-8e79-255c6c8d3756") + id := uuid.New() timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) event1 := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event1"}, timestamp, mocks.AggregateType, id, 1) diff --git a/eventbus/gcp/eventbus.go b/eventbus/gcp/eventbus.go index 0dbdc561..7bd1f046 100644 --- a/eventbus/gcp/eventbus.go +++ b/eventbus/gcp/eventbus.go @@ -22,10 +22,13 @@ import ( "time" "cloud.google.com/go/pubsub" - "github.com/globalsign/mgo/bson" "github.com/google/uuid" + "go.mongodb.org/mongo-driver/bson" "google.golang.org/api/option" + // Register uuid.UUID as BSON type. + _ "github.com/looplab/eventhorizon/types/mongodb" + eh "github.com/looplab/eventhorizon" ) @@ -84,11 +87,10 @@ func (b *EventBus) PublishEvent(ctx context.Context, event eh.Event) error { // Marshal event data if there is any. if event.Data() != nil { - rawData, err := bson.Marshal(event.Data()) - if err != nil { + var err error + if e.RawData, err = bson.Marshal(event.Data()); err != nil { return errors.New("could not marshal event data: " + err.Error()) } - e.RawData = bson.Raw{Kind: 3, Data: rawData} } // Marshal the event (using BSON for now). @@ -97,13 +99,10 @@ func (b *EventBus) PublishEvent(ctx context.Context, event eh.Event) error { return errors.New("could not marshal event: " + err.Error()) } - // NOTE: Using a new context here. - // TODO: Why? - publishCtx := context.Background() - res := b.topic.Publish(publishCtx, &pubsub.Message{ + res := b.topic.Publish(ctx, &pubsub.Message{ Data: data, }) - if _, err := res.Get(publishCtx); err != nil { + if _, err := res.Get(ctx); err != nil { return errors.New("could not publish event: " + err.Error()) } @@ -185,13 +184,9 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, sub *pubsub.Subs func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx context.Context, msg *pubsub.Message) { return func(ctx context.Context, msg *pubsub.Message) { - // Manually decode the raw BSON event. - data := bson.Raw{ - Kind: 3, - Data: msg.Data, - } + // Decode the raw BSON event data. var e evt - if err := data.Unmarshal(&e); err != nil { + if err := bson.Unmarshal(msg.Data, &e); err != nil { select { case b.errCh <- eh.EventBusError{Err: errors.New("could not unmarshal event: " + err.Error()), Ctx: ctx}: default: @@ -200,22 +195,25 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex return } - // Create an event of the correct type. - if data, err := eh.CreateEventData(e.EventType); err == nil { - // Manually decode the raw BSON event. - if err := e.RawData.Unmarshal(data); err != nil { - select { - case b.errCh <- eh.EventBusError{Err: errors.New("could not unmarshal event data: " + err.Error()), Ctx: ctx}: - default: - } - msg.Nack() - return + // Create an event of the correct type and decode from raw BSON. + var err error + if e.data, err = eh.CreateEventData(e.EventType); err != nil { + select { + case b.errCh <- eh.EventBusError{Err: errors.New("could not create event data: " + err.Error()), Ctx: ctx}: + default: } - - // Set concrete event and zero out the decoded event. - e.data = data - e.RawData = bson.Raw{} + msg.Nack() + return + } + if err := bson.Unmarshal(e.RawData, e.data); err != nil { + select { + case b.errCh <- eh.EventBusError{Err: errors.New("could not unmarshal event data: " + err.Error()), Ctx: ctx}: + default: + } + msg.Nack() + return } + e.RawData = nil event := event{evt: e} ctx = eh.UnmarshalContext(e.Context) diff --git a/eventstore/acceptanece_testing.go b/eventstore/acceptanece_testing.go index 2377ad9d..9791daea 100644 --- a/eventstore/acceptanece_testing.go +++ b/eventstore/acceptanece_testing.go @@ -40,14 +40,14 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh ctx = context.WithValue(ctx, "testkey", "testval") - t.Log("save no events") + // Save no events. err := store.Save(ctx, []eh.Event{}, 0) if esErr, ok := err.(eh.EventStoreError); !ok || esErr.Err != eh.ErrNoEventsToAppend { t.Error("there shoud be a ErrNoEventsToAppend error:", err) } - t.Log("save event, version 1") - id, _ := uuid.Parse("c1138e5f-f6fb-4dd0-8e79-255c6c8d3756") + // Save event, version 1. + id := uuid.New() timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) event1 := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event1"}, timestamp, mocks.AggregateType, id, 1) @@ -60,13 +60,13 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh // t.Error("the context should be correct:", agg.Context) // } - t.Log("try to save same event twice") + // Try to save same event twice. err = store.Save(ctx, []eh.Event{event1}, 1) if esErr, ok := err.(eh.EventStoreError); !ok || esErr.Err != eh.ErrIncorrectEventVersion { t.Error("there should be a ErrIncerrectEventVersion error:", err) } - t.Log("save event, version 2") + // Save event, version 2. event2 := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event2"}, timestamp, mocks.AggregateType, id, 2) err = store.Save(ctx, []eh.Event{event2}, 1) @@ -75,7 +75,7 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh } savedEvents = append(savedEvents, event2) - t.Log("save event without data, version 3") + // Save event without data, version 3. event3 := eh.NewEventForAggregate(mocks.EventOtherType, nil, timestamp, mocks.AggregateType, id, 3) err = store.Save(ctx, []eh.Event{event3}, 2) @@ -84,7 +84,7 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh } savedEvents = append(savedEvents, event3) - t.Log("save multiple events, version 4, 5 and 6") + // Save multiple events, version 4,5 and 6. event4 := eh.NewEventForAggregate(mocks.EventOtherType, nil, timestamp, mocks.AggregateType, id, 4) event5 := eh.NewEventForAggregate(mocks.EventOtherType, nil, timestamp, @@ -97,8 +97,8 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh } savedEvents = append(savedEvents, event4, event5, event6) - t.Log("save event for another aggregate") - id2, _ := uuid.Parse("c1138e5e-f6fb-4dd0-8e79-255c6c8d3756") + // Save event for another aggregate. + id2 := uuid.New() event7 := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event7"}, timestamp, mocks.AggregateType, id2, 1) err = store.Save(ctx, []eh.Event{event7}, 0) @@ -107,7 +107,7 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh } savedEvents = append(savedEvents, event7) - t.Log("load events for non-existing aggregate") + // Load events for non-existing aggregate. events, err := store.Load(ctx, uuid.New()) if err != nil { t.Error("there should be no error:", err) @@ -116,7 +116,7 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh t.Error("there should be no loaded events:", eventsToString(events)) } - t.Log("load events") + // Load events. events, err = store.Load(ctx, id) if err != nil { t.Error("there should be no error:", err) @@ -136,7 +136,7 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh } } - t.Log("load events for another aggregate") + // Load events for another aggregate. events, err = store.Load(ctx, id2) if err != nil { t.Error("there should be no error:", err) @@ -167,8 +167,8 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh func MaintainerAcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStoreMaintainer) { ctx = context.WithValue(ctx, "testkey", "testval") - t.Log("save some events") - id, _ := uuid.Parse("c1138e5f-f6fb-4dd0-8e79-255c6c8d3757") + // Save some events. + id := uuid.New() timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) event1 := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event1"}, timestamp, mocks.AggregateType, id, 1) @@ -180,21 +180,21 @@ func MaintainerAcceptanceTest(t *testing.T, ctx context.Context, store eh.EventS t.Error("there should be no error:", err) } - t.Log("replace event, no aggregate") + // Replace event, no aggregate. eventWithoutAggregate := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event"}, timestamp, mocks.AggregateType, uuid.New(), 1) if err := store.Replace(ctx, eventWithoutAggregate); err != eh.ErrAggregateNotFound { t.Error("there should be an aggregate not found error:", err) } - t.Log("replace event, no event version") + // Replace event, no event version. eventWithoutVersion := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event20"}, timestamp, mocks.AggregateType, id, 20) if err := store.Replace(ctx, eventWithoutVersion); err != eh.ErrInvalidEvent { t.Error("there should be an invalid event error:", err) } - t.Log("replace event") + // Replace event. event2Mod := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event2_mod"}, timestamp, mocks.AggregateType, id, 2) if err := store.Replace(ctx, event2Mod); err != nil { @@ -218,7 +218,7 @@ func MaintainerAcceptanceTest(t *testing.T, ctx context.Context, store eh.EventS } } - t.Log("save events of the old type") + // Save events of the old type. oldEventType := eh.EventType("old_event_type") id1 := uuid.New() oldEvent1 := eh.NewEventForAggregate(oldEventType, nil, timestamp, @@ -233,7 +233,7 @@ func MaintainerAcceptanceTest(t *testing.T, ctx context.Context, store eh.EventS t.Error("there should be no error:", err) } - t.Log("rename events to the new type") + // Rename events to the new type. newEventType := eh.EventType("new_event_type") if err := store.RenameEvent(ctx, oldEventType, newEventType); err != nil { t.Error("there should be no error:", err) diff --git a/eventstore/memory/eventstore_test.go b/eventstore/memory/eventstore_test.go index f8bca998..3a0a8074 100644 --- a/eventstore/memory/eventstore_test.go +++ b/eventstore/memory/eventstore_test.go @@ -28,15 +28,9 @@ func TestEventStore(t *testing.T) { t.Fatal("there should be a store") } - // Run the actual test suite. - - t.Log("event store with default namespace") + // Run the actual test suite, both for default and custom namespace. eventstore.AcceptanceTest(t, context.Background(), store) - - t.Log("event store with other namespace") ctx := eh.NewContextWithNamespace(context.Background(), "ns") eventstore.AcceptanceTest(t, ctx, store) - - t.Log("event store maintainer") eventstore.MaintainerAcceptanceTest(t, context.Background(), store) } diff --git a/eventstore/mongodb/eventstore.go b/eventstore/mongodb/eventstore.go index 5d92e87b..48be0e2c 100644 --- a/eventstore/mongodb/eventstore.go +++ b/eventstore/mongodb/eventstore.go @@ -20,9 +20,16 @@ import ( "fmt" "time" - "github.com/globalsign/mgo" - "github.com/globalsign/mgo/bson" "github.com/google/uuid" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readconcern" + "go.mongodb.org/mongo-driver/mongo/readpref" + "go.mongodb.org/mongo-driver/mongo/writeconcern" + + // Register uuid.UUID as BSON type. + _ "github.com/looplab/eventhorizon/types/mongodb" eh "github.com/looplab/eventhorizon" ) @@ -30,8 +37,8 @@ import ( // ErrCouldNotDialDB is when the database could not be dialed. var ErrCouldNotDialDB = errors.New("could not dial database") -// ErrNoDBSession is when no database session is set. -var ErrNoDBSession = errors.New("no database session") +// ErrNoDBClient is when no database client is set. +var ErrNoDBClient = errors.New("no database client") // ErrCouldNotClearDB is when the database could not be cleared. var ErrCouldNotClearDB = errors.New("could not clear database") @@ -50,31 +57,32 @@ var ErrCouldNotSaveAggregate = errors.New("could not save aggregate") // EventStore implements an EventStore for MongoDB. type EventStore struct { - session *mgo.Session + client *mongo.Client dbPrefix string } -// NewEventStore creates a new EventStore. -func NewEventStore(url, dbPrefix string) (*EventStore, error) { - session, err := mgo.Dial(url) +// NewEventStore creates a new EventStore with a MongoDB URI: `mongodb://hostname`. +func NewEventStore(uri, dbPrefix string) (*EventStore, error) { + opts := options.Client().ApplyURI(uri) + opts.SetWriteConcern(writeconcern.New(writeconcern.WMajority())) + opts.SetReadConcern(readconcern.Majority()) + opts.SetReadPreference(readpref.Primary()) + client, err := mongo.Connect(context.TODO(), opts) if err != nil { return nil, ErrCouldNotDialDB } - session.SetMode(mgo.Strong, true) - session.SetSafe(&mgo.Safe{W: 1}) - - return NewEventStoreWithSession(session, dbPrefix) + return NewEventStoreWithClient(client, dbPrefix) } -// NewEventStoreWithSession creates a new EventStore with a session. -func NewEventStoreWithSession(session *mgo.Session, dbPrefix string) (*EventStore, error) { - if session == nil { - return nil, ErrNoDBSession +// NewEventStoreWithClient creates a new EventStore with a client. +func NewEventStoreWithClient(client *mongo.Client, dbPrefix string) (*EventStore, error) { + if client == nil { + return nil, ErrNoDBClient } s := &EventStore{ - session: session, + client: client, dbPrefix: dbPrefix, } @@ -90,9 +98,6 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio } } - sess := s.session.Copy() - defer sess.Close() - // Build all event records, with incrementing versions starting from the // original aggregate version. dbEvents := make([]dbEvent, len(events)) @@ -124,18 +129,20 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio version++ } + c := s.client.Database(s.dbName(ctx)).Collection("events") + // Either insert a new aggregate or append to an existing. if originalVersion == 0 { aggregate := aggregateRecord{ - AggregateID: aggregateID.String(), + AggregateID: aggregateID, Version: len(dbEvents), Events: dbEvents, } - if err := sess.DB(s.dbName(ctx)).C("events").Insert(aggregate); err != nil { + if _, err := c.InsertOne(ctx, aggregate); err != nil { return eh.EventStoreError{ - BaseErr: err, Err: ErrCouldNotSaveAggregate, + BaseErr: err, Namespace: eh.NamespaceFromContext(ctx), } } @@ -143,9 +150,9 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio // Increment aggregate version on insert of new event record, and // only insert if version of aggregate is matching (ie not changed // since loading the aggregate). - if err := sess.DB(s.dbName(ctx)).C("events").Update( + if _, err := c.UpdateOne(ctx, bson.M{ - "_id": aggregateID.String(), + "_id": aggregateID, "version": originalVersion, }, bson.M{ @@ -154,8 +161,8 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio }, ); err != nil { return eh.EventStoreError{ - BaseErr: err, Err: ErrCouldNotSaveAggregate, + BaseErr: err, Namespace: eh.NamespaceFromContext(ctx), } } @@ -166,16 +173,14 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio // Load implements the Load method of the eventhorizon.EventStore interface. func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) { - sess := s.session.Copy() - defer sess.Close() + c := s.client.Database(s.dbName(ctx)).Collection("events") var aggregate aggregateRecord - err := sess.DB(s.dbName(ctx)).C("events").FindId(id.String()).One(&aggregate) - if err == mgo.ErrNotFound { + err := c.FindOne(ctx, bson.M{"_id": id}).Decode(&aggregate) + if err == mongo.ErrNoDocuments { return []eh.Event{}, nil } else if err != nil { return nil, eh.EventStoreError{ - BaseErr: err, Err: err, Namespace: eh.NamespaceFromContext(ctx), } @@ -183,20 +188,24 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) events := make([]eh.Event, len(aggregate.Events)) for i, dbEvent := range aggregate.Events { - // Create an event of the correct type. - if data, err := eh.CreateEventData(dbEvent.EventType); err == nil { - // Manually decode the raw BSON event. - if err := dbEvent.RawData.Unmarshal(data); err != nil { + // Create an event of the correct type and decode from raw BSON. + if len(dbEvent.RawData) > 0 { + var err error + if dbEvent.data, err = eh.CreateEventData(dbEvent.EventType); err != nil { return nil, eh.EventStoreError{ + Err: ErrCouldNotUnmarshalEvent, BaseErr: err, + Namespace: eh.NamespaceFromContext(ctx), + } + } + if err := bson.Unmarshal(dbEvent.RawData, dbEvent.data); err != nil { + return nil, eh.EventStoreError{ Err: ErrCouldNotUnmarshalEvent, + BaseErr: err, Namespace: eh.NamespaceFromContext(ctx), } } - - // Set conrcete event and zero out the decoded event. - dbEvent.data = data - dbEvent.RawData = bson.Raw{} + dbEvent.RawData = nil } events[i] = event{dbEvent: dbEvent} @@ -207,46 +216,42 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) // Replace implements the Replace method of the eventhorizon.EventStore interface. func (s *EventStore) Replace(ctx context.Context, event eh.Event) error { - sess := s.session.Copy() - defer sess.Close() + c := s.client.Database(s.dbName(ctx)).Collection("events") // First check if the aggregate exists, the not found error in the update // query can mean both that the aggregate or the event is not found. - n, err := sess.DB(s.dbName(ctx)).C("events").FindId(event.AggregateID().String()).Count() - if n == 0 { + if n, err := c.CountDocuments(ctx, bson.M{"_id": event.AggregateID()}); n == 0 { return eh.ErrAggregateNotFound } else if err != nil { return eh.EventStoreError{ - BaseErr: err, Err: err, Namespace: eh.NamespaceFromContext(ctx), } } - // Create the event record for the DB. + // Create the event record for the Database. e, err := newDBEvent(ctx, event) if err != nil { return err } // Find and replace the event. - err = sess.DB(s.dbName(ctx)).C("events").Update( + if r, err := c.UpdateOne(ctx, bson.M{ - "_id": event.AggregateID().String(), + "_id": event.AggregateID(), "events.version": event.Version(), }, bson.M{ "$set": bson.M{"events.$": *e}, }, - ) - if err == mgo.ErrNotFound { - return eh.ErrInvalidEvent - } else if err != nil { + ); err != nil { return eh.EventStoreError{ - BaseErr: err, Err: ErrCouldNotSaveAggregate, + BaseErr: err, Namespace: eh.NamespaceFromContext(ctx), } + } else if r.MatchedCount == 0 { + return eh.ErrInvalidEvent } return nil @@ -254,12 +259,11 @@ func (s *EventStore) Replace(ctx context.Context, event eh.Event) error { // RenameEvent implements the RenameEvent method of the eventhorizon.EventStore interface. func (s *EventStore) RenameEvent(ctx context.Context, from, to eh.EventType) error { - sess := s.session.Copy() - defer sess.Close() + c := s.client.Database(s.dbName(ctx)).Collection("events") // Find and rename all events. // TODO: Maybe use change info. - if _, err := sess.DB(s.dbName(ctx)).C("events").UpdateAll( + if _, err := c.UpdateMany(ctx, bson.M{ "events.event_type": string(from), }, @@ -268,8 +272,8 @@ func (s *EventStore) RenameEvent(ctx context.Context, from, to eh.EventType) err }, ); err != nil { return eh.EventStoreError{ - BaseErr: err, Err: ErrCouldNotSaveAggregate, + BaseErr: err, Namespace: eh.NamespaceFromContext(ctx), } } @@ -279,31 +283,33 @@ func (s *EventStore) RenameEvent(ctx context.Context, from, to eh.EventType) err // Clear clears the event storage. func (s *EventStore) Clear(ctx context.Context) error { - if err := s.session.DB(s.dbName(ctx)).C("events").DropCollection(); err != nil { + c := s.client.Database(s.dbName(ctx)).Collection("events") + + if err := c.Drop(ctx); err != nil { return eh.EventStoreError{ - BaseErr: err, Err: ErrCouldNotClearDB, + BaseErr: err, Namespace: eh.NamespaceFromContext(ctx), } } return nil } -// Close closes the database session. -func (s *EventStore) Close() { - s.session.Close() +// Close closes the database client. +func (s *EventStore) Close(ctx context.Context) { + s.client.Disconnect(ctx) } -// dbName appends the namespace, if one is set, to the DB prefix to -// get the name of the DB to use. +// dbName appends the namespace, if one is set, to the Database prefix to +// get the name of the Database to use. func (s *EventStore) dbName(ctx context.Context) string { ns := eh.NamespaceFromContext(ctx) return s.dbPrefix + "_" + ns } -// aggregateRecord is the DB representation of an aggregate. +// aggregateRecord is the Database representation of an aggregate. type aggregateRecord struct { - AggregateID string `bson:"_id"` + AggregateID uuid.UUID `bson:"_id"` Version int `bson:"version"` Events []dbEvent `bson:"events"` // Type string `bson:"type"` @@ -318,34 +324,34 @@ type dbEvent struct { data eh.EventData `bson:"-"` Timestamp time.Time `bson:"timestamp"` AggregateType eh.AggregateType `bson:"aggregate_type"` - AggregateID string `bson:"_id"` + AggregateID uuid.UUID `bson:"_id"` Version int `bson:"version"` } // newDBEvent returns a new dbEvent for an event. func newDBEvent(ctx context.Context, event eh.Event) (*dbEvent, error) { + e := &dbEvent{ + EventType: event.EventType(), + Timestamp: event.Timestamp(), + AggregateType: event.AggregateType(), + AggregateID: event.AggregateID(), + Version: event.Version(), + } + // Marshal event data if there is any. - var rawData bson.Raw if event.Data() != nil { - raw, err := bson.Marshal(event.Data()) + var err error + e.RawData, err = bson.Marshal(event.Data()) if err != nil { return nil, eh.EventStoreError{ - BaseErr: err, Err: ErrCouldNotMarshalEvent, + BaseErr: err, Namespace: eh.NamespaceFromContext(ctx), } } - rawData = bson.Raw{Kind: 3, Data: raw} } - return &dbEvent{ - EventType: event.EventType(), - RawData: rawData, - Timestamp: event.Timestamp(), - AggregateType: event.AggregateType(), - AggregateID: event.AggregateID().String(), - Version: event.Version(), - }, nil + return e, nil } // event is the private implementation of the eventhorizon.Event interface @@ -356,11 +362,7 @@ type event struct { // AggrgateID implements the AggrgateID method of the eventhorizon.Event interface. func (e event) AggregateID() uuid.UUID { - id, err := uuid.Parse(e.dbEvent.AggregateID) - if err != nil { - return uuid.Nil - } - return id + return e.dbEvent.AggregateID } // AggregateType implements the AggregateType method of the eventhorizon.Event interface. diff --git a/eventstore/mongodb/eventstore_test.go b/eventstore/mongodb/eventstore_test.go index 2c3c62e2..184992b7 100644 --- a/eventstore/mongodb/eventstore_test.go +++ b/eventstore/mongodb/eventstore_test.go @@ -24,13 +24,12 @@ import ( ) func TestEventStore(t *testing.T) { - // Local Mongo testing with Docker + // Use MongoDB in Docker with fallback to localhost. url := os.Getenv("MONGO_HOST") - if url == "" { - // Default to localhost url = "localhost:27017" } + url = "mongodb://" + url store, err := NewEventStore(url, "test") if err != nil { @@ -40,27 +39,20 @@ func TestEventStore(t *testing.T) { t.Fatal("there should be a store") } - ctx := eh.NewContextWithNamespace(context.Background(), "ns") + customNamespaceCtx := eh.NewContextWithNamespace(context.Background(), "ns") - defer store.Close() + defer store.Close(context.Background()) defer func() { - t.Log("clearing db") if err = store.Clear(context.Background()); err != nil { t.Fatal("there should be no error:", err) } - if err = store.Clear(ctx); err != nil { + if err = store.Clear(customNamespaceCtx); err != nil { t.Fatal("there should be no error:", err) } }() - // Run the actual test suite. - - t.Log("event store with default namespace") + // Run the actual test suite, both for default and custom namespace. eventstore.AcceptanceTest(t, context.Background(), store) - - t.Log("event store with other namespace") - eventstore.AcceptanceTest(t, ctx, store) - - t.Log("event store maintainer") + eventstore.AcceptanceTest(t, customNamespaceCtx, store) eventstore.MaintainerAcceptanceTest(t, context.Background(), store) } diff --git a/eventstore/trace/eventstore_test.go b/eventstore/trace/eventstore_test.go index 603891f5..b5cebd63 100644 --- a/eventstore/trace/eventstore_test.go +++ b/eventstore/trace/eventstore_test.go @@ -33,7 +33,7 @@ func TestEventStore(t *testing.T) { t.Fatal("there should be a store") } - // Run the actual test suite. + // Run the actual test suite, with tracing enabled. store.StartTracing() savedEvents := eventstore.AcceptanceTest(t, context.Background(), store) store.StopTracing() @@ -61,7 +61,7 @@ func TestEventStore(t *testing.T) { ctx := context.Background() - t.Log("save event, version 7") + // Save event, version 7. timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) event7 := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event1"}, timestamp, mocks.AggregateType, event1.AggregateID(), 7) @@ -75,7 +75,7 @@ func TestEventStore(t *testing.T) { t.Error("there should be no events traced:", trace) } - t.Log("load events without tracing") + // Load events without tracing. events, err := store.Load(ctx, event1.AggregateID()) if err != nil { t.Error("there should be no error:", err) diff --git a/examples/guestlist/mongodb/mongodb_test.go b/examples/guestlist/mongodb/mongodb_test.go index 81ab1774..bbbaba70 100644 --- a/examples/guestlist/mongodb/mongodb_test.go +++ b/examples/guestlist/mongodb/mongodb_test.go @@ -34,13 +34,12 @@ import ( ) func Example() { - // Local Mongo testing with Docker + // Use MongoDB in Docker with fallback to localhost. url := os.Getenv("MONGO_HOST") - if url == "" { - // Default to localhost url = "localhost:27017" } + url = "mongodb://" + url // Create the event store. eventStore, err := eventstore.NewEventStore(url, "demo") diff --git a/examples/todomvc/handler.go b/examples/todomvc/handler.go index 5ef05a54..e12d4f7c 100644 --- a/examples/todomvc/handler.go +++ b/examples/todomvc/handler.go @@ -62,13 +62,12 @@ func (l *Logger) HandleEvent(ctx context.Context, event eh.Event) error { // NewHandler sets up the full Event Horizon domain for the TodoMVC app and // returns a handler exposing some of the components. func NewHandler() (*Handler, error) { - // Local Mongo testing with Docker + // Use MongoDB in Docker with fallback to localhost. dbURL := os.Getenv("MONGO_HOST") - if dbURL == "" { - // Default to localhost dbURL = "localhost:27017" } + dbURL = "mongodb://" + dbURL // Create the event store. eventStore, err := eventstore.NewEventStore(dbURL, "todomvc") diff --git a/examples/todomvc/handler_test.go b/examples/todomvc/handler_test.go index 87abede6..9bd381dd 100644 --- a/examples/todomvc/handler_test.go +++ b/examples/todomvc/handler_test.go @@ -201,8 +201,8 @@ func TestDelete(t *testing.T) { defer cancel() l.Wait(ctx) - if _, err := h.Repo.Find(context.Background(), id); err == nil || - err.Error() != "could not find entity: not found (default)" { + _, err = h.Repo.Find(context.Background(), id) + if rrErr, ok := err.(eh.RepoError); !ok || rrErr.Err != eh.ErrEntityNotFound { t.Error("there should be a not found error:", err) } } diff --git a/go.mod b/go.mod index b82b6fb3..8ebb4cce 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,11 @@ go 1.14 require ( cloud.google.com/go/pubsub v1.3.1 - github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 github.com/google/uuid v1.1.1 github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 github.com/gorilla/websocket v1.4.2 github.com/jpillora/backoff v1.0.0 github.com/kr/pretty v0.2.0 + go.mongodb.org/mongo-driver v1.3.1 google.golang.org/api v0.20.0 ) diff --git a/go.sum b/go.sum index 9876a4f8..789a0a3e 100644 --- a/go.sum +++ b/go.sum @@ -41,15 +41,41 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 h1:DujepqpGd1hyOd7aW59XpK7Qymp8iy83xq74fLr21is= -github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0= +github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY= +github.com/gobuffalo/depgen v0.1.0/go.mod h1:+ifsuy7fhi15RWncXQQKjWS9JPkdah5sZvtHc2RXGlg= +github.com/gobuffalo/envy v1.6.15/go.mod h1:n7DRkBerg/aorDM8kbduw5dN3oXGswK5liaSCx4T5NI= +github.com/gobuffalo/envy v1.7.0/go.mod h1:n7DRkBerg/aorDM8kbduw5dN3oXGswK5liaSCx4T5NI= +github.com/gobuffalo/flect v0.1.0/go.mod h1:d2ehjJqGOH/Kjqcoz+F7jHTBbmDb38yXA598Hb50EGs= +github.com/gobuffalo/flect v0.1.1/go.mod h1:8JCgGVbRjJhVgD6399mQr4fx5rRfGKVzFjbj6RE/9UI= +github.com/gobuffalo/flect v0.1.3/go.mod h1:8JCgGVbRjJhVgD6399mQr4fx5rRfGKVzFjbj6RE/9UI= +github.com/gobuffalo/genny v0.0.0-20190329151137-27723ad26ef9/go.mod h1:rWs4Z12d1Zbf19rlsn0nurr75KqhYp52EAGGxTbBhNk= +github.com/gobuffalo/genny v0.0.0-20190403191548-3ca520ef0d9e/go.mod h1:80lIj3kVJWwOrXWWMRzzdhW3DsrdjILVil/SFKBzF28= +github.com/gobuffalo/genny v0.1.0/go.mod h1:XidbUqzak3lHdS//TPu2OgiFB+51Ur5f7CSnXZ/JDvo= +github.com/gobuffalo/genny v0.1.1/go.mod h1:5TExbEyY48pfunL4QSXxlDOmdsD44RRq4mVZ0Ex28Xk= +github.com/gobuffalo/gitgen v0.0.0-20190315122116-cc086187d211/go.mod h1:vEHJk/E9DmhejeLeNt7UVvlSGv3ziL+djtTr3yyzcOw= +github.com/gobuffalo/gogen v0.0.0-20190315121717-8f38393713f5/go.mod h1:V9QVDIxsgKNZs6L2IYiGR8datgMhB577vzTDqypH360= +github.com/gobuffalo/gogen v0.1.0/go.mod h1:8NTelM5qd8RZ15VjQTFkAW6qOMx5wBbW4dSCS3BY8gg= +github.com/gobuffalo/gogen v0.1.1/go.mod h1:y8iBtmHmGc4qa3urIyo1shvOD8JftTtfcKi+71xfDNE= +github.com/gobuffalo/logger v0.0.0-20190315122211-86e12af44bc2/go.mod h1:QdxcLw541hSGtBnhUc4gaNIXRjiDppFGaDqzbrBd3v8= +github.com/gobuffalo/mapi v1.0.1/go.mod h1:4VAGh89y6rVOvm5A8fKFxYG+wIW6LO1FMTG9hnKStFc= +github.com/gobuffalo/mapi v1.0.2/go.mod h1:4VAGh89y6rVOvm5A8fKFxYG+wIW6LO1FMTG9hnKStFc= +github.com/gobuffalo/packd v0.0.0-20190315124812-a385830c7fc0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWeG2RIxq4= +github.com/gobuffalo/packd v0.1.0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWeG2RIxq4= +github.com/gobuffalo/packr/v2 v2.0.9/go.mod h1:emmyGweYTm6Kdper+iywB6YK5YzuKchGtJQZ0Odn4pQ= +github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0= +github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -68,6 +94,8 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.3.4 h1:87PNWwrRvUSnqS4dlcBU/ftvOIBep4sYuBLlh6rX2wk= github.com/golang/protobuf v1.3.4/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= @@ -96,12 +124,21 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfEUpgAwUN0o= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= +github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4= +github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA= +github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.9.5 h1:U+CaK85mrNNb4k8BNOfgJtJ/gr6kswUCFj6miSzVC6M= +github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= @@ -109,19 +146,50 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE= +github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc h1:n+nNi93yXLkJvKwXNP9d55HC7lGK4H/SRcwB5IaUZLo= +github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +go.mongodb.org/mongo-driver v1.3.1 h1:op56IfTQiaY2679w922KVWa3qcHdml2K/Io8ayAOUEQ= +go.mongodb.org/mongo-driver v1.3.1/go.mod h1:MSWZXKOynuguX+JSvwP8i+58jYCXxbia8HS3gZBapIE= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -185,15 +253,21 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190412183630-56d357773e84/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190419153524-e8e3143a4f4a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190531175056-4c3a928424d2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -217,14 +291,19 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190329151228-23e29df326fe/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190416151739-9c9e1878f421/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190420181800-aa740d480789/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190531172133-b3315ee88b7d/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= @@ -307,6 +386,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/repo.go b/repo.go index d609b943..cf7cac91 100644 --- a/repo.go +++ b/repo.go @@ -95,8 +95,8 @@ type Versionable interface { // from repos that support it. You must call Close() on each Iter even when // results were delivered without apparent error. type Iter interface { - Next() bool + Next(context.Context) bool Value() interface{} // Close must be called after the last Next() to retrieve error if any - Close() error + Close(context.Context) error } diff --git a/repo/mongodb/repo.go b/repo/mongodb/repo.go index 1d4c4687..3ff5dea6 100644 --- a/repo/mongodb/repo.go +++ b/repo/mongodb/repo.go @@ -18,8 +18,16 @@ import ( "context" "errors" - "github.com/globalsign/mgo" "github.com/google/uuid" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readconcern" + "go.mongodb.org/mongo-driver/mongo/readpref" + "go.mongodb.org/mongo-driver/mongo/writeconcern" + + // Register uuid.UUID as BSON type. + _ "github.com/looplab/eventhorizon/types/mongodb" eh "github.com/looplab/eventhorizon" ) @@ -27,8 +35,8 @@ import ( // ErrCouldNotDialDB is when the database could not be dialed. var ErrCouldNotDialDB = errors.New("could not dial database") -// ErrNoDBSession is when no database session is set. -var ErrNoDBSession = errors.New("no database session") +// ErrNoDBClient is when no database client is set. +var ErrNoDBClient = errors.New("no database client") // ErrCouldNotClearDB is when the database could not be cleared. var ErrCouldNotClearDB = errors.New("could not clear database") @@ -41,33 +49,34 @@ var ErrInvalidQuery = errors.New("invalid query") // Repo implements an MongoDB repository for entities. type Repo struct { - session *mgo.Session + client *mongo.Client dbPrefix string collection string factoryFn func() eh.Entity } // NewRepo creates a new Repo. -func NewRepo(url, dbPrefix, collection string) (*Repo, error) { - session, err := mgo.Dial(url) +func NewRepo(uri, dbPrefix, collection string) (*Repo, error) { + opts := options.Client().ApplyURI(uri) + opts.SetWriteConcern(writeconcern.New(writeconcern.WMajority())) + opts.SetReadConcern(readconcern.Majority()) + opts.SetReadPreference(readpref.Primary()) + client, err := mongo.Connect(context.TODO(), opts) if err != nil { return nil, ErrCouldNotDialDB } - session.SetMode(mgo.Strong, true) - session.SetSafe(&mgo.Safe{W: 1}) - - return NewRepoWithSession(session, dbPrefix, collection) + return NewRepoWithClient(client, dbPrefix, collection) } -// NewRepoWithSession creates a new Repo with a session. -func NewRepoWithSession(session *mgo.Session, dbPrefix, collection string) (*Repo, error) { - if session == nil { - return nil, ErrNoDBSession +// NewRepoWithClient creates a new Repo with a client. +func NewRepoWithClient(client *mongo.Client, dbPrefix, collection string) (*Repo, error) { + if client == nil { + return nil, ErrNoDBClient } r := &Repo{ - session: session, + client: client, dbPrefix: dbPrefix, collection: collection, } @@ -82,9 +91,6 @@ func (r *Repo) Parent() eh.ReadRepo { // Find implements the Find method of the eventhorizon.ReadRepo interface. func (r *Repo) Find(ctx context.Context, id uuid.UUID) (eh.Entity, error) { - sess := r.session.Copy() - defer sess.Close() - if r.factoryFn == nil { return nil, eh.RepoError{ Err: ErrModelNotSet, @@ -92,14 +98,20 @@ func (r *Repo) Find(ctx context.Context, id uuid.UUID) (eh.Entity, error) { } } + c := r.client.Database(r.dbName(ctx)).Collection(r.collection) + entity := r.factoryFn() - err := sess.DB(r.dbName(ctx)).C(r.collection).FindId(id).One(entity) - if err != nil { + if err := c.FindOne(ctx, bson.M{"_id": id.String()}).Decode(entity); err == mongo.ErrNoDocuments { return nil, eh.RepoError{ Err: eh.ErrEntityNotFound, BaseErr: err, Namespace: eh.NamespaceFromContext(ctx), } + } else if err != nil { + return nil, eh.RepoError{ + Err: err, + Namespace: eh.NamespaceFromContext(ctx), + } } return entity, nil @@ -107,9 +119,6 @@ func (r *Repo) Find(ctx context.Context, id uuid.UUID) (eh.Entity, error) { // FindAll implements the FindAll method of the eventhorizon.ReadRepo interface. func (r *Repo) FindAll(ctx context.Context) ([]eh.Entity, error) { - sess := r.session.Copy() - defer sess.Close() - if r.factoryFn == nil { return nil, eh.RepoError{ Err: ErrModelNotSet, @@ -117,14 +126,29 @@ func (r *Repo) FindAll(ctx context.Context) ([]eh.Entity, error) { } } - iter := sess.DB(r.dbName(ctx)).C(r.collection).Find(nil).Iter() + c := r.client.Database(r.dbName(ctx)).Collection(r.collection) + + cursor, err := c.Find(ctx, bson.M{}) + if err != nil { + return nil, eh.RepoError{ + Err: err, + Namespace: eh.NamespaceFromContext(ctx), + } + } + result := []eh.Entity{} - entity := r.factoryFn() - for iter.Next(entity) { + for cursor.Next(ctx) { + entity := r.factoryFn() + if err := cursor.Decode(entity); err != nil { + return nil, eh.RepoError{ + Err: err, + Namespace: eh.NamespaceFromContext(ctx), + } + } result = append(result, entity) - entity = r.factoryFn() } - if err := iter.Close(); err != nil { + + if err := cursor.Close(ctx); err != nil { return nil, eh.RepoError{ Err: err, Namespace: eh.NamespaceFromContext(ctx), @@ -136,33 +160,36 @@ func (r *Repo) FindAll(ctx context.Context) ([]eh.Entity, error) { // The iterator is not thread safe. type iter struct { - session *mgo.Session - iter *mgo.Iter + cursor *mongo.Cursor data eh.Entity factoryFn func() eh.Entity + decodeErr error } -func (i *iter) Next() bool { +func (i *iter) Next(ctx context.Context) bool { + if !i.cursor.Next(ctx) { + return false + } + item := i.factoryFn() - more := i.iter.Next(item) + i.decodeErr = i.cursor.Decode(item) i.data = item - return more + return true } func (i *iter) Value() interface{} { return i.data } -func (i *iter) Close() error { - err := i.iter.Close() - i.session.Close() - return err +func (i *iter) Close(ctx context.Context) error { + if err := i.cursor.Close(ctx); err != nil { + return err + } + return i.decodeErr } // FindCustomIter returns a mgo cursor you can use to stream results of very large datasets -func (r *Repo) FindCustomIter(ctx context.Context, callback func(*mgo.Collection) *mgo.Query) (eh.Iter, error) { - sess := r.session.Copy() - +func (r *Repo) FindCustomIter(ctx context.Context, f func(context.Context, *mongo.Collection) (*mongo.Cursor, error)) (eh.Iter, error) { if r.factoryFn == nil { return nil, eh.RepoError{ Err: ErrModelNotSet, @@ -170,9 +197,17 @@ func (r *Repo) FindCustomIter(ctx context.Context, callback func(*mgo.Collection } } - collection := sess.DB(r.dbName(ctx)).C(r.collection) - query := callback(collection) - if query == nil { + c := r.client.Database(r.dbName(ctx)).Collection(r.collection) + + cursor, err := f(ctx, c) + if err != nil { + return nil, eh.RepoError{ + BaseErr: err, + Err: ErrInvalidQuery, + Namespace: eh.NamespaceFromContext(ctx), + } + } + if cursor == nil { return nil, eh.RepoError{ Err: ErrInvalidQuery, Namespace: eh.NamespaceFromContext(ctx), @@ -180,8 +215,7 @@ func (r *Repo) FindCustomIter(ctx context.Context, callback func(*mgo.Collection } return &iter{ - session: sess, - iter: query.Iter(), + cursor: cursor, factoryFn: r.factoryFn, }, nil } @@ -191,10 +225,7 @@ func (r *Repo) FindCustomIter(ctx context.Context, callback func(*mgo.Collection // the query in the callback and returning nil to block a second execution of // the same query in FindCustom. Expect a ErrInvalidQuery if returning a nil // query from the callback. -func (r *Repo) FindCustom(ctx context.Context, callback func(*mgo.Collection) *mgo.Query) ([]interface{}, error) { - sess := r.session.Copy() - defer sess.Close() - +func (r *Repo) FindCustom(ctx context.Context, f func(context.Context, *mongo.Collection) (*mongo.Cursor, error)) ([]interface{}, error) { if r.factoryFn == nil { return nil, eh.RepoError{ Err: ErrModelNotSet, @@ -202,23 +233,36 @@ func (r *Repo) FindCustom(ctx context.Context, callback func(*mgo.Collection) *m } } - collection := sess.DB(r.dbName(ctx)).C(r.collection) - query := callback(collection) - if query == nil { + c := r.client.Database(r.dbName(ctx)).Collection(r.collection) + + cursor, err := f(ctx, c) + if err != nil { + return nil, eh.RepoError{ + BaseErr: err, + Err: ErrInvalidQuery, + Namespace: eh.NamespaceFromContext(ctx), + } + } + if cursor == nil { return nil, eh.RepoError{ Err: ErrInvalidQuery, Namespace: eh.NamespaceFromContext(ctx), } } - iter := query.Iter() result := []interface{}{} entity := r.factoryFn() - for iter.Next(entity) { + for cursor.Next(ctx) { + if err := cursor.Decode(entity); err != nil { + return nil, eh.RepoError{ + Err: err, + Namespace: eh.NamespaceFromContext(ctx), + } + } result = append(result, entity) entity = r.factoryFn() } - if err := iter.Close(); err != nil { + if err := cursor.Close(ctx); err != nil { return nil, eh.RepoError{ Err: err, Namespace: eh.NamespaceFromContext(ctx), @@ -230,9 +274,6 @@ func (r *Repo) FindCustom(ctx context.Context, callback func(*mgo.Collection) *m // Save implements the Save method of the eventhorizon.WriteRepo interface. func (r *Repo) Save(ctx context.Context, entity eh.Entity) error { - sess := r.session.Copy() - defer sess.Close() - if entity.EntityID() == uuid.Nil { return eh.RepoError{ Err: eh.ErrCouldNotSaveEntity, @@ -241,8 +282,17 @@ func (r *Repo) Save(ctx context.Context, entity eh.Entity) error { } } - if _, err := sess.DB(r.dbName(ctx)).C(r.collection).UpsertId( - entity.EntityID(), entity); err != nil { + c := r.client.Database(r.dbName(ctx)).Collection(r.collection) + + if _, err := c.UpdateOne(ctx, + bson.M{ + "_id": entity.EntityID().String(), + }, + bson.M{ + "$set": entity, + }, + options.Update().SetUpsert(true), + ); err != nil { return eh.RepoError{ Err: eh.ErrCouldNotSaveEntity, BaseErr: err, @@ -254,14 +304,16 @@ func (r *Repo) Save(ctx context.Context, entity eh.Entity) error { // Remove implements the Remove method of the eventhorizon.WriteRepo interface. func (r *Repo) Remove(ctx context.Context, id uuid.UUID) error { - sess := r.session.Copy() - defer sess.Close() + c := r.client.Database(r.dbName(ctx)).Collection(r.collection) - err := sess.DB(r.dbName(ctx)).C(r.collection).RemoveId(id) - if err != nil { + if r, err := c.DeleteOne(ctx, bson.M{"_id": id.String()}); err != nil { + return eh.RepoError{ + Err: err, + Namespace: eh.NamespaceFromContext(ctx), + } + } else if r.DeletedCount == 0 { return eh.RepoError{ Err: eh.ErrEntityNotFound, - BaseErr: err, Namespace: eh.NamespaceFromContext(ctx), } } @@ -270,12 +322,10 @@ func (r *Repo) Remove(ctx context.Context, id uuid.UUID) error { } // Collection lets the function do custom actions on the collection. -func (r *Repo) Collection(ctx context.Context, f func(*mgo.Collection) error) error { - sess := r.session.Copy() - defer sess.Close() +func (r *Repo) Collection(ctx context.Context, f func(context.Context, *mongo.Collection) error) error { + c := r.client.Database(r.dbName(ctx)).Collection(r.collection) - c := sess.DB(r.dbName(ctx)).C(r.collection) - if err := f(c); err != nil { + if err := f(ctx, c); err != nil { return eh.RepoError{ Err: err, Namespace: eh.NamespaceFromContext(ctx), @@ -292,7 +342,9 @@ func (r *Repo) SetEntityFactory(f func() eh.Entity) { // Clear clears the read model database. func (r *Repo) Clear(ctx context.Context) error { - if err := r.session.DB(r.dbName(ctx)).C(r.collection).DropCollection(); err != nil { + c := r.client.Database(r.dbName(ctx)).Collection(r.collection) + + if err := c.Drop(ctx); err != nil { return eh.RepoError{ Err: ErrCouldNotClearDB, BaseErr: err, @@ -303,8 +355,8 @@ func (r *Repo) Clear(ctx context.Context) error { } // Close closes a database session. -func (r *Repo) Close() { - r.session.Close() +func (r *Repo) Close(ctx context.Context) { + r.client.Disconnect(ctx) } // dbName appends the namespace, if one is set, to the DB prefix to diff --git a/repo/mongodb/repo_test.go b/repo/mongodb/repo_test.go index 5d3ef616..8924b2cd 100644 --- a/repo/mongodb/repo_test.go +++ b/repo/mongodb/repo_test.go @@ -21,9 +21,9 @@ import ( "testing" "time" - "github.com/globalsign/mgo" - "github.com/globalsign/mgo/bson" "github.com/google/uuid" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" eh "github.com/looplab/eventhorizon" "github.com/looplab/eventhorizon/mocks" @@ -38,6 +38,7 @@ func TestReadRepo(t *testing.T) { // Default to localhost url = "localhost:27017" } + url = "mongodb://" + url r, err := NewRepo(url, "test", "mocks.Model") if err != nil { @@ -46,7 +47,7 @@ func TestReadRepo(t *testing.T) { if r == nil { t.Error("there should be a repository") } - defer r.Close() + r.SetEntityFactory(func() eh.Entity { return &mocks.Model{} }) @@ -54,26 +55,22 @@ func TestReadRepo(t *testing.T) { t.Error("the parent repo should be nil") } - // Repo with default namespace. + customNamespaceCtx := eh.NewContextWithNamespace(context.Background(), "ns") + + defer r.Close(context.Background()) defer func() { - t.Log("clearing default db") if err = r.Clear(context.Background()); err != nil { t.Fatal("there should be no error:", err) } - }() - repo.AcceptanceTest(t, context.Background(), r) - extraRepoTests(t, context.Background(), r) - - // Repo with other namespace. - ctx := eh.NewContextWithNamespace(context.Background(), "ns") - defer func() { - t.Log("clearing ns db") - if err = r.Clear(ctx); err != nil { + if err = r.Clear(customNamespaceCtx); err != nil { t.Fatal("there should be no error:", err) } }() - repo.AcceptanceTest(t, ctx, r) - extraRepoTests(t, ctx, r) + + repo.AcceptanceTest(t, context.Background(), r) + extraRepoTests(t, context.Background(), r) + repo.AcceptanceTest(t, customNamespaceCtx, r) + extraRepoTests(t, customNamespaceCtx, r) } @@ -89,8 +86,8 @@ func extraRepoTests(t *testing.T, ctx context.Context, r *Repo) { } // FindCustom by content. - result, err := r.FindCustom(ctx, func(c *mgo.Collection) *mgo.Query { - return c.Find(bson.M{"content": "modelCustom"}) + result, err := r.FindCustom(ctx, func(ctx context.Context, c *mongo.Collection) (*mongo.Cursor, error) { + return c.Find(ctx, bson.M{"content": "modelCustom"}) }) if len(result) != 1 { t.Error("there should be one item:", len(result)) @@ -100,22 +97,22 @@ func extraRepoTests(t *testing.T, ctx context.Context, r *Repo) { } // FindCustom with no query. - result, err = r.FindCustom(ctx, func(c *mgo.Collection) *mgo.Query { - return nil + result, err = r.FindCustom(ctx, func(ctx context.Context, c *mongo.Collection) (*mongo.Cursor, error) { + return nil, nil }) if rrErr, ok := err.(eh.RepoError); !ok || rrErr.Err != ErrInvalidQuery { t.Error("there should be a invalid query error:", err) } - count := 0 + var count int64 // FindCustom with query execution in the callback. - _, err = r.FindCustom(ctx, func(c *mgo.Collection) *mgo.Query { - if count, err = c.Count(); err != nil { + _, err = r.FindCustom(ctx, func(ctx context.Context, c *mongo.Collection) (*mongo.Cursor, error) { + if count, err = c.CountDocuments(ctx, bson.M{}); err != nil { t.Error("there should be no error:", err) } // Be sure to return nil to not execute the query again in FindCustom. - return nil + return nil, nil }) if rrErr, ok := err.(eh.RepoError); !ok || rrErr.Err != ErrInvalidQuery { t.Error("there should be a invalid query error:", err) @@ -128,8 +125,9 @@ func extraRepoTests(t *testing.T, ctx context.Context, r *Repo) { ID: uuid.New(), Content: "modelCustom2", } - if err := r.Collection(ctx, func(c *mgo.Collection) error { - return c.Insert(modelCustom2) + if err := r.Collection(ctx, func(ctx context.Context, c *mongo.Collection) error { + _, err := c.InsertOne(ctx, modelCustom2) + return err }); err != nil { t.Error("there should be no error:", err) } @@ -142,23 +140,23 @@ func extraRepoTests(t *testing.T, ctx context.Context, r *Repo) { } // FindCustomIter by content. - iter, err := r.FindCustomIter(ctx, func(c *mgo.Collection) *mgo.Query { - return c.Find(bson.M{"content": "modelCustom"}) + iter, err := r.FindCustomIter(ctx, func(ctx context.Context, c *mongo.Collection) (*mongo.Cursor, error) { + return c.Find(ctx, bson.M{"content": "modelCustom"}) }) if err != nil { t.Error("there should be no error:", err) } - if iter.Next() != true { + if iter.Next(ctx) != true { t.Error("the iterator should have results") } if !reflect.DeepEqual(iter.Value(), modelCustom) { t.Error("the item should be correct:", modelCustom) } - if iter.Next() == true { + if iter.Next(ctx) == true { t.Error("the iterator should have no results") } - err = iter.Close() + err = iter.Close(ctx) if err != nil { t.Error("there should be no error:", err) } @@ -177,17 +175,17 @@ func TestRepository(t *testing.T) { // Local Mongo testing with Docker url := os.Getenv("MONGO_HOST") - if url == "" { // Default to localhost url = "localhost:27017" } + url = "mongodb://" + url repo, err := NewRepo(url, "test", "mocks.Model") if err != nil { t.Error("there should be no error:", err) } - defer repo.Close() + defer repo.Close(context.Background()) outer := &mocks.Repo{ParentRepo: repo} if r := Repository(outer); r != repo { diff --git a/types/mongodb/uuid.go b/types/mongodb/uuid.go new file mode 100644 index 00000000..cdb10404 --- /dev/null +++ b/types/mongodb/uuid.go @@ -0,0 +1,76 @@ +package mongodb + +import ( + "fmt" + "reflect" + + "github.com/google/uuid" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/bsoncodec" + "go.mongodb.org/mongo-driver/bson/bsonrw" + "go.mongodb.org/mongo-driver/bson/bsontype" +) + +// Update the default BSON registry to be able to handle UUID types as strings. +func init() { + rb := bson.NewRegistryBuilder() + var id uuid.UUID + uuidType := reflect.TypeOf(id) + + rb.RegisterTypeEncoder(uuidType, bsoncodec.ValueEncoderFunc( + func(ec bsoncodec.EncodeContext, vw bsonrw.ValueWriter, val reflect.Value) error { + if !val.IsValid() || val.Type() != uuidType || val.Len() != 16 { + return bsoncodec.ValueEncoderError{ + Name: "uuid.UUID", + Types: []reflect.Type{uuidType}, + Received: val, + } + } + b := make([]byte, 16) + v := reflect.ValueOf(b) + reflect.Copy(v, val) + id, err := uuid.FromBytes(v.Bytes()) + if err != nil { + return fmt.Errorf("could not parse UUID bytes (%x): %w", v.Bytes(), err) + } + return vw.WriteString(id.String()) + }, + )) + + rb.RegisterTypeDecoder(uuidType, bsoncodec.ValueDecoderFunc( + func(dc bsoncodec.DecodeContext, vr bsonrw.ValueReader, val reflect.Value) error { + if !val.IsValid() || !val.CanSet() || val.Kind() != reflect.Array { + return bsoncodec.ValueDecoderError{ + Name: "uuid.UUID", + Kinds: []reflect.Kind{reflect.Bool}, + Received: val, + } + } + + var s string + switch vr.Type() { + case bsontype.String: + var err error + if s, err = vr.ReadString(); err != nil { + return err + } + default: + return fmt.Errorf("received invalid BSON type to decode into UUID: %s", vr.Type()) + } + + id, err := uuid.Parse(s) + if err != nil { + return fmt.Errorf("could not parse UUID string: %s", s) + } + v := reflect.ValueOf(id) + if !v.IsValid() || v.Kind() != reflect.Array { + return fmt.Errorf("invalid kind of reflected UUID value: %s", v.Kind().String()) + } + reflect.Copy(val, v) + + return nil + }, + )) + + bson.DefaultRegistry = rb.Build() +}