Skip to content

Commit

Permalink
Enhance Kafka consumer close and Http server handlers to allow runnin…
Browse files Browse the repository at this point in the history
…g tests

Signed-off-by: Laurent Broudoux <[email protected]>
  • Loading branch information
lbroudoux committed Sep 26, 2024
1 parent 267ff1e commit 34b6d5a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 27 deletions.
8 changes: 4 additions & 4 deletions cmd/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,16 @@ func Run(applicationProperties app.ApplicationProperties, applicationServices ch
applicationServices <- services // service to channel

// Define your HTTP routes
http.HandleFunc("/", handler)

http.HandleFunc("/api/orders", orderController.CreateOrder)
mux := http.NewServeMux()
mux.HandleFunc("/", handler)
mux.HandleFunc("/api/orders", orderController.CreateOrder)

// Start your HTTP server
fmt.Println("Microcks TestContainers Go Demo application is listening on localhost:9000")
fmt.Println("")

//go http.ListenAndServe(":9000", nil)
server := &http.Server{Addr: ":9000", Handler: nil}
server := &http.Server{Addr: ":9000", Handler: mux}
err = server.ListenAndServe()

<-close
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/order_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (oc *orderController) CreateOrder(w http.ResponseWriter, r *http.Request) {
// Place a new order.
order, err := oc.service.PlaceOrder(&info)
if err != nil {
// Manage unavialble product.
// Manage unavailable product.
var unavailableErr *service.UnavailablePastryError
if errors.As(err, &unavailableErr) {
w.Header().Set("Content-Type", "application/json")
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/order_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func setupEnsemble(ctx context.Context, t *testing.T, net *testcontainers.Docker
microcksEnsemble, err := ensemble.RunContainers(ctx,
ensemble.WithMainArtifact("../../testdata/order-service-openapi.yaml"),
ensemble.WithMainArtifact("../../testdata/apipastries-openapi.yaml"),
ensemble.WithSecondaryArtifact("../../testdata/order-service-postman-collection.json"),
ensemble.WithSecondaryArtifact("../../testdata/apipastries-postman-collection.json"),
ensemble.WithPostman(),
ensemble.WithNetwork(net),
Expand Down Expand Up @@ -172,7 +173,7 @@ func TestPostmanCollectionContract(t *testing.T) {
// Prepare a Microcks Test.
testRequest := client.TestRequest{
ServiceId: "Order Service API:0.1.0",
RunnerType: client.TestRunnerTypeOPENAPISCHEMA,
RunnerType: client.TestRunnerTypePOSTMAN,
TestEndpoint: fmt.Sprintf("http://host.testcontainers.internal:%d/api", server.DefaultApplicationPort),
Timeout: 2000,
}
Expand Down
56 changes: 35 additions & 21 deletions internal/service/order_event_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ type OrderEventListener interface {
}

type orderEventListener struct {
kafkaConsumer *kafka.Consumer
kafkaTopic string
orderService OrderService
kafkaConsumer *kafka.Consumer
kafkaTopic string
orderService OrderService
listenerHandler chan struct{}
}

func NewOrderEventListener(kafkaConsumer *kafka.Consumer, kafkaTopic string, orderService OrderService) OrderEventListener {
Expand All @@ -53,26 +54,35 @@ func (oel *orderEventListener) Listen() error {
}

// Handle incoming messages.
handler := make(chan struct{})
stopChannel := make(chan struct{})
oel.listenerHandler = stopChannel
go func() {
defer close(handler)
// This is no longer needed since we're now closing the channel in Close()
// and handling the <-stopChannel case that ends the gorounting.
//defer close(handler)
for {
message, err := oel.kafkaConsumer.ReadMessage(100 * time.Millisecond)
if err != nil {
// Errors are informational and automatically handled by the consumer
continue
}
fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n",
*message.TopicPartition.Topic, string(message.Key), string(message.Value))
select {
case <-stopChannel:
fmt.Println("Stopping Kafka listener goroutine...")
return
default:
message, err := oel.kafkaConsumer.ReadMessage(100 * time.Millisecond)
if err != nil {
// Errors are informational and automatically handled by the consumer
continue
}
fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n",
*message.TopicPartition.Topic, string(message.Key), string(message.Value))

// Transform message value in OrderEvent.
orderEvent := model.OrderEvent{}
err = json.Unmarshal(message.Value, &orderEvent)
if err != nil {
fmt.Println("Error while deserializing message value", err)
} else {
order := oel.orderService.UpdateReviewedOrder(&orderEvent)
fmt.Println("Order '" + order.Id + "' has been updated after review")
// Transform message value in OrderEvent.
orderEvent := model.OrderEvent{}
err = json.Unmarshal(message.Value, &orderEvent)
if err != nil {
fmt.Println("Error while deserializing message value", err)
} else {
order := oel.orderService.UpdateReviewedOrder(&orderEvent)
fmt.Println("Order '" + order.Id + "' has been updated after review")
}
}
}
}()
Expand All @@ -82,5 +92,9 @@ func (oel *orderEventListener) Listen() error {

func (oel *orderEventListener) Stop() {
fmt.Println("Stopping Kafka consumer...")
oel.kafkaConsumer.Close()
close(oel.listenerHandler)
err := oel.kafkaConsumer.Close()
if err != nil {
fmt.Println("Got error while closing consumer: " + err.Error())
}
}

0 comments on commit 34b6d5a

Please sign in to comment.