Skip to content

Commit

Permalink
Merge pull request #343 from maxekman/fix/ping-mongodb
Browse files Browse the repository at this point in the history
Fix / Ping MongoDB when creating event store/repos
  • Loading branch information
maxekman authored Oct 12, 2021
2 parents 19944c0 + e02dfb3 commit a155912
Show file tree
Hide file tree
Showing 11 changed files with 53 additions and 21 deletions.
4 changes: 0 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@ _cgo_export.*
_testmain.go

*.exe
*.test
*.prof

*.coverprofile
coverage.out
.wercker

vendor/

# CI
.env
12 changes: 12 additions & 0 deletions Dockerfile.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM golang:1.17

WORKDIR /eventhorizon

# Cache dependencies.
COPY go.mod go.sum ./
RUN go mod download

# Rest of the project files.
COPY . .

ENTRYPOINT [ "/bin/sh" ]
24 changes: 16 additions & 8 deletions eventbus/kafka/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,12 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader)
handler := b.handler(m, h, r)

for {
select {
case <-b.cctx.Done():
break
default:
}

msg, err := r.FetchMessage(b.cctx)
if errors.Is(err, context.Canceled) {
break
Expand All @@ -259,14 +265,16 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader)
default:
log.Printf("eventhorizon: missed error in Kafka event bus: %s", err)
}
} else {
if err := r.CommitMessages(b.cctx, msg); err != nil {
err = fmt.Errorf("could not commit message: %w", err)
select {
case b.errCh <- eh.EventBusError{Err: err}:
default:
log.Printf("eventhorizon: missed error in Kafka event bus: %s", err)
}
continue
}

// Use a new context to always finish the commit.
if err := r.CommitMessages(context.Background(), msg); err != nil {
err = fmt.Errorf("could not commit message: %w", err)
select {
case b.errCh <- eh.EventBusError{Err: err}:
default:
log.Printf("eventhorizon: missed error in Kafka event bus: %s", err)
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions eventstore/mongodb/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func NewEventStoreWithClient(client *mongo.Client, db string, options ...Option)
}
}

if err := s.client.Ping(context.Background(), readpref.Primary()); err != nil {
return nil, fmt.Errorf("could not connect to MongoDB: %w", err)
}

return s, nil
}

Expand Down
4 changes: 2 additions & 2 deletions eventstore/mongodb/eventstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestWithEventHandlerIntegration(t *testing.T) {
}

// Use MongoDB in Docker with fallback to localhost.
url := os.Getenv("MONGO_HOST")
url := os.Getenv("MONGODB_ADDR")
if url == "" {
url = "localhost:27017"
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestWithEventHandlerIntegration(t *testing.T) {

func BenchmarkEventStore(b *testing.B) {
// Use MongoDB in Docker with fallback to localhost.
url := os.Getenv("MONGO_HOST")
url := os.Getenv("MONGODB_ADDR")
if url == "" {
url = "localhost:27017"
}
Expand Down
4 changes: 4 additions & 0 deletions eventstore/mongodb_v2/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func NewEventStoreWithClient(client *mongo.Client, dbName string, options ...Opt
}
}

if err := s.client.Ping(context.Background(), readpref.Primary()); err != nil {
return nil, fmt.Errorf("could not connect to MongoDB: %w", err)
}

ctx := context.Background()

if _, err := s.events.Indexes().CreateOne(ctx, mongo.IndexModel{
Expand Down
2 changes: 1 addition & 1 deletion eventstore/mongodb_v2/eventstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestWithEventHandlerIntegration(t *testing.T) {
}

// Use MongoDB in Docker with fallback to localhost.
url := os.Getenv("MONGO_HOST")
url := os.Getenv("MONGODB_ADDR")
if url == "" {
url = "localhost:27017"
}
Expand Down
4 changes: 2 additions & 2 deletions middleware/eventhandler/observer/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ func (h *eventHandler) HandlerType() eh.EventHandlerType {
// To create handling groups manually use either the NamedGroup or UUIDGroup.
func NewMiddleware(group Group) func(eh.EventHandler) eh.EventHandler {
return func(h eh.EventHandler) eh.EventHandler {
return &eventHandler{h, h.HandlerType() + eh.EventHandlerType(fmt.Sprintf("-%s", group.Group()))}
return &eventHandler{h, h.HandlerType() + eh.EventHandlerType(fmt.Sprintf("_%s", group.Group()))}
}
}

// Middleware creates an observer middleware with a random group.
func Middleware(h eh.EventHandler) eh.EventHandler {
return &eventHandler{h, h.HandlerType() + eh.EventHandlerType(fmt.Sprintf("-%s", RandomGroup().Group()))}
return &eventHandler{h, h.HandlerType() + eh.EventHandlerType(fmt.Sprintf("_%s", RandomGroup().Group()))}
}
6 changes: 3 additions & 3 deletions middleware/eventhandler/observer/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func TestMiddleware(t *testing.T) {
if err := h1.HandleEvent(context.Background(), event); err != nil {
t.Error("there should be no error:", err)
}
if h1.HandlerType() != inner.HandlerType()+"-a" {
if h1.HandlerType() != inner.HandlerType()+"_a" {
t.Error("the handler type should be correct:", h1.HandlerType())
}

// UUID group.
groupID := uuid.New()
h2 := eh.UseEventHandlerMiddleware(inner, NewMiddleware(UUIDGroup(groupID)))
if h2.HandlerType() != inner.HandlerType()+"-"+eh.EventHandlerType(groupID.String()) {
if h2.HandlerType() != inner.HandlerType()+"_"+eh.EventHandlerType(groupID.String()) {
t.Error("the handler type should be correct:", h2.HandlerType())
}

Expand All @@ -66,7 +66,7 @@ func TestMiddleware(t *testing.T) {
t.Error("could not get hostname:", err)
}
h5 := eh.UseEventHandlerMiddleware(inner, NewMiddleware(HostnameGroup()))
if h5.HandlerType() != inner.HandlerType()+"-"+eh.EventHandlerType(hostname) {
if h5.HandlerType() != inner.HandlerType()+"_"+eh.EventHandlerType(hostname) {
t.Error("the handler type should be correct:", h5.HandlerType())
}
t.Log(h5.HandlerType())
Expand Down
4 changes: 4 additions & 0 deletions repo/mongodb/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func NewRepoWithClient(client *mongo.Client, db, collection string, options ...O
}
}

if err := r.client.Ping(context.Background(), readpref.Primary()); err != nil {
return nil, fmt.Errorf("could not connect to MongoDB: %w", err)
}

return r, nil
}

Expand Down
6 changes: 5 additions & 1 deletion repo/mongodb/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ func extraRepoTests(t *testing.T, r *Repo) {
}
}

func TestIntoRepo(t *testing.T) {
func TestIntoRepoIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

if r := IntoRepo(context.Background(), nil); r != nil {
t.Error("the repository should be nil:", r)
}
Expand Down

0 comments on commit a155912

Please sign in to comment.