diff --git a/.gitignore b/.gitignore index 83fa301d..6f8b7614 100644 --- a/.gitignore +++ b/.gitignore @@ -20,14 +20,10 @@ _cgo_export.* _testmain.go *.exe -*.test *.prof *.coverprofile coverage.out -.wercker - -vendor/ # CI .env diff --git a/Dockerfile.test b/Dockerfile.test new file mode 100644 index 00000000..a8823acd --- /dev/null +++ b/Dockerfile.test @@ -0,0 +1,12 @@ +FROM golang:1.17 + +WORKDIR /eventhorizon + +# Cache dependencies. +COPY go.mod go.sum ./ +RUN go mod download + +# Rest of the project files. +COPY . . + +ENTRYPOINT [ "/bin/sh" ] diff --git a/eventbus/kafka/eventbus.go b/eventbus/kafka/eventbus.go index 907c41e7..84c13d94 100644 --- a/eventbus/kafka/eventbus.go +++ b/eventbus/kafka/eventbus.go @@ -237,6 +237,12 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader) handler := b.handler(m, h, r) for { + select { + case <-b.cctx.Done(): + break + default: + } + msg, err := r.FetchMessage(b.cctx) if errors.Is(err, context.Canceled) { break @@ -259,14 +265,16 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader) default: log.Printf("eventhorizon: missed error in Kafka event bus: %s", err) } - } else { - if err := r.CommitMessages(b.cctx, msg); err != nil { - err = fmt.Errorf("could not commit message: %w", err) - select { - case b.errCh <- eh.EventBusError{Err: err}: - default: - log.Printf("eventhorizon: missed error in Kafka event bus: %s", err) - } + continue + } + + // Use a new context to always finish the commit. + if err := r.CommitMessages(context.Background(), msg); err != nil { + err = fmt.Errorf("could not commit message: %w", err) + select { + case b.errCh <- eh.EventBusError{Err: err}: + default: + log.Printf("eventhorizon: missed error in Kafka event bus: %s", err) } } } diff --git a/eventstore/mongodb/eventstore.go b/eventstore/mongodb/eventstore.go index 34f834a0..8aae1617 100644 --- a/eventstore/mongodb/eventstore.go +++ b/eventstore/mongodb/eventstore.go @@ -73,6 +73,10 @@ func NewEventStoreWithClient(client *mongo.Client, db string, options ...Option) } } + if err := s.client.Ping(context.Background(), readpref.Primary()); err != nil { + return nil, fmt.Errorf("could not connect to MongoDB: %w", err) + } + return s, nil } diff --git a/eventstore/mongodb/eventstore_test.go b/eventstore/mongodb/eventstore_test.go index cd482e94..4e334699 100644 --- a/eventstore/mongodb/eventstore_test.go +++ b/eventstore/mongodb/eventstore_test.go @@ -66,7 +66,7 @@ func TestWithEventHandlerIntegration(t *testing.T) { } // Use MongoDB in Docker with fallback to localhost. - url := os.Getenv("MONGO_HOST") + url := os.Getenv("MONGODB_ADDR") if url == "" { url = "localhost:27017" } @@ -138,7 +138,7 @@ func TestWithEventHandlerIntegration(t *testing.T) { func BenchmarkEventStore(b *testing.B) { // Use MongoDB in Docker with fallback to localhost. - url := os.Getenv("MONGO_HOST") + url := os.Getenv("MONGODB_ADDR") if url == "" { url = "localhost:27017" } diff --git a/eventstore/mongodb_v2/eventstore.go b/eventstore/mongodb_v2/eventstore.go index 0555057b..72fcc276 100644 --- a/eventstore/mongodb_v2/eventstore.go +++ b/eventstore/mongodb_v2/eventstore.go @@ -76,6 +76,10 @@ func NewEventStoreWithClient(client *mongo.Client, dbName string, options ...Opt } } + if err := s.client.Ping(context.Background(), readpref.Primary()); err != nil { + return nil, fmt.Errorf("could not connect to MongoDB: %w", err) + } + ctx := context.Background() if _, err := s.events.Indexes().CreateOne(ctx, mongo.IndexModel{ diff --git a/eventstore/mongodb_v2/eventstore_test.go b/eventstore/mongodb_v2/eventstore_test.go index 97f1048b..ca16607b 100644 --- a/eventstore/mongodb_v2/eventstore_test.go +++ b/eventstore/mongodb_v2/eventstore_test.go @@ -67,7 +67,7 @@ func TestWithEventHandlerIntegration(t *testing.T) { } // Use MongoDB in Docker with fallback to localhost. - url := os.Getenv("MONGO_HOST") + url := os.Getenv("MONGODB_ADDR") if url == "" { url = "localhost:27017" } diff --git a/middleware/eventhandler/observer/middleware.go b/middleware/eventhandler/observer/middleware.go index a20ee9e1..7f3e96dd 100644 --- a/middleware/eventhandler/observer/middleware.go +++ b/middleware/eventhandler/observer/middleware.go @@ -79,11 +79,11 @@ func (h *eventHandler) HandlerType() eh.EventHandlerType { // To create handling groups manually use either the NamedGroup or UUIDGroup. func NewMiddleware(group Group) func(eh.EventHandler) eh.EventHandler { return func(h eh.EventHandler) eh.EventHandler { - return &eventHandler{h, h.HandlerType() + eh.EventHandlerType(fmt.Sprintf("-%s", group.Group()))} + return &eventHandler{h, h.HandlerType() + eh.EventHandlerType(fmt.Sprintf("_%s", group.Group()))} } } // Middleware creates an observer middleware with a random group. func Middleware(h eh.EventHandler) eh.EventHandler { - return &eventHandler{h, h.HandlerType() + eh.EventHandlerType(fmt.Sprintf("-%s", RandomGroup().Group()))} + return &eventHandler{h, h.HandlerType() + eh.EventHandlerType(fmt.Sprintf("_%s", RandomGroup().Group()))} } diff --git a/middleware/eventhandler/observer/middleware_test.go b/middleware/eventhandler/observer/middleware_test.go index 261c40d4..2388d240 100644 --- a/middleware/eventhandler/observer/middleware_test.go +++ b/middleware/eventhandler/observer/middleware_test.go @@ -39,14 +39,14 @@ func TestMiddleware(t *testing.T) { if err := h1.HandleEvent(context.Background(), event); err != nil { t.Error("there should be no error:", err) } - if h1.HandlerType() != inner.HandlerType()+"-a" { + if h1.HandlerType() != inner.HandlerType()+"_a" { t.Error("the handler type should be correct:", h1.HandlerType()) } // UUID group. groupID := uuid.New() h2 := eh.UseEventHandlerMiddleware(inner, NewMiddleware(UUIDGroup(groupID))) - if h2.HandlerType() != inner.HandlerType()+"-"+eh.EventHandlerType(groupID.String()) { + if h2.HandlerType() != inner.HandlerType()+"_"+eh.EventHandlerType(groupID.String()) { t.Error("the handler type should be correct:", h2.HandlerType()) } @@ -66,7 +66,7 @@ func TestMiddleware(t *testing.T) { t.Error("could not get hostname:", err) } h5 := eh.UseEventHandlerMiddleware(inner, NewMiddleware(HostnameGroup())) - if h5.HandlerType() != inner.HandlerType()+"-"+eh.EventHandlerType(hostname) { + if h5.HandlerType() != inner.HandlerType()+"_"+eh.EventHandlerType(hostname) { t.Error("the handler type should be correct:", h5.HandlerType()) } t.Log(h5.HandlerType()) diff --git a/repo/mongodb/repo.go b/repo/mongodb/repo.go index bc4219bd..13c65e08 100644 --- a/repo/mongodb/repo.go +++ b/repo/mongodb/repo.go @@ -85,6 +85,10 @@ func NewRepoWithClient(client *mongo.Client, db, collection string, options ...O } } + if err := r.client.Ping(context.Background(), readpref.Primary()); err != nil { + return nil, fmt.Errorf("could not connect to MongoDB: %w", err) + } + return r, nil } diff --git a/repo/mongodb/repo_test.go b/repo/mongodb/repo_test.go index 80636dd6..83bc148f 100644 --- a/repo/mongodb/repo_test.go +++ b/repo/mongodb/repo_test.go @@ -164,7 +164,11 @@ func extraRepoTests(t *testing.T, r *Repo) { } } -func TestIntoRepo(t *testing.T) { +func TestIntoRepoIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + if r := IntoRepo(context.Background(), nil); r != nil { t.Error("the repository should be nil:", r) }