Skip to content

Commit

Permalink
delegate mapping between topic and aggregateId to event dispatcher ad…
Browse files Browse the repository at this point in the history
…apter
  • Loading branch information
lorenzoranucci committed Dec 19, 2022
1 parent d7b789b commit 0584dea
Show file tree
Hide file tree
Showing 10 changed files with 455 additions and 943 deletions.
20 changes: 12 additions & 8 deletions .devenv/tor-bare-metal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@ dbPort: 3306
dbUser: root
dbPassword: root
dbOutboxTableRef: my_schema.my_outbox_table
dbHeadersColumnsNames:
- uuid
topicsToPairWithAggregateTypeRegex:
- order
- invoice
aggregateTypeRegexToPairWithTopics:
- "(?i)^order$"
- "(?i)^invoice$"

kafkaBrokers: localhost:9093
kafkaTopics:
- name: "order"
numPartitions: 1
replicationFactor: 1
aggregateTypeRegexp: "(?i)^order$"
- name: "invoice"
numPartitions: 1
replicationFactor: 1
aggregateTypeRegexp: "(?i)^invoice"
kafkaHeaderMappings:
- columnName: "uuid"
headerName: "uuid"

redisHost: localhost
redisPort: 6379
Expand Down
20 changes: 12 additions & 8 deletions .devenv/tor-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@ dbPort: 3306
dbUser: root
dbPassword: root
dbOutboxTableRef: my_schema.my_outbox_table
dbHeadersColumnsNames:
- uuid
topicsToPairWithAggregateTypeRegex:
- order
- invoice
aggregateTypeRegexToPairWithTopics:
- "(?i)^order$"
- "(?i)^invoice$"

kafkaBrokers: kafka:9092
kafkaTopics:
- name: "order"
numPartitions: 1
replicationFactor: 1
aggregateTypeRegexp: "(?i)^order$"
- name: "invoice"
numPartitions: 1
replicationFactor: 1
aggregateTypeRegexp: "(?i)^invoice"
kafkaHeaderMappings:
- columnName: "uuid"
headerName: "uuid"

redisHost: redis
redisPort: 6379
Expand Down
134 changes: 112 additions & 22 deletions adapters/kafka/event_dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,126 @@
package kafka

import "github.com/Shopify/sarama"
import (
"fmt"
"regexp"

func NewEventDispatcher(producer *Producer) *EventDispatcher {
return &EventDispatcher{producer: producer}
"github.com/Shopify/sarama"
"github.com/lorenzoranucci/tor/router/pkg/run"
)

func NewEventDispatcher(
syncProducer sarama.SyncProducer,
admin sarama.ClusterAdmin,
topics []Topic,
headerMappings []HeaderMapping,
) (*EventDispatcher, error) {
err := createTopics(topics, admin)
if err != nil {
return nil, err
}

return &EventDispatcher{
syncProducer: syncProducer,
admin: admin,
topics: topics,
headerMappings: headerMappings,
}, nil
}

type EventDispatcher struct {
producer *Producer
syncProducer sarama.SyncProducer
admin sarama.ClusterAdmin
topics []Topic
headerMappings []HeaderMapping
}

type Topic struct {
Name string
TopicDetail *sarama.TopicDetail
AggregateType *regexp.Regexp
}

type HeaderMapping struct {
ColumnName string
HeaderName string
}

func (k *EventDispatcher) Dispatch(
topic string,
routingKey string,
event []byte,
headers []struct {
Key []byte
Value []byte
},
) error {
return k.producer.Dispatch(topic, routingKey, event, mapHeaders(headers))
func (k *EventDispatcher) Dispatch(event run.OutboxEvent) error {
for _, topic := range k.topics {
if !topic.AggregateType.MatchString(string(event.AggregateType)) {
continue
}

headers, err := k.mapHeaders(event.Columns)
if err != nil {
return err
}

_, _, err = k.syncProducer.SendMessage(
&sarama.ProducerMessage{
Key: sarama.ByteEncoder(event.AggregateID),
Topic: topic.Name,
Value: sarama.ByteEncoder(event.Payload),
Headers: headers,
},
)

if err != nil {
return err
}
}

return nil
}

func mapHeaders(h []struct {
Key []byte
Value []byte
}) []sarama.RecordHeader {
r := make([]sarama.RecordHeader, 0, len(h))
func createTopics(topics []Topic, admin sarama.ClusterAdmin) error {
topicNames := make([]string, 0, len(topics))
for _, topic := range topics {
topicNames = append(topicNames, topic.Name)
}

topicMetadata, err := admin.DescribeTopics(topicNames)
if err != nil {
return err
}

for _, m := range topicMetadata {
if m.Err != sarama.ErrUnknownTopicOrPartition {
continue
}

for _, topic := range topics {
if topic.Name != m.Name {
continue
}

err := admin.CreateTopic(topic.Name, topic.TopicDetail, false)
if err != nil {
return err
}
}
}
return nil
}

func (k *EventDispatcher) mapHeaders(columns []run.Column) ([]sarama.RecordHeader, error) {
r := make([]sarama.RecordHeader, 0, len(columns))

outerLoop:
for _, h := range k.headerMappings {
for _, c := range columns {
if h.ColumnName == string(c.Name) {
r = append(r, sarama.RecordHeader{
Key: []byte(h.HeaderName),
Value: c.Value,
})

continue outerLoop
}
}

for _, v := range h {
r = append(r, v)
return nil, fmt.Errorf("column not found for header. Column: %s, Header: %s", h.ColumnName, h.HeaderName)
}

return r
return r, nil
}
54 changes: 0 additions & 54 deletions adapters/kafka/producer.go

This file was deleted.

2 changes: 1 addition & 1 deletion adapters/redis/state_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (r *StateHandler) GetLastPosition() (mysql.Position, error) {
return mysql.Position{
Name: p.Name,
Pos: p.Pos,
}, err
}, nil
}

func (r *StateHandler) SetLastPosition(p mysql.Position) error {
Expand Down
Loading

0 comments on commit 0584dea

Please sign in to comment.