diff --git a/.devenv/tor-bare-metal.yaml b/.devenv/tor-bare-metal.yaml
index b8cda68..c693ae8 100644
--- a/.devenv/tor-bare-metal.yaml
+++ b/.devenv/tor-bare-metal.yaml
@@ -3,16 +3,20 @@
 dbPort: 3306
 dbUser: root
 dbPassword: root
 dbOutboxTableRef: my_schema.my_outbox_table
-dbHeadersColumnsNames:
-  - uuid
-topicsToPairWithAggregateTypeRegex:
-  - order
-  - invoice
-aggregateTypeRegexToPairWithTopics:
-  - "(?i)^order$"
-  - "(?i)^invoice$"
 kafkaBrokers: localhost:9093
+kafkaTopics:
+  - name: "order"
+    numPartitions: 1
+    replicationFactor: 1
+    aggregateTypeRegexp: "(?i)^order$"
+  - name: "invoice"
+    numPartitions: 1
+    replicationFactor: 1
+    aggregateTypeRegexp: "(?i)^invoice$"
+kafkaHeaderMappings:
+  - columnName: "uuid"
+    headerName: "uuid"
 redisHost: localhost
 redisPort: 6379
diff --git a/.devenv/tor-docker.yaml b/.devenv/tor-docker.yaml
index a1c2368..1d931a3 100644
--- a/.devenv/tor-docker.yaml
+++ b/.devenv/tor-docker.yaml
@@ -3,16 +3,20 @@
 dbPort: 3306
 dbUser: root
 dbPassword: root
 dbOutboxTableRef: my_schema.my_outbox_table
-dbHeadersColumnsNames:
-  - uuid
-topicsToPairWithAggregateTypeRegex:
-  - order
-  - invoice
-aggregateTypeRegexToPairWithTopics:
-  - "(?i)^order$"
-  - "(?i)^invoice$"
 kafkaBrokers: kafka:9092
+kafkaTopics:
+  - name: "order"
+    numPartitions: 1
+    replicationFactor: 1
+    aggregateTypeRegexp: "(?i)^order$"
+  - name: "invoice"
+    numPartitions: 1
+    replicationFactor: 1
+    aggregateTypeRegexp: "(?i)^invoice$"
+kafkaHeaderMappings:
+  - columnName: "uuid"
+    headerName: "uuid"
 redisHost: redis
 redisPort: 6379
diff --git a/adapters/kafka/event_dispatcher.go b/adapters/kafka/event_dispatcher.go
index a1f0d95..58f04f0 100644
--- a/adapters/kafka/event_dispatcher.go
+++ b/adapters/kafka/event_dispatcher.go
@@ -1,36 +1,126 @@
 package kafka
 
-import "github.com/Shopify/sarama"
+import (
+	"fmt"
+	"regexp"
 
-func NewEventDispatcher(producer *Producer) *EventDispatcher {
-	return &EventDispatcher{producer: producer}
+	"github.com/Shopify/sarama"
+	"github.com/lorenzoranucci/tor/router/pkg/run"
+)
+
+func NewEventDispatcher(
+	syncProducer sarama.SyncProducer,
+	admin sarama.ClusterAdmin,
+	topics []Topic,
+	headerMappings []HeaderMapping,
+) (*EventDispatcher, error) {
+	err := createTopics(topics, admin)
+	if err != nil {
+		return nil, err
+	}
+
+	return &EventDispatcher{
+		syncProducer:   syncProducer,
+		admin:          admin,
+		topics:         topics,
+		headerMappings: headerMappings,
+	}, nil
 }
 
 type EventDispatcher struct {
-	producer *Producer
+	syncProducer   sarama.SyncProducer
+	admin          sarama.ClusterAdmin
+	topics         []Topic
+	headerMappings []HeaderMapping
+}
+
+type Topic struct {
+	Name          string
+	TopicDetail   *sarama.TopicDetail
+	AggregateType *regexp.Regexp
+}
+
+type HeaderMapping struct {
+	ColumnName string
+	HeaderName string
 }
 
-func (k *EventDispatcher) Dispatch(
-	topic string,
-	routingKey string,
-	event []byte,
-	headers []struct {
-		Key   []byte
-		Value []byte
-	},
-) error {
-	return k.producer.Dispatch(topic, routingKey, event, mapHeaders(headers))
+func (k *EventDispatcher) Dispatch(event run.OutboxEvent) error {
+	for _, topic := range k.topics {
+		if !topic.AggregateType.MatchString(string(event.AggregateType)) {
+			continue
+		}
+
+		headers, err := k.mapHeaders(event.Columns)
+		if err != nil {
+			return err
+		}
+
+		_, _, err = k.syncProducer.SendMessage(
+			&sarama.ProducerMessage{
+				Key:     sarama.ByteEncoder(event.AggregateID),
+				Topic:   topic.Name,
+				Value:   sarama.ByteEncoder(event.Payload),
+				Headers: headers,
+			},
+		)
+
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
 }
 
-func mapHeaders(h 
[]struct { - Key []byte - Value []byte -}) []sarama.RecordHeader { - r := make([]sarama.RecordHeader, 0, len(h)) +func createTopics(topics []Topic, admin sarama.ClusterAdmin) error { + topicNames := make([]string, 0, len(topics)) + for _, topic := range topics { + topicNames = append(topicNames, topic.Name) + } + + topicMetadata, err := admin.DescribeTopics(topicNames) + if err != nil { + return err + } + + for _, m := range topicMetadata { + if m.Err != sarama.ErrUnknownTopicOrPartition { + continue + } + + for _, topic := range topics { + if topic.Name != m.Name { + continue + } + + err := admin.CreateTopic(topic.Name, topic.TopicDetail, false) + if err != nil { + return err + } + } + } + return nil +} + +func (k *EventDispatcher) mapHeaders(columns []run.Column) ([]sarama.RecordHeader, error) { + r := make([]sarama.RecordHeader, 0, len(columns)) + +outerLoop: + for _, h := range k.headerMappings { + for _, c := range columns { + if h.ColumnName == string(c.Name) { + r = append(r, sarama.RecordHeader{ + Key: []byte(h.HeaderName), + Value: c.Value, + }) + + continue outerLoop + } + } - for _, v := range h { - r = append(r, v) + return nil, fmt.Errorf("column not found for header. Column: %s, Header: %s", h.ColumnName, h.HeaderName) } - return r + return r, nil } diff --git a/adapters/kafka/producer.go b/adapters/kafka/producer.go deleted file mode 100644 index 445fb2f..0000000 --- a/adapters/kafka/producer.go +++ /dev/null @@ -1,54 +0,0 @@ -package kafka - -import ( - "github.com/Shopify/sarama" -) - -type Producer struct { - syncProducer sarama.SyncProducer -} - -func NewProducer(brokers []string) (*Producer, error) { - syncProducer, err := newSyncProducer(brokers) - if err != nil { - return nil, err - } - - return &Producer{syncProducer: syncProducer}, nil -} - -func (p *Producer) Dispatch( - topic string, - key string, - message []byte, - headers []sarama.RecordHeader, -) error { - _, _, err := p.syncProducer.SendMessage( - &sarama.ProducerMessage{ - Key: sarama.StringEncoder(key), - Topic: topic, - Value: sarama.ByteEncoder(message), - Headers: headers, - }, - ) - - return err -} - -func newSyncProducer(brokerList []string) (sarama.SyncProducer, error) { - config := sarama.NewConfig() - config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message - config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message - config.Producer.Return.Successes = true - - producer, err := sarama.NewSyncProducer(brokerList, config) - if err != nil { - return nil, err - } - - return producer, err -} - -func (p *Producer) Close() error { - return p.syncProducer.Close() -} diff --git a/adapters/redis/state_handler.go b/adapters/redis/state_handler.go index 9fee250..8ca4de9 100644 --- a/adapters/redis/state_handler.go +++ b/adapters/redis/state_handler.go @@ -36,7 +36,7 @@ func (r *StateHandler) GetLastPosition() (mysql.Position, error) { return mysql.Position{ Name: p.Name, Pos: p.Pos, - }, err + }, nil } func (r *StateHandler) SetLastPosition(p mysql.Position) error { diff --git a/example/tor/cmd/run.go b/example/tor/cmd/run.go index 7a8b410..5d6b4dd 100644 --- a/example/tor/cmd/run.go +++ b/example/tor/cmd/run.go @@ -1,12 +1,12 @@ package cmd import ( - "errors" "fmt" "os" "regexp" "time" + "github.com/Shopify/sarama" "github.com/go-mysql-org/go-mysql/canal" "github.com/go-redis/redis/v8" "github.com/lorenzoranucci/tor/adapters/kafka" @@ -17,6 +17,18 @@ import ( "github.com/spf13/viper" ) +type KafkaTopic struct { + Name string + NumPartitions int32 
+ ReplicationFactor int16 + AggregateTypeRegexp string +} + +type KafkaHeaderMappings struct { + ColumnName string + HeaderName string +} + // runCmd represents the run command var runCmd = &cobra.Command{ Use: "run", @@ -32,20 +44,12 @@ var runCmd = &cobra.Command{ return err } - aggregateTypeTopicPairs, err := getAggregateTypeTopicPairs() - if err != nil { - return err - } - stateHandler := getRedisStateHandler() handler, err := run.NewEventHandler( ed, viper.GetString("dbAggregateIDColumnName"), viper.GetString("dbAggregateTypeColumnName"), viper.GetString("dbPayloadColumnName"), - viper.GetStringSlice("dbHeadersColumnsNames"), - aggregateTypeTopicPairs, - viper.GetBool("includeTransactionTimestamp"), ) if err != nil { return err @@ -57,28 +61,6 @@ var runCmd = &cobra.Command{ }, } -func getAggregateTypeTopicPairs() ([]run.AggregateTypeTopicPair, error) { - topicsToPairWithAggregateTypeRegex := viper.GetStringSlice("topicsToPairWithAggregateTypeRegex") - aggregateTypeRegexToPairWithTopics := viper.GetStringSlice("aggregateTypeRegexToPairWithTopics") - if len(topicsToPairWithAggregateTypeRegex) != len(aggregateTypeRegexToPairWithTopics) { - return nil, errors.New("topicsToPairWithAggregateTypeRegex and aggregateTypeRegexToPairWithTopics must have same length") - } - aggregateTypeTopicPairs := make([]run.AggregateTypeTopicPair, 0, len(topicsToPairWithAggregateTypeRegex)) - for i := 0; i < len(topicsToPairWithAggregateTypeRegex); i++ { - aggregateTypeRegexp, err := regexp.Compile(aggregateTypeRegexToPairWithTopics[i]) - if err != nil { - return nil, err - } - - aggregateTypeTopicPairs = append(aggregateTypeTopicPairs, run.AggregateTypeTopicPair{ - AggregateTypeRegexp: aggregateTypeRegexp, - Topic: topicsToPairWithAggregateTypeRegex[i], - }) - } - - return aggregateTypeTopicPairs, nil -} - func init() { viper.MustBindEnv("dbHost", "DB_HOST") viper.MustBindEnv("dbPort", "DB_PORT") @@ -110,14 +92,55 @@ func init() { } func getKafkaEventDispatcher() (*kafka.EventDispatcher, error) { - producer, err := kafka.NewProducer( - viper.GetStringSlice("kafkaBrokers"), - ) + producer, err := getKafkaSyncProducer() + if err != nil { + return nil, err + } + + admin, err := sarama.NewClusterAdmin(viper.GetStringSlice("kafkaBrokers"), sarama.NewConfig()) + if err != nil { + return nil, err + } + + var kafkaTopics []KafkaTopic + err = viper.UnmarshalKey("kafkaTopics", &kafkaTopics) + if err != nil { + return nil, err + } + topics := make([]kafka.Topic, 0, len(kafkaTopics)) + for _, topic := range kafkaTopics { + topics = append(topics, kafka.Topic{ + Name: topic.Name, + TopicDetail: &sarama.TopicDetail{ + NumPartitions: topic.NumPartitions, + ReplicationFactor: topic.ReplicationFactor, + }, + AggregateType: regexp.MustCompile(topic.AggregateTypeRegexp), + }) + } + + var kafkaHeaderMappings []kafka.HeaderMapping + err = viper.UnmarshalKey("kafkaHeaderMappings", &kafkaHeaderMappings) + if err != nil { + return nil, err + } + + return kafka.NewEventDispatcher(producer, admin, topics, kafkaHeaderMappings) +} + +func getKafkaSyncProducer() (sarama.SyncProducer, error) { + config := sarama.NewConfig() + config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message + config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message + config.Producer.Return.Successes = true + config.Metadata.AllowAutoTopicCreation = false + + producer, err := sarama.NewSyncProducer(viper.GetStringSlice("kafkaBrokers"), config) if err != nil { return nil, err } - return 
kafka.NewEventDispatcher(producer), nil + return producer, err } func getRedisStateHandler() *redis2.StateHandler { diff --git a/router/pkg/run/event_handler.go b/router/pkg/run/event_handler.go index 701a19a..ca92093 100644 --- a/router/pkg/run/event_handler.go +++ b/router/pkg/run/event_handler.go @@ -20,28 +20,21 @@ type StateHandler interface { SetLastPosition(position mysql.Position) error } -type outboxEvent struct { - AggregateID string - Payload []byte - Headers []eventHeader - Topic string +type OutboxEvent struct { + AggregateID []byte + AggregateType []byte + Payload []byte + Columns []Column + EventTimestampFromDatabase uint32 } -type eventHeader struct { - Key []byte +type Column struct { + Name []byte Value []byte } type EventDispatcher interface { - Dispatch( - topic string, - routingKey string, - event []byte, - headers []struct { - Key []byte - Value []byte - }, - ) error + Dispatch(event OutboxEvent) error } type AggregateTypeTopicPair struct { @@ -54,9 +47,6 @@ func NewEventHandler( aggregateIDColumnName string, aggregateTypeColumnName string, payloadColumnName string, - headersColumnsNames []string, - aggregateTypeTopicPairs []AggregateTypeTopicPair, - includeTransactionTimestamp bool, ) (*EventHandler, error) { actualAggregateIDColumnName := defaultAggregateIDColumnName if aggregateIDColumnName != "" { @@ -75,12 +65,9 @@ func NewEventHandler( return &EventHandler{ eventMapper: &EventMapper{ - aggregateIDColumnName: actualAggregateIDColumnName, - aggregateTypeColumnName: actualAggregateTypeColumnName, - payloadColumnName: actualPayloadColumnName, - headersColumnsNames: headersColumnsNames, - aggregateTypeTopicPairs: aggregateTypeTopicPairs, - includeTransactionTimestamp: includeTransactionTimestamp, + aggregateIDColumnName: actualAggregateIDColumnName, + aggregateTypeColumnName: actualAggregateTypeColumnName, + payloadColumnName: actualPayloadColumnName, }, eventDispatcher: eventDispatcher, }, nil @@ -107,12 +94,11 @@ func (h *EventHandler) OnRow(e *canal.RowsEvent) error { } for _, oe := range oes { - err = h.eventDispatcher.Dispatch(oe.Topic, oe.AggregateID, oe.Payload, mapHeaders(oe.Headers)) + err = h.eventDispatcher.Dispatch(oe) if err != nil { return err } - logrus.WithField("aggregateId", oe.AggregateID). - WithField("payload", oe.Payload). + logrus.WithField("event", oe). 
Debug("event dispatched") } @@ -127,23 +113,3 @@ func (h *EventHandler) OnPosSynced(p mysql.Position, g mysql.GTIDSet, f bool) er func (h *EventHandler) String() string { return "EventHandler" } - -func mapHeaders(h []eventHeader) []struct { - Key []byte - Value []byte -} { - if len(h) == 0 { - return nil - } - - r := make([]struct { - Key []byte - Value []byte - }, 0, len(h)) - - for _, v := range h { - r = append(r, v) - } - - return r -} diff --git a/router/pkg/run/event_handler_test.go b/router/pkg/run/event_handler_test.go index 87e8290..6ab16ae 100644 --- a/router/pkg/run/event_handler_test.go +++ b/router/pkg/run/event_handler_test.go @@ -2,39 +2,41 @@ package run_test import ( "errors" - "reflect" - "regexp" "testing" "github.com/go-mysql-org/go-mysql/canal" "github.com/go-mysql-org/go-mysql/replication" "github.com/go-mysql-org/go-mysql/schema" "github.com/lorenzoranucci/tor/router/pkg/run" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestEventHandler_OnRow_HappyPaths(t *testing.T) { type fields struct { - eventDispatcher *eventDispatcherMock - aggregateIdColumnName string - aggregateTypeColumnName string - payloadColumnName string - headersColumnsNames []string - aggregateTypeTopicPairs []run.AggregateTypeTopicPair - includeTransactionTimestamp bool + eventDispatcher *eventDispatcherMock + aggregateIdColumnName string + aggregateTypeColumnName string + payloadColumnName string } type args struct { e *canal.RowsEvent } + + orderAggregateID := []byte("c44ade3e-9394-4e6e-8d2d-20707d61061c") + orderAggregateType := []byte("order") + orderPayload := []byte(`{"name": "new order"}`) + orderOtherColumnValue := 11 + var timestamp uint32 = 5 + tests := []struct { name string fields fields args args - wantDispatches []dispatch - wantErr bool + wantDispatches []run.OutboxEvent }{ { - name: "no topic-aggregate type pair then no dispatch", + name: "when row-event action is not insert then row-event is skipped", args: args{ e: &canal.RowsEvent{ Table: &schema.Table{ @@ -52,7 +54,7 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) { }, }, }, - Action: canal.InsertAction, + Action: canal.DeleteAction, Rows: [][]interface{}{ { "c44ade3e-9394-4e6e-8d2d-20707d61061c", @@ -68,7 +70,7 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) { }, }, { - name: "single row-event, with string payload, and default column ids", + name: "single row-events, with default column names", args: args{ e: &canal.RowsEvent{ Table: &schema.Table{ @@ -84,6 +86,9 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) { { Name: "payload", }, + { + Name: "other_column", + }, }, }, Action: canal.InsertAction, @@ -91,87 +96,53 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) { { "c44ade3e-9394-4e6e-8d2d-20707d61061c", "order", - `{"name": "new order"}`, + orderPayload, + orderOtherColumnValue, }, }, - Header: &replication.EventHeader{}, + Header: &replication.EventHeader{ + Timestamp: timestamp, + }, }, }, fields: fields{ eventDispatcher: &eventDispatcherMock{}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, }, - wantDispatches: []dispatch{ + wantDispatches: []run.OutboxEvent{ { - topic: "order", - routingKey: "c44ade3e-9394-4e6e-8d2d-20707d61061c", - event: []byte(`{"name": "new order"}`), - }, - }, - }, - { - name: "single row-event, with byte payload, and default column ids", - args: args{ - e: &canal.RowsEvent{ - Table: &schema.Table{ - Schema: "my_schema", - Name: 
"outbox", - Columns: []schema.TableColumn{ - { - Name: "aggregate_id", - }, - { - Name: "aggregate_type", - }, - { - Name: "payload", - }, + AggregateID: orderAggregateID, + AggregateType: orderAggregateType, + Payload: orderPayload, + Columns: []run.Column{ + { + Name: []byte("aggregate_id"), + Value: orderAggregateID, }, - }, - Action: canal.InsertAction, - Rows: [][]interface{}{ { - "c44ade3e-9394-4e6e-8d2d-20707d61061c", - "order", - []byte(`{"name": "new order"}`), + Name: []byte("aggregate_type"), + Value: orderAggregateType, + }, + { + Name: []byte("payload"), + Value: orderPayload, + }, + { + Name: []byte("other_column"), + Value: []byte(`11`), }, }, - Header: &replication.EventHeader{}, - }, - }, - fields: fields{ - eventDispatcher: &eventDispatcherMock{}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, - }, - wantDispatches: []dispatch{ - { - topic: "order", - routingKey: "c44ade3e-9394-4e6e-8d2d-20707d61061c", - event: []byte(`{"name": "new order"}`), + EventTimestampFromDatabase: timestamp, }, }, }, { - name: "single row-event, custom column ids", + name: "single row-event with custom column names and order", args: args{ e: &canal.RowsEvent{ Table: &schema.Table{ Schema: "my_schema", Name: "outbox", Columns: []schema.TableColumn{ - { - Name: "id", - }, { Name: "aggregateType", }, @@ -181,18 +152,23 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) { { Name: "aggregateId", }, + { + Name: "other_column", + }, }, }, Action: canal.InsertAction, Rows: [][]interface{}{ { - 1, "order", - `{"name": "new order"}`, + orderPayload, "c44ade3e-9394-4e6e-8d2d-20707d61061c", + orderOtherColumnValue, }, }, - Header: &replication.EventHeader{}, + Header: &replication.EventHeader{ + Timestamp: timestamp, + }, }, }, fields: fields{ @@ -200,80 +176,36 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) { aggregateIdColumnName: "aggregateId", aggregateTypeColumnName: "aggregateType", payloadColumnName: "payload_", - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, }, - wantDispatches: []dispatch{ + wantDispatches: []run.OutboxEvent{ { - topic: "order", - routingKey: "c44ade3e-9394-4e6e-8d2d-20707d61061c", - event: []byte(`{"name": "new order"}`), - }, - }, - }, - { - name: "multiple row-events, with default column ids", - args: args{ - e: &canal.RowsEvent{ - Table: &schema.Table{ - Schema: "my_schema", - Name: "outbox", - Columns: []schema.TableColumn{ - { - Name: "aggregate_id", - }, - { - Name: "aggregate_type", - }, - { - Name: "payload", - }, + AggregateID: orderAggregateID, + AggregateType: orderAggregateType, + Payload: orderPayload, + Columns: []run.Column{ + { + Name: []byte("aggregateType"), + Value: orderAggregateType, }, - }, - Action: canal.InsertAction, - Rows: [][]interface{}{ { - "c44ade3e-9394-4e6e-8d2d-20707d61061c", - "order", - []byte(`{"name": "new order"}`), + Name: []byte("payload_"), + Value: orderPayload, }, { - "c38a5d13-788c-4878-8bdc-c012cbad5b82", - "order", - `{"name": "new order"}`, + Name: []byte("aggregateId"), + Value: orderAggregateID, + }, + { + Name: []byte("other_column"), + Value: []byte(`11`), }, }, - Header: &replication.EventHeader{}, - }, - }, - fields: fields{ - eventDispatcher: &eventDispatcherMock{}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, - }, - 
wantDispatches: []dispatch{ - { - topic: "order", - routingKey: "c44ade3e-9394-4e6e-8d2d-20707d61061c", - event: []byte(`{"name": "new order"}`), - }, - { - topic: "order", - routingKey: "c38a5d13-788c-4878-8bdc-c012cbad5b82", - event: []byte(`{"name": "new order"}`), + EventTimestampFromDatabase: timestamp, }, }, }, { - name: "multiple row-events, with one entry not matching aggregate-type regexp", + name: "multiple row-events, with default column names", args: args{ e: &canal.RowsEvent{ Table: &schema.Table{ @@ -289,6 +221,9 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) { { Name: "payload", }, + { + Name: "other_column", + }, }, }, Action: canal.InsertAction, @@ -296,165 +231,77 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) { { "c44ade3e-9394-4e6e-8d2d-20707d61061c", "order", - []byte(`{"name": "new order"}`), + orderPayload, + orderOtherColumnValue, }, { "c38a5d13-788c-4878-8bdc-c012cbad5b82", "invoice", `{"name": "new invoice"}`, + nil, }, }, - Header: &replication.EventHeader{}, + Header: &replication.EventHeader{ + Timestamp: timestamp, + }, }, }, fields: fields{ eventDispatcher: &eventDispatcherMock{}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, }, - wantDispatches: []dispatch{ + wantDispatches: []run.OutboxEvent{ { - topic: "order", - routingKey: "c44ade3e-9394-4e6e-8d2d-20707d61061c", - event: []byte(`{"name": "new order"}`), - }, - }, - }, - { - name: "multiple row-events, with different matching aggregate-types regexp", - args: args{ - e: &canal.RowsEvent{ - Table: &schema.Table{ - Schema: "my_schema", - Name: "outbox", - Columns: []schema.TableColumn{ - { - Name: "aggregate_id", - }, - { - Name: "aggregate_type", - }, - { - Name: "payload", - }, + AggregateID: orderAggregateID, + AggregateType: orderAggregateType, + Payload: orderPayload, + Columns: []run.Column{ + { + Name: []byte("aggregate_id"), + Value: orderAggregateID, }, - }, - Action: canal.InsertAction, - Rows: [][]interface{}{ { - "c44ade3e-9394-4e6e-8d2d-20707d61061c", - "order", - []byte(`{"name": "new order"}`), + Name: []byte("aggregate_type"), + Value: orderAggregateType, }, { - "c38a5d13-788c-4878-8bdc-c012cbad5b82", - "invoice", - `{"name": "new invoice"}`, + Name: []byte("payload"), + Value: orderPayload, + }, + { + Name: []byte("other_column"), + Value: []byte(`11`), }, }, - Header: &replication.EventHeader{}, - }, - }, - fields: fields{ - eventDispatcher: &eventDispatcherMock{}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^invoice$"), - Topic: "invoice", - }, - }, - }, - wantDispatches: []dispatch{ - { - topic: "order", - routingKey: "c44ade3e-9394-4e6e-8d2d-20707d61061c", - event: []byte(`{"name": "new order"}`), + EventTimestampFromDatabase: timestamp, }, { - topic: "invoice", - routingKey: "c38a5d13-788c-4878-8bdc-c012cbad5b82", - event: []byte(`{"name": "new invoice"}`), - }, - }, - }, - { - name: "single row-event, with headers", - args: args{ - e: &canal.RowsEvent{ - Table: &schema.Table{ - Schema: "my_schema", - Name: "outbox", - Columns: []schema.TableColumn{ - { - Name: "aggregate_id", - }, - { - Name: "counter", - }, - { - Name: "aggregate_type", - }, - { - Name: "payload", - }, - { - Name: "uuid", - }, + AggregateID: []byte("c38a5d13-788c-4878-8bdc-c012cbad5b82"), + AggregateType: []byte("invoice"), + Payload: []byte(`{"name": 
"new invoice"}`), + Columns: []run.Column{ + { + Name: []byte("aggregate_id"), + Value: []byte("c38a5d13-788c-4878-8bdc-c012cbad5b82"), }, - }, - Action: canal.InsertAction, - Rows: [][]interface{}{ { - "c44ade3e-9394-4e6e-8d2d-20707d61061c", - 1, - "order", - `{"name": "new order"}`, - "b948f9a6-5797-4585-b386-dd8a1a4e30db", + Name: []byte("aggregate_type"), + Value: []byte("invoice"), }, - }, - Header: &replication.EventHeader{}, - }, - }, - fields: fields{ - eventDispatcher: &eventDispatcherMock{}, - headersColumnsNames: []string{"uuid", "counter"}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, - }, - wantDispatches: []dispatch{ - { - topic: "order", - routingKey: "c44ade3e-9394-4e6e-8d2d-20707d61061c", - event: []byte(`{"name": "new order"}`), headers: []struct { - Key []byte - Value []byte - }{ { - Key: []byte("uuid"), - Value: []byte("b948f9a6-5797-4585-b386-dd8a1a4e30db"), + Name: []byte("payload"), + Value: []byte(`{"name": "new invoice"}`), }, { - Key: []byte("counter"), - Value: []byte("1"), + Name: []byte("other_column"), + Value: nil, }, }, + EventTimestampFromDatabase: timestamp, }, }, }, { - name: "single row-event, with transaction timestamp", + name: "multiple row-events, with custom column names", args: args{ e: &canal.RowsEvent{ Table: &schema.Table{ @@ -462,19 +309,16 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) { Name: "outbox", Columns: []schema.TableColumn{ { - Name: "aggregate_id", - }, - { - Name: "counter", + Name: "aggregateId", }, { - Name: "aggregate_type", + Name: "aggregateType", }, { - Name: "payload", + Name: "payload_", }, { - Name: "uuid", + Name: "otherColumn", }, }, }, @@ -482,40 +326,76 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) { Rows: [][]interface{}{ { "c44ade3e-9394-4e6e-8d2d-20707d61061c", - 1, "order", - `{"name": "new order"}`, - "b948f9a6-5797-4585-b386-dd8a1a4e30db", + orderPayload, + orderOtherColumnValue, + }, + { + "c38a5d13-788c-4878-8bdc-c012cbad5b82", + "invoice", + `{"name": "new invoice"}`, + nil, }, }, Header: &replication.EventHeader{ - Timestamp: 100, + Timestamp: timestamp, }, }, }, fields: fields{ - eventDispatcher: &eventDispatcherMock{}, - includeTransactionTimestamp: true, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", + eventDispatcher: &eventDispatcherMock{}, + aggregateIdColumnName: "aggregateId", + aggregateTypeColumnName: "aggregateType", + payloadColumnName: "payload_", + }, + wantDispatches: []run.OutboxEvent{ + { + AggregateID: orderAggregateID, + AggregateType: orderAggregateType, + Payload: orderPayload, + Columns: []run.Column{ + { + Name: []byte("aggregateId"), + Value: orderAggregateID, + }, + { + Name: []byte("aggregateType"), + Value: orderAggregateType, + }, + { + Name: []byte("payload_"), + Value: orderPayload, + }, + { + Name: []byte("otherColumn"), + Value: []byte(`11`), + }, }, + EventTimestampFromDatabase: timestamp, }, - }, - wantDispatches: []dispatch{ { - topic: "order", - routingKey: "c44ade3e-9394-4e6e-8d2d-20707d61061c", - event: []byte(`{"name": "new order"}`), headers: []struct { - Key []byte - Value []byte - }{ + AggregateID: []byte("c38a5d13-788c-4878-8bdc-c012cbad5b82"), + AggregateType: []byte("invoice"), + Payload: []byte(`{"name": "new invoice"}`), + Columns: []run.Column{ + { + Name: []byte("aggregateId"), + Value: []byte("c38a5d13-788c-4878-8bdc-c012cbad5b82"), + }, + { + Name: 
[]byte("aggregateType"), + Value: []byte("invoice"), + }, + { + Name: []byte("payload_"), + Value: []byte(`{"name": "new invoice"}`), + }, { - Key: []byte("transactionTimestamp"), - Value: []byte("100"), + Name: []byte("otherColumn"), + Value: nil, }, }, + EventTimestampFromDatabase: timestamp, }, }, }, @@ -528,19 +408,14 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) { tt.fields.aggregateIdColumnName, tt.fields.aggregateTypeColumnName, tt.fields.payloadColumnName, - tt.fields.headersColumnsNames, - tt.fields.aggregateTypeTopicPairs, - tt.fields.includeTransactionTimestamp, ) require.NoError(t, err) - if err := h.OnRow(tt.args.e); (err != nil) != tt.wantErr { - t.Errorf("OnRow() error = %v, wantErr %v", err, tt.wantErr) + if err := h.OnRow(tt.args.e); (err != nil) != false { + t.Errorf("OnRow() error = %v, wantErr false", err) } - if !reflect.DeepEqual(tt.wantDispatches, tt.fields.eventDispatcher.dispatches) { - t.Errorf("OnRow() dispatches = %v, wantDispatches %v", tt.fields.eventDispatcher.dispatches, tt.wantDispatches) - } + assert.Equal(t, tt.wantDispatches, tt.fields.eventDispatcher.dispatches) }) } } @@ -551,8 +426,6 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) { aggregateIdColumnName string aggregateTypeColumnName string payloadColumnName string - headersColumnsNames []string - aggregateTypeTopicPairs []run.AggregateTypeTopicPair } type args struct { e *canal.RowsEvent @@ -561,50 +434,8 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) { name string fields fields args args - wantDispatches []dispatch - wantErr bool wantErrOnConstruct bool }{ - { - name: "when row-event action is not insert then row-event is skipped", - args: args{ - e: &canal.RowsEvent{ - Table: &schema.Table{ - Schema: "my_schema", - Name: "outbox", - Columns: []schema.TableColumn{ - { - Name: "aggregate_id", - }, - { - Name: "aggregate_type", - }, - { - Name: "payload", - }, - }, - }, - Action: canal.DeleteAction, - Rows: [][]interface{}{ - { - "c44ade3e-9394-4e6e-8d2d-20707d61061c", - "order", - `{"name": "new order"}`, - }, - }, - Header: &replication.EventHeader{}, - }, - }, - fields: fields{ - eventDispatcher: &eventDispatcherMock{}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, - }, - }, { name: "when dispatcher fails then error", args: args{ @@ -639,24 +470,10 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) { eventDispatcher: &eventDispatcherMock{ err: errors.New(""), }, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, }, - wantDispatches: []dispatch{ - { - topic: "order", - routingKey: "c44ade3e-9394-4e6e-8d2d-20707d61061c", - event: []byte(`{"name": "new order"}`), - }, - }, - wantErr: true, }, { - name: "when a column value is missing then error", + name: "when a Column value is missing compared to table structure then error", args: args{ e: &canal.RowsEvent{ Table: &schema.Table{ @@ -686,62 +503,10 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) { }, fields: fields{ eventDispatcher: &eventDispatcherMock{}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, }, - wantErr: true, }, { - name: "when a header's column value is missing then error", - args: args{ - e: &canal.RowsEvent{ - Table: &schema.Table{ - Schema: "my_schema", - Name: "outbox", - Columns: 
[]schema.TableColumn{ - { - Name: "aggregate_id", - }, - { - Name: "aggregate_type", - }, - { - Name: "payload", - }, - { - Name: "uuid", - }, - }, - }, - Action: canal.InsertAction, - Rows: [][]interface{}{ - { - "c44ade3e-9394-4e6e-8d2d-20707d61061c", - "order", - "", - }, - }, - Header: &replication.EventHeader{}, - }, - }, - fields: fields{ - eventDispatcher: &eventDispatcherMock{}, - headersColumnsNames: []string{"uuid"}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, - }, - wantErr: true, - }, - { - name: "when aggregate-id column is missing then error", + name: "when aggregate-id Column is missing then error", args: args{ e: &canal.RowsEvent{ Table: &schema.Table{ @@ -772,17 +537,10 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) { }, fields: fields{ eventDispatcher: &eventDispatcherMock{}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, }, - wantErr: true, }, { - name: "when aggregate-type column is missing then error", + name: "when aggregate-type Column is missing then error", args: args{ e: &canal.RowsEvent{ Table: &schema.Table{ @@ -813,17 +571,10 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) { }, fields: fields{ eventDispatcher: &eventDispatcherMock{}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, }, - wantErr: true, }, { - name: "when payload column is missing then error", + name: "when payload Column is missing then error", args: args{ e: &canal.RowsEvent{ Table: &schema.Table{ @@ -854,179 +605,7 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) { }, fields: fields{ eventDispatcher: &eventDispatcherMock{}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, - }, - wantErr: true, - }, - { - name: "when header column is missing then error", - args: args{ - e: &canal.RowsEvent{ - Table: &schema.Table{ - Schema: "my_schema", - Name: "outbox", - Columns: []schema.TableColumn{ - { - Name: "aggregate_id", - }, - { - Name: "aggregate_type", - }, - { - Name: "payload", - }, - }, - }, - Action: canal.InsertAction, - Rows: [][]interface{}{ - { - "c44ade3e-9394-4e6e-8d2d-20707d61061c", - "order", - `{"name": "new order"}`, - }, - }, - Header: &replication.EventHeader{}, - }, - }, - fields: fields{ - eventDispatcher: &eventDispatcherMock{}, - headersColumnsNames: []string{"uuid"}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, - }, - wantErr: true, - }, - { - name: "when aggregate-id value is not string then error", - args: args{ - e: &canal.RowsEvent{ - Table: &schema.Table{ - Schema: "my_schema", - Name: "outbox", - Columns: []schema.TableColumn{ - { - Name: "aggregate_id", - }, - { - Name: "aggregate_type", - }, - { - Name: "payload", - }, - }, - }, - Action: canal.InsertAction, - Rows: [][]interface{}{ - { - 1, - "order", - `{"name": "new order"}`, - }, - }, - Header: &replication.EventHeader{}, - }, - }, - fields: fields{ - eventDispatcher: &eventDispatcherMock{}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, }, - wantErr: true, - }, - { - name: "when aggregate-type 
value is not string then error", - args: args{ - e: &canal.RowsEvent{ - Table: &schema.Table{ - Schema: "my_schema", - Name: "outbox", - Columns: []schema.TableColumn{ - { - Name: "aggregate_id", - }, - { - Name: "aggregate_type", - }, - { - Name: "payload", - }, - }, - }, - Action: canal.InsertAction, - Rows: [][]interface{}{ - { - "c44ade3e-9394-4e6e-8d2d-20707d61061c", - 1, - `{"name": "new order"}`, - }, - }, - Header: &replication.EventHeader{}, - }, - }, - fields: fields{ - eventDispatcher: &eventDispatcherMock{}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, - }, - wantErr: true, - }, - { - name: "when payload value is not string then error", - args: args{ - e: &canal.RowsEvent{ - Table: &schema.Table{ - Schema: "my_schema", - Name: "outbox", - Columns: []schema.TableColumn{ - { - Name: "aggregate_id", - }, - { - Name: "aggregate_type", - }, - { - Name: "payload", - }, - }, - }, - Action: canal.InsertAction, - Rows: [][]interface{}{ - { - "c44ade3e-9394-4e6e-8d2d-20707d61061c", - "order", - 1, - }, - }, - Header: &replication.EventHeader{}, - }, - }, - fields: fields{ - eventDispatcher: &eventDispatcherMock{}, - aggregateTypeTopicPairs: []run.AggregateTypeTopicPair{ - { - AggregateTypeRegexp: regexp.MustCompile("(?i)^order$"), - Topic: "order", - }, - }, - }, - wantErr: true, }, } for _, tt := range tests { @@ -1037,9 +616,6 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) { tt.fields.aggregateIdColumnName, tt.fields.aggregateTypeColumnName, tt.fields.payloadColumnName, - tt.fields.headersColumnsNames, - tt.fields.aggregateTypeTopicPairs, - false, ) if (err != nil) != tt.wantErrOnConstruct { t.Errorf("NewEventHandler() error = %v, wantErr %v", err, tt.wantErrOnConstruct) @@ -1049,42 +625,20 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) { return } - if err := h.OnRow(tt.args.e); (err != nil) != tt.wantErr { - t.Errorf("OnRow() error = %v, wantErr %v", err, tt.wantErr) - } - - if !reflect.DeepEqual(tt.wantDispatches, tt.fields.eventDispatcher.dispatches) { - t.Errorf("OnRow() dispatches = %v, wantDispatches %v", tt.fields.eventDispatcher.dispatches, tt.wantDispatches) + if err := h.OnRow(tt.args.e); (err != nil) != true { + t.Errorf("OnRow() error = %v, wantErr true", err) } }) } } -type dispatch struct { - topic string - routingKey string - event []byte - headers []struct { - Key []byte - Value []byte - } -} - type eventDispatcherMock struct { - dispatches []dispatch + dispatches []run.OutboxEvent err error } -func (e *eventDispatcherMock) Dispatch(topic string, routingKey string, event []byte, headers []struct { - Key []byte - Value []byte -}) error { - e.dispatches = append(e.dispatches, dispatch{ - topic: topic, - routingKey: routingKey, - event: event, - headers: headers, - }) +func (e *eventDispatcherMock) Dispatch(oe run.OutboxEvent) error { + e.dispatches = append(e.dispatches, oe) return e.err } diff --git a/router/pkg/run/event_mapper.go b/router/pkg/run/event_mapper.go index 0cf908a..a953a8a 100644 --- a/router/pkg/run/event_mapper.go +++ b/router/pkg/run/event_mapper.go @@ -5,191 +5,119 @@ import ( "fmt" "github.com/go-mysql-org/go-mysql/canal" - "github.com/sirupsen/logrus" + "github.com/go-mysql-org/go-mysql/schema" ) type EventMapper struct { - aggregateIDColumnName string - aggregateTypeColumnName string - payloadColumnName string - headersColumnsNames []string - aggregateTypeTopicPairs []AggregateTypeTopicPair - includeTransactionTimestamp bool + 
aggregateIDColumnName string + aggregateTypeColumnName string + payloadColumnName string } var notInsertError = errors.New("row-event is not an insert") -func (e *EventMapper) Map(event *canal.RowsEvent) ([]outboxEvent, error) { +func (e *EventMapper) Map(event *canal.RowsEvent) ([]OutboxEvent, error) { if event.Action != canal.InsertAction { return nil, notInsertError } - aggregateIDIndex, aggregateTypeIndex, payloadIndex, err := e.getMainColumnsIndices(event) + err := assertRowsSizesAreValid(event) if err != nil { return nil, err } - headerColumnsIndices, err := e.getHeadersColumnsIndices(event, e.headersColumnsNames) - if err != nil { - return nil, err - } - - oes := make([]outboxEvent, 0, len(event.Rows)) + oes := make([]OutboxEvent, 0, len(event.Rows)) for _, row := range event.Rows { - err := assertRowSizeIsValid(len(row), []int{aggregateTypeIndex, aggregateIDIndex, payloadIndex}, headerColumnsIndices) - if err != nil { - return nil, err - } + c := getColumns(event.Table.Columns, row) - aggregateID, aggregateType, payload, err := getMainColumnsValue(row, aggregateIDIndex, aggregateTypeIndex, payloadIndex) + aggregateID, aggregateType, payload, err := e.getMainColumnsValue(c) if err != nil { return nil, err } - h := getHeaderColumnsValues(row, headerColumnsIndices) - - if e.includeTransactionTimestamp { - h = append(h, eventHeader{ - Key: []byte("transactionTimestamp"), - Value: []byte(fmt.Sprintf("%d", event.Header.Timestamp)), - }) - } - - for _, pair := range e.aggregateTypeTopicPairs { - if !pair.AggregateTypeRegexp.MatchString(aggregateType) { - logrus.WithField("aggregateType", aggregateType). - WithField("aggregateTypeTopicPairs", pair.AggregateTypeRegexp.String()). - Debug("skipping outbox event that does not match aggregate type regexp") - continue - } - - oes = append(oes, outboxEvent{ - Topic: pair.Topic, - AggregateID: aggregateID, - Payload: payload, - Headers: h, - }) - } + oes = append(oes, OutboxEvent{ + AggregateID: aggregateID, + AggregateType: aggregateType, + Payload: payload, + Columns: c, + EventTimestampFromDatabase: event.Header.Timestamp, + }) } return oes, nil } -func assertRowSizeIsValid(rowLen int, columnIndices []int, headerColumnIndices []headerIndex) error { - for _, index := range columnIndices { - if index >= rowLen { - return fmt.Errorf("unexpected event row size") - } - } - - for _, index := range headerColumnIndices { - if index.index >= rowLen { - return fmt.Errorf("unexpected event row size") +func assertRowsSizesAreValid(event *canal.RowsEvent) error { + for _, row := range event.Rows { + if len(event.Table.Columns) != len(row) { + return errors.New("unexpected row length") } } return nil } -func getMainColumnsValue( - row []interface{}, - aggregateIDIndex int, - aggregateTypeIndex int, - payloadIndex int, -) (string, string, []byte, error) { - aggregateID, ok := row[aggregateIDIndex].(string) - if !ok { - return "", "", nil, fmt.Errorf("aggregate id is not string") - } +func getColumns( + tableColumns []schema.TableColumn, + rowColumns []interface{}, +) []Column { + r := make([]Column, 0, len(tableColumns)) + for i, etc := range tableColumns { + if rowColumns[i] == nil { + r = append(r, Column{ + Name: []byte(etc.Name), + Value: nil, + }) - aggregateType, ok := row[aggregateTypeIndex].(string) - if !ok { - return "", "", nil, fmt.Errorf("aggregate type is not string") - } + continue + } - payload, ok := row[payloadIndex].([]byte) - if !ok { - payloadS, ok := row[payloadIndex].(string) + cv, ok := rowColumns[i].([]byte) if !ok { - return "", "", 
nil, fmt.Errorf("payload is not []byte or string") + cv = []byte(fmt.Sprintf("%v", rowColumns[i])) } - payload = []byte(payloadS) + + r = append(r, Column{ + Name: []byte(etc.Name), + Value: cv, + }) } - return aggregateID, aggregateType, payload, nil -} -func (e *EventMapper) getMainColumnsIndices(event *canal.RowsEvent) (int, int, int, error) { - aggregateIDIndex := -1 - aggregateTypeIndex := -1 - payloadIndex := -1 - for i, column := range event.Table.Columns { - if column.Name == e.aggregateIDColumnName { - aggregateIDIndex = i - continue - } + return r +} - if column.Name == e.payloadColumnName { - payloadIndex = i +func (e *EventMapper) getMainColumnsValue( + columns []Column, +) ([]byte, []byte, []byte, error) { + var aggregateID, aggregateType, payload []byte + for _, column := range columns { + if column.Name == nil { continue } - if column.Name == e.aggregateTypeColumnName { - aggregateTypeIndex = i + switch string(column.Name) { + case e.aggregateIDColumnName: + aggregateID = column.Value + case e.aggregateTypeColumnName: + aggregateType = column.Value + case e.payloadColumnName: + payload = column.Value + default: continue } } - if aggregateIDIndex == -1 { - return -1, -1, -1, fmt.Errorf("outbox table has no '%s' column", e.aggregateIDColumnName) + if aggregateID == nil { + return nil, nil, nil, fmt.Errorf("%s Column not found", e.aggregateIDColumnName) } - if aggregateTypeIndex == -1 { - return -1, -1, -1, fmt.Errorf("outbox table has no '%s' column", e.aggregateTypeColumnName) + if aggregateType == nil { + return nil, nil, nil, fmt.Errorf("%s Column not found", e.aggregateTypeColumnName) } - if payloadIndex == -1 { - return -1, -1, -1, fmt.Errorf("outbox table has no '%s' column", e.payloadColumnName) + if payload == nil { + return nil, nil, nil, fmt.Errorf("%s Column not found", e.payloadColumnName) } - return aggregateIDIndex, aggregateTypeIndex, payloadIndex, nil -} - -func getHeaderColumnsValues( - row []interface{}, - columnIndicesMap []headerIndex, -) []eventHeader { - r := make([]eventHeader, 0, len(columnIndicesMap)+1) - for _, i := range columnIndicesMap { - r = append(r, eventHeader{ - Key: []byte(i.name), - Value: []byte(fmt.Sprintf("%v", row[i.index])), - }) - } - - return r -} - -type headerIndex struct { - name string - index int -} - -func (e *EventMapper) getHeadersColumnsIndices(event *canal.RowsEvent, columnNames []string) ([]headerIndex, error) { - r := make([]headerIndex, 0, len(columnNames)) -outerLoop: - for _, cm := range columnNames { - for i, etc := range event.Table.Columns { - if etc.Name == cm { - r = append(r, headerIndex{ - name: cm, - index: i, - }) - continue outerLoop - } - } - - return nil, fmt.Errorf("column not found with name: %s", cm) - } - - return r, nil + return aggregateID, aggregateType, payload, nil } diff --git a/router/pkg/run/run_test.go b/router/pkg/run/run_test.go index c0aa82f..f811597 100644 --- a/router/pkg/run/run_test.go +++ b/router/pkg/run/run_test.go @@ -103,9 +103,6 @@ func buildEventHandler(t *testing.T) *run.EventHandler { "", "", "", - nil, - nil, - false, ) require.NoError(t, err) return eh
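A minimal usage sketch of the API introduced above: run.EventDispatcher now receives the whole run.OutboxEvent (aggregate id/type, payload, raw columns, binlog timestamp) instead of topic/key/payload/headers, so alternative adapters only need to implement Dispatch. The stdoutDispatcher and newHandler names below are illustrative only and are not part of this changeset; run.OutboxEvent, run.EventDispatcher, and run.NewEventHandler are as defined in the diff.

package example

import (
	"fmt"

	"github.com/lorenzoranucci/tor/router/pkg/run"
)

// stdoutDispatcher is a hypothetical adapter that satisfies the new
// run.EventDispatcher interface by printing each outbox event instead of
// producing it to Kafka.
type stdoutDispatcher struct{}

func (d *stdoutDispatcher) Dispatch(event run.OutboxEvent) error {
	fmt.Printf(
		"aggregateType=%s aggregateID=%s payload=%s columns=%d ts=%d\n",
		event.AggregateType,
		event.AggregateID,
		event.Payload,
		len(event.Columns),
		event.EventTimestampFromDatabase,
	)
	return nil
}

// newHandler wires the dispatcher into the event handler. After this change,
// run.NewEventHandler only takes the dispatcher and the three column names
// (empty strings fall back to the defaults); topic routing and header mapping
// now live entirely in the Kafka adapter.
func newHandler() (*run.EventHandler, error) {
	return run.NewEventHandler(&stdoutDispatcher{}, "", "", "")
}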