Skip to content

Commit

Permalink
Merge pull request #5 from lorenzoranucci/add-headers-support
Browse files Browse the repository at this point in the history
Add headers feature
  • Loading branch information
lorenzoranucci authored Dec 13, 2022
2 parents 560aa2e + 21cc690 commit 64df498
Show file tree
Hide file tree
Showing 14 changed files with 302 additions and 27 deletions.
2 changes: 2 additions & 0 deletions .devenv/tor-bare-metal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ dbUser: root
dbPassword: root
dbOutboxTableRef: my_schema.my_outbox_table
aggregateTypeRegexp: "(?i)^order$"
dbHeadersColumnsNames:
- uuid

kafkaBrokers: localhost:9093
kafkaTopic: outbox_topic
Expand Down
3 changes: 3 additions & 0 deletions .devenv/tor-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ dbUser: root
dbPassword: root
dbOutboxTableRef: my_schema.my_outbox_table
aggregateTypeRegexp: "(?i)^order$"
dbHeadersColumnsNames:
- uuid


kafkaBrokers: kafka:9092
kafkaTopic: outbox_topic
Expand Down
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ Tor is composed of several modules so it can be extensible and make dependencies

Set up the system:
```shell
cd .devenv

make up
```

Expand Down
26 changes: 24 additions & 2 deletions adapters/kafka/event_dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package kafka

import "github.com/Shopify/sarama"

func NewEventDispatcher(producer *Producer) *EventDispatcher {
return &EventDispatcher{producer: producer}
}
Expand All @@ -8,6 +10,26 @@ type EventDispatcher struct {
producer *Producer
}

func (k *EventDispatcher) Dispatch(routingKey string, event []byte) error {
return k.producer.Dispatch(routingKey, event)
func (k *EventDispatcher) Dispatch(
routingKey string,
event []byte,
headers []struct {
Key []byte
Value []byte
},
) error {
return k.producer.Dispatch(routingKey, event, mapHeaders(headers))
}

func mapHeaders(h []struct {
Key []byte
Value []byte
}) []sarama.RecordHeader {
r := make([]sarama.RecordHeader, 0, len(h))

for _, v := range h {
r = append(r, v)
}

return r
}
13 changes: 9 additions & 4 deletions adapters/kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ func NewProducer(brokers []string, topic string) (*Producer, error) {
return &Producer{syncProducer: syncProducer, topic: topic}, nil
}

func (p *Producer) Dispatch(key string, message []byte) error {
func (p *Producer) Dispatch(
key string,
message []byte,
headers []sarama.RecordHeader,
) error {
_, _, err := p.syncProducer.SendMessage(
&sarama.ProducerMessage{
Key: sarama.StringEncoder(key),
Topic: p.topic,
Value: sarama.ByteEncoder(message),
Key: sarama.StringEncoder(key),
Topic: p.topic,
Value: sarama.ByteEncoder(message),
Headers: headers,
},
)

Expand Down
1 change: 1 addition & 0 deletions example/api-server/cmd/create_outbox_table.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
CREATE TABLE IF NOT EXISTS my_schema.my_outbox_table
(
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
uuid CHAR(36) NOT NULL UNIQUE,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
payload LONGBLOB NOT NULL
Expand Down
6 changes: 4 additions & 2 deletions example/api-server/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"

_ "github.com/go-sql-driver/mysql"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/julienschmidt/httprouter"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -135,12 +136,13 @@ func (h *HTTPHandler) insertEvent(tx *sqlx.Tx, eventName string, orderUUID strin
}

query := `
INSERT INTO my_schema.my_outbox_table (aggregate_type, aggregate_id, payload)
VALUES ('order', :aggregate_id, :payload);`
INSERT INTO my_schema.my_outbox_table (uuid, aggregate_type, aggregate_id, payload)
VALUES (:uuid, 'order', :aggregate_id, :payload);`

if _, err := tx.NamedExec(
h.db.Rebind(query),
map[string]interface{}{
"uuid": uuid.New().String(),
"aggregate_id": orderUUID,
"payload": payload,
},
Expand Down
1 change: 1 addition & 0 deletions example/api-server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/go-sql-driver/mysql v1.6.0
github.com/google/uuid v1.1.2
github.com/jmoiron/sqlx v1.3.5
github.com/julienschmidt/httprouter v1.3.0
github.com/sirupsen/logrus v1.9.0
Expand Down
1 change: 1 addition & 0 deletions example/api-server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
Expand Down
2 changes: 2 additions & 0 deletions example/tor/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var runCmd = &cobra.Command{
viper.GetString("dbAggregateIDColumnName"),
viper.GetString("dbAggregateTypeColumnName"),
viper.GetString("dbPayloadColumnName"),
viper.GetStringSlice("dbHeadersColumnsNames"),
aggregateTypeRegexp,
)
if err != nil {
Expand All @@ -66,6 +67,7 @@ func init() {
viper.MustBindEnv("dbAggregateIDColumnName", "DB_AGGREGATE_ID_COLUMN_NAME")
viper.MustBindEnv("dbAggregateTypeColumnName", "DB_AGGREGATE_TYPE_COLUMN_NAME")
viper.MustBindEnv("dbPayloadColumnName", "DB_PAYLOAD_COLUMN_NAME")
viper.MustBindEnv("dbHeadersColumnsNames", "DB_HEADERS_COLUMNS_NAME")
viper.MustBindEnv("aggregateTypeRegexp", "AGGREGATE_TYPE_REGEXP_EXPRESSION")

viper.MustBindEnv("kafkaBrokers", "KAFKA_BROKERS")
Expand Down
41 changes: 38 additions & 3 deletions router/pkg/run/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,34 @@ type StateHandler interface {
SetLastPosition(position mysql.Position) error
}

type OutboxEvent struct {
type outboxEvent struct {
AggregateID string
Payload []byte
Headers []eventHeader
}

type eventHeader struct {
Key []byte
Value []byte
}

type EventDispatcher interface {
Dispatch(routingKey string, event []byte) error
Dispatch(
routingKey string,
event []byte,
headers []struct {
Key []byte
Value []byte
},
) error
}

func NewEventHandler(
eventDispatcher EventDispatcher,
aggregateIDColumnName string,
aggregateTypeColumnName string,
payloadColumnName string,
headersColumnsNames []string,
aggregateTypeRegexp *regexp.Regexp,
) (*EventHandler, error) {
actualAggregateIDColumnName := defaultAggregateIDColumnName
Expand Down Expand Up @@ -63,6 +77,7 @@ func NewEventHandler(
aggregateIDColumnName: actualAggregateIDColumnName,
aggregateTypeColumnName: actualAggregateTypeColumnName,
payloadColumnName: actualPayloadColumnName,
headersColumnsNames: headersColumnsNames,
aggregateTypeRegexp: actualAggregateTypeRegexp,
},
eventDispatcher: eventDispatcher,
Expand Down Expand Up @@ -90,7 +105,7 @@ func (h *EventHandler) OnRow(e *canal.RowsEvent) error {
}

for _, oe := range oes {
err = h.eventDispatcher.Dispatch(oe.AggregateID, oe.Payload)
err = h.eventDispatcher.Dispatch(oe.AggregateID, oe.Payload, mapHeaders(oe.Headers))
if err != nil {
return err
}
Expand All @@ -110,3 +125,23 @@ func (h *EventHandler) OnPosSynced(p mysql.Position, g mysql.GTIDSet, f bool) er
func (h *EventHandler) String() string {
return "EventHandler"
}

func mapHeaders(h []eventHeader) []struct {
Key []byte
Value []byte
} {
if len(h) == 0 {
return nil
}

r := make([]struct {
Key []byte
Value []byte
}, 0, len(h))

for _, v := range h {
r = append(r, v)
}

return r
}
Loading

0 comments on commit 64df498

Please sign in to comment.