-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #8 from lorenzoranucci/refactoring
Delegate mapping between topic and aggregateId to event dispatcher adapter
- Loading branch information
Showing
10 changed files
with
455 additions
and
943 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,36 +1,126 @@ | ||
package kafka | ||
|
||
import "github.com/Shopify/sarama" | ||
import ( | ||
"fmt" | ||
"regexp" | ||
|
||
func NewEventDispatcher(producer *Producer) *EventDispatcher { | ||
return &EventDispatcher{producer: producer} | ||
"github.com/Shopify/sarama" | ||
"github.com/lorenzoranucci/tor/router/pkg/run" | ||
) | ||
|
||
func NewEventDispatcher( | ||
syncProducer sarama.SyncProducer, | ||
admin sarama.ClusterAdmin, | ||
topics []Topic, | ||
headerMappings []HeaderMapping, | ||
) (*EventDispatcher, error) { | ||
err := createTopics(topics, admin) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &EventDispatcher{ | ||
syncProducer: syncProducer, | ||
admin: admin, | ||
topics: topics, | ||
headerMappings: headerMappings, | ||
}, nil | ||
} | ||
|
||
type EventDispatcher struct { | ||
producer *Producer | ||
syncProducer sarama.SyncProducer | ||
admin sarama.ClusterAdmin | ||
topics []Topic | ||
headerMappings []HeaderMapping | ||
} | ||
|
||
type Topic struct { | ||
Name string | ||
TopicDetail *sarama.TopicDetail | ||
AggregateType *regexp.Regexp | ||
} | ||
|
||
type HeaderMapping struct { | ||
ColumnName string | ||
HeaderName string | ||
} | ||
|
||
func (k *EventDispatcher) Dispatch( | ||
topic string, | ||
routingKey string, | ||
event []byte, | ||
headers []struct { | ||
Key []byte | ||
Value []byte | ||
}, | ||
) error { | ||
return k.producer.Dispatch(topic, routingKey, event, mapHeaders(headers)) | ||
func (k *EventDispatcher) Dispatch(event run.OutboxEvent) error { | ||
for _, topic := range k.topics { | ||
if !topic.AggregateType.MatchString(string(event.AggregateType)) { | ||
continue | ||
} | ||
|
||
headers, err := k.mapHeaders(event.Columns) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
_, _, err = k.syncProducer.SendMessage( | ||
&sarama.ProducerMessage{ | ||
Key: sarama.ByteEncoder(event.AggregateID), | ||
Topic: topic.Name, | ||
Value: sarama.ByteEncoder(event.Payload), | ||
Headers: headers, | ||
}, | ||
) | ||
|
||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func mapHeaders(h []struct { | ||
Key []byte | ||
Value []byte | ||
}) []sarama.RecordHeader { | ||
r := make([]sarama.RecordHeader, 0, len(h)) | ||
func createTopics(topics []Topic, admin sarama.ClusterAdmin) error { | ||
topicNames := make([]string, 0, len(topics)) | ||
for _, topic := range topics { | ||
topicNames = append(topicNames, topic.Name) | ||
} | ||
|
||
topicMetadata, err := admin.DescribeTopics(topicNames) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, m := range topicMetadata { | ||
if m.Err != sarama.ErrUnknownTopicOrPartition { | ||
continue | ||
} | ||
|
||
for _, topic := range topics { | ||
if topic.Name != m.Name { | ||
continue | ||
} | ||
|
||
err := admin.CreateTopic(topic.Name, topic.TopicDetail, false) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (k *EventDispatcher) mapHeaders(columns []run.Column) ([]sarama.RecordHeader, error) { | ||
r := make([]sarama.RecordHeader, 0, len(columns)) | ||
|
||
outerLoop: | ||
for _, h := range k.headerMappings { | ||
for _, c := range columns { | ||
if h.ColumnName == string(c.Name) { | ||
r = append(r, sarama.RecordHeader{ | ||
Key: []byte(h.HeaderName), | ||
Value: c.Value, | ||
}) | ||
|
||
continue outerLoop | ||
} | ||
} | ||
|
||
for _, v := range h { | ||
r = append(r, v) | ||
return nil, fmt.Errorf("column not found for header. Column: %s, Header: %s", h.ColumnName, h.HeaderName) | ||
} | ||
|
||
return r | ||
return r, nil | ||
} |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.