From f7e2c260928f269e35154c01ab401121484e1541 Mon Sep 17 00:00:00 2001 From: Joshua Carroll Date: Thu, 24 Jan 2019 02:03:21 +0000 Subject: [PATCH] Initial Migration / update for travis --- .gitignore | 4 + .travis.yml | 37 +++++++ README.md | 67 +++++++++++- go.mod | 9 ++ go.sum | 63 +++++++++++ integration_test.go | 252 ++++++++++++++++++++++++++++++++++++++++++++ sink.go | 118 +++++++++++++++++++++ source.go | 169 +++++++++++++++++++++++++++++ utils.go | 26 +++++ 9 files changed, 743 insertions(+), 2 deletions(-) create mode 100644 .travis.yml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 integration_test.go create mode 100644 sink.go create mode 100644 source.go create mode 100644 utils.go diff --git a/.gitignore b/.gitignore index f1c181e..38c84d4 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,10 @@ *.so *.dylib +# dependency stuff +/vendor/ +kafka.yml + # Test binary, build with `go test -c` *.test diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..dcf3c30 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,37 @@ +language: go +go: +- 1.x +services: docker + +env: + global: + - DOCKER_HOST_IP="127.0.0.1" + - KAFKA_BROKERS="127.0.0.1:9092" + +before_install: +# Initialize kafka +- curl --silent -L -o kafka.yml https://raw.githubusercontent.com/simplesteph/kafka-stack-docker-compose/v5.1.0/zk-single-kafka-single.yml +- docker-compose -f kafka.yml up -d +# Install librdkafka +- curl --silent -OL https://raw.githubusercontent.com/confluentinc/confluent-kafka-go/v0.11.4/mk/bootstrap-librdkafka.sh +- bash bootstrap-librdkafka.sh v0.11.4 +- export PKG_CONFIG_PATH="$PWD/tmp-build/lib/pkgconfig" +- export LD_LIBRARY_PATH="$PWD/tmp-build/lib" +- export DYLD_LIBRARY_PATH="$PWD/tmp-build/lib" +- PATH="$PATH:$GOPATH/bin" +- sudo ldconfig +# Download the dependencies +- export GO111MODULE=on +- go mod download +- go mod vendor +- go mod verify +- go get github.com/mattn/goveralls +- go get golang.org/x/tools/cmd/cover + +script: +- curl --silent -OL https://raw.githubusercontent.com/vishnubob/wait-for-it/8ed92e8cab83cfed76ff012ed4a36cef74b28096/wait-for-it.sh +- chmod a+x wait-for-it.sh && ./wait-for-it.sh $KAFKA_BROKERS +- "$HOME/gopath/bin/goveralls -service=travis-ci -v" + +after_script: +- docker-compose -f kafka.yml down diff --git a/README.md b/README.md index db9de79..c127db3 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,65 @@ -# frafka -Frizzle for Apache Kafka +# Frafka + +Frafka is a Kafka implementation for [Frizzle](github.com/qntfy/frizzle) based on [confluent-go-kafka](https://github.com/confluentinc/confluent-kafka-go). + +Frizzle is a magic message (`Msg`) bus designed for parallel processing w many goroutines. + * `Receive()` messages from a configured `Source` + * Do your processing, possibly `Send()` each `Msg` on to one or more `Sink` destinations + * `Ack()` (or `Fail()`) the `Msg` to notify the `Source` that processing completed + +## Prereqs / Build instructions + +### Go mod + +As of Go 1.11, frafka uses [go mod](https://github.com/golang/go/wiki/Modules) for dependency management. + +### Install librdkafka + +Frafka depends on C library `librdkafka` (>=`v0.11.4`). For Debian 9+ (which includes golang docker images), +it has to be built from source. Fortunately, there's a script for that. +``` + # Install librdkafka + - curl --silent -OL https://raw.githubusercontent.com/confluentinc/confluent-kafka-go/v0.11.4/mk/bootstrap-librdkafka.sh + - bash bootstrap-librdkafka.sh v0.11.4 /usr/local + - ldconfig +``` + +Once that is installed, should be good to go with +``` +$ go get github.com/qntfy/frafka +$ cd frafka +$ go build +``` + +## Running the tests + +`go test -v --cover ./...` + +## Configuration +Frafka Sources and Sinks are configured using [Viper](https://godoc.org/github.com/spf13/viper). +``` +func InitSink(config *viper.Viper) (*Sink, error) + +func InitSource(config *viper.Viper) (*Source, error) +``` + +We typically initialize Viper through environment variables (but client can do whatever it wants, +just needs to provide the configured Viper object with relevant values). The application might +use a prefix before the below values. + +| Variable | Required | Description | Default | +|---------------------------|:--------:|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-------:| +| KAFKA_BROKERS | required | address(es) of kafka brokers, space separated | | +| KAFKA_TOPICS | source | topic(s) to read from | | +| KAFKA_CONSUMER_GROUP | source | consumer group value for coordinating multiple clients | | +| KAFKA_CONSUME_LATEST_FIRST | source (optional) | start at the beginning or end of topic | earliest | + +## Async Error Handling +Since records are sent in batch fashion, Kafka may report errors or other information asynchronously. +Event can be recovered via channels returned by the `Sink.Events()` and `Source.Events()` methods. +Partition changes and EOF will be reported as non-error Events, other errors will conform to `error` interface. +Where possible, Events will retain underlying type from [confluent-kafka-go](https://github.com/confluentinc/confluent-kafka-go) +if more information is desired. + +## Contributing +Contributions welcome! Take a look at open issues. diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e9d55e0 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module github.com/qntfy/frafka + +require ( + github.com/confluentinc/confluent-kafka-go v0.11.4 + github.com/gofrs/uuid v3.2.0+incompatible + github.com/qntfy/frizzle v0.5.0 + github.com/spf13/viper v1.2.1 + github.com/stretchr/testify v1.2.2 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8b88a41 --- /dev/null +++ b/go.sum @@ -0,0 +1,63 @@ +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/alexcesaro/statsd v2.0.0+incompatible h1:HG17k1Qk8V1F4UOoq6tx+IUoAbOcI5PHzzEUGeDD72w= +github.com/alexcesaro/statsd v2.0.0+incompatible/go.mod h1:vNepIbQAiyLe1j480173M6NYYaAsGwEcvuDTU3OCUGY= +github.com/confluentinc/confluent-kafka-go v0.11.4 h1:uH5doflVcMn+2G/ECv0wxpgmVkvEpTwYFW57V2iLqHo= +github.com/confluentinc/confluent-kafka-go v0.11.4/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= +github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mitchellh/mapstructure v1.0.0 h1:vVpGvMXJPqSDh2VYHF7gsfQj8Ncx+Xw5Y1KHeTRY+7I= +github.com/mitchellh/mapstructure v1.0.0/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/qntfy/frizzle v0.5.0 h1:v5cJNK4jYZbPBPC8x05CwjPlYtw+krhEUgRK6vXbbTo= +github.com/qntfy/frizzle v0.5.0/go.mod h1:JpLK0xve+4H8+5RKf1Z0diqrJePL0xOrDSrLNtaZZbI= +github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.2.0 h1:HHl1DSRbEQN2i8tJmtS6ViPyHx35+p51amrdsiTCrkg= +github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg= +github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.2 h1:Fy0orTDgHdbnzHcsOgfCN4LtHf0ec3wwtiwJqwvf3Gc= +github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.2.1 h1:bIcUwXqLseLF3BDAZduuNfekWG87ibtFxi59Bq+oI9M= +github.com/spf13/viper v1.2.1/go.mod h1:P4AexN0a+C9tGAnUFNwDMYYZv3pjFuvmeiMyKRaNVlI= +github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= +go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181119195503-ec83556a53fe h1:I5KvcSfxR/TkvFksuALBTCS44kh6MaPO1rHR9vT0iQQ= +golang.org/x/sys v0.0.0-20181119195503-ec83556a53fe/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc= +gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/integration_test.go b/integration_test.go new file mode 100644 index 0000000..a4abffd --- /dev/null +++ b/integration_test.go @@ -0,0 +1,252 @@ +package frafka_test + +import ( + "math/rand" + "strconv" + "strings" + "testing" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/qntfy/frafka" + "github.com/qntfy/frizzle" + "github.com/qntfy/frizzle/mocks" + "github.com/spf13/viper" + "github.com/stretchr/testify/suite" +) + +const ( + baseConsumerGroup = "testGroup" + frizConsumerGroup = "frizGroup" + testTopic = "test.topic.1" +) + +type sourceTestSuite struct { + suite.Suite + prod *kafka.Producer + v *viper.Viper + topic string + src frizzle.Source +} + +type sinkTestSuite struct { + suite.Suite + cons *kafka.Consumer + v *viper.Viper + topic string + sink frizzle.Sink +} + +func loadKafkaTestENV() string { + // Setup viper and config from ENV + v := viper.New() + v.AutomaticEnv() + v.SetDefault("kafka_brokers", "0.0.0.0:9092") + return v.GetString("kafka_brokers") +} + +func TestKafkaSource(t *testing.T) { + suite.Run(t, new(sourceTestSuite)) +} + +func TestKafkaSink(t *testing.T) { + suite.Run(t, new(sinkTestSuite)) +} + +func (s *sourceTestSuite) SetupSuite() { + rand.Seed(time.Now().UnixNano()) + brokers := loadKafkaTestENV() + var err error + s.prod, err = kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": brokers}) + if err != nil { + s.Error(err) + s.FailNow("unable to establish connection during test setup") + } + + s.v = viper.New() + s.v.Set("kafka_brokers", brokers) + s.v.Set("kafka_consumer_group", frizConsumerGroup) +} + +func (s *sourceTestSuite) TearDownSuite() { + s.prod.Close() +} + +func (s *sinkTestSuite) SetupSuite() { + rand.Seed(time.Now().UnixNano()) + brokers := loadKafkaTestENV() + var err error + s.cons, err = kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": brokers, + "group.id": baseConsumerGroup, + "auto.offset.reset": "earliest", + }) + if err != nil { + s.Error(err) + s.FailNow("unable to establish connection during test setup") + } + + s.v = viper.New() + s.v.Set("kafka_brokers", brokers) +} + +func (s *sinkTestSuite) TearDownSuite() { + s.NoError(s.cons.Close()) +} + +func kafkaTopic(s string) string { + r := strings.Replace(s, "/", ".", -1) + suffix := strconv.Itoa(rand.Intn(100000)) + return strings.Join([]string{r, "topic", suffix}, ".") +} + +func (s *sourceTestSuite) SetupTest() { + s.topic = kafkaTopic(s.T().Name()) + s.v.Set("kafka_topics", s.topic) + + var err error + s.src, err = frafka.InitSource(s.v) + if err != nil { + s.Error(err) + s.Fail("unable to initialize source") + } + go func() { + for ev := range s.src.(frizzle.Eventer).Events() { + s.T().Logf("async message: %s", ev) + } + }() +} + +func (s *sourceTestSuite) TearDownTest() { + s.src = nil +} + +func (s *sinkTestSuite) SetupTest() { + s.topic = kafkaTopic(s.T().Name()) + err := s.cons.Subscribe(s.topic, nil) + if err != nil { + s.Error(err) + s.Fail("consumer unable to subscribe to topic") + } + + s.sink, err = frafka.InitSink(s.v) + if err != nil { + s.Error(err) + s.Fail("unable to initialize sink") + } + go func() { + for ev := range s.sink.(frizzle.Eventer).Events() { + s.T().Logf("async message: %s", ev) + } + }() +} + +func (s *sinkTestSuite) TearDownTest() { + s.sink.Close() + s.sink = nil +} + +func (s *sinkTestSuite) TestSend() { + expectedMessages := []string{"Hello", "out", "there", "kafka", "world!"} + receivedMessages := []string{} + s.T().Log(s.topic) + + for _, m := range expectedMessages { + msg := frizzle.NewSimpleMsg("foo", []byte(m), time.Now()) + s.sink.Send(msg, s.topic) + } + + for len(receivedMessages) < len(expectedMessages) { + ev := s.cons.Poll(100) + if ev == nil { + continue + } + switch e := ev.(type) { + case *kafka.Message: + receivedMessages = append(receivedMessages, string(e.Value)) + case kafka.PartitionEOF: + s.Failf("received PartitionEOF, try rerunning test", "%% Received: %v\n", e) + case kafka.Error: + s.FailNowf("received Kafka error", "%% Error: %v\n", e) + } + } + + s.Equal(expectedMessages, receivedMessages) +} + +func (s *sourceTestSuite) produce(values []string) { + for _, m := range values { + s.prod.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &s.topic, Partition: kafka.PartitionAny}, + Value: []byte(m), + }, nil) + } +} + +func (s *sourceTestSuite) TestReceive() { + expectedValues := []string{"now", "we", "receive", "some", "new", "messages"} + receivedValues := []string{} + s.T().Log(s.topic) + + s.produce(expectedValues) + receivedMessages := []frizzle.Msg{} + for len(receivedMessages) < len(expectedValues) { + select { + case m := <-s.src.Receive(): + receivedMessages = append(receivedMessages, m) + } + } + + // Confirm all show up in UnAcked() + s.Equal(len(expectedValues), len(s.src.UnAcked())) + + for _, m := range receivedMessages { + receivedValues = append(receivedValues, string(m.Data())) + s.src.Ack(m) + } + + // Confirm all have been Acked and received and expected values match + s.Equal(0, len(s.src.UnAcked())) + s.Equal(expectedValues, receivedValues) +} + +func (s *sourceTestSuite) TestUnAckedAndFlush() { + expectedValues := []string{"testing out", "stop", "and unacked"} + receivedValues := []string{} + s.T().Log(s.topic) + + s.produce(expectedValues) + mSink := &mocks.Sink{} + mSink.On("Close").Return(nil) + f := frizzle.Init(s.src, mSink) + + for len(receivedValues) < len(expectedValues)-1 { + select { + case m := <-f.Receive(): + receivedValues = append(receivedValues, string(m.Data())) + f.Ack(m) + } + } + + lastMsg := <-f.Receive() + s.Equal("and unacked", string(lastMsg.Data())) + s.Equal(1, len(s.src.UnAcked())) + + s.Nil(f.FlushAndClose(50 * time.Millisecond)) + s.Equal(0, len(s.src.UnAcked())) + s.Equal(frizzle.ErrAlreadyAcked, f.Fail(lastMsg)) + +} + +func (s *sourceTestSuite) TestStopAndClose() { + expectedValues := []string{"an unacked msg"} + s.T().Log(s.topic) + s.produce(expectedValues) + + lastMsg := <-s.src.Receive() + s.Equal("kafka source: need to call Stop() before Close()", s.src.Close().Error()) + s.src.Stop() + s.Equal(frizzle.ErrUnackedMsgsRemain, s.src.Close()) + s.src.Fail(lastMsg) + s.Nil(s.src.Close()) +} diff --git a/sink.go b/sink.go new file mode 100644 index 0000000..0ac98da --- /dev/null +++ b/sink.go @@ -0,0 +1,118 @@ +package frafka + +import ( + "errors" + "fmt" + "strings" + + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/qntfy/frizzle" + "github.com/spf13/viper" +) + +var ( + _ frizzle.Sink = (*Sink)(nil) + _ frizzle.Eventer = (*Sink)(nil) +) + +var ( + // how long to wait for messages to flush + flushTimeoutMS = 30 * 1000 +) + +// Sink encapsulates a kafka producer for Sending Msgs +type Sink struct { + prod *kafka.Producer + quitChan chan int + doneChan chan int + evtChan chan frizzle.Event +} + +// InitSink initializes a basic Sink +func InitSink(config *viper.Viper) (*Sink, error) { + if !config.IsSet("kafka_brokers") { + return nil, errors.New("brokers must be set for kafka Sink") + } + brokers := strings.Join(config.GetStringSlice("kafka_brokers"), ",") + + // TODO: Performance optimization in librdkafka + // https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md + // Key values: + // - queue.buffering.max.messages + // - queue.buffering.max.ms + // - compression.codec ? + p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": brokers}) + if err != nil { + return nil, err + } + + s := &Sink{ + prod: p, + quitChan: make(chan int), + doneChan: make(chan int), + evtChan: make(chan frizzle.Event), + } + + go s.deliveryReports() + + return s, nil +} + +// deliveryReports receives async events from kafka Producer about whether +// message delivery is successful, any errors from broker, etc +func (s *Sink) deliveryReports() { + defer close(s.doneChan) + run := true + for run == true { + select { + case <-s.quitChan: + run = false + case e := <-s.prod.Events(): + switch ev := e.(type) { + case *kafka.Message: + m := ev + if m.TopicPartition.Error != nil { + s.evtChan <- frizzle.NewError(m.TopicPartition.Error.Error()) + } + case kafka.Error: + s.evtChan <- frizzle.Event(e) + default: + s.evtChan <- frizzle.Event(e) + } + } + } +} + +// Events reports async Events that occur during processing +func (s *Sink) Events() <-chan frizzle.Event { + return (<-chan frizzle.Event)(s.evtChan) +} + +// Send a Msg to specified topic +func (s *Sink) Send(m frizzle.Msg, topic string) error { + k := &kafka.Message{ + Value: m.Data(), + TopicPartition: kafka.TopicPartition{ + Topic: &topic, + Partition: kafka.PartitionAny, + }, + } + s.prod.ProduceChannel() <- k + return nil +} + +// Close the Sink after flushing any Msgs not fully sent +func (s *Sink) Close() error { + // Flush any messages still pending send + if remaining := s.prod.Flush(flushTimeoutMS); remaining > 0 { + return fmt.Errorf("there are still %d messages which have not been delivered after %d milliseconds", remaining, flushTimeoutMS) + } + // tell deliveryReports() goroutine to finish + s.quitChan <- 1 + // wait for it to finish + <-s.doneChan + // stop event chan + close(s.evtChan) + s.prod.Close() + return nil +} diff --git a/source.go b/source.go new file mode 100644 index 0000000..098f7d5 --- /dev/null +++ b/source.go @@ -0,0 +1,169 @@ +package frafka + +import ( + "errors" + "strings" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/qntfy/frizzle" + "github.com/qntfy/frizzle/common" + "github.com/spf13/viper" +) + +var ( + _ frizzle.Source = (*Source)(nil) + _ frizzle.Eventer = (*Source)(nil) +) + +var ( + kafkaEventChannelSize = 100 + kafkaSessionTimeoutMS = 6000 + stopCloseTimeout = 3 * time.Second +) + +// Source encapsulates a kafka consumer for receiving and tracking Msgs +type Source struct { + cons *kafka.Consumer + msgChan chan frizzle.Msg + unAcked *common.UnAcked + quitChan chan struct{} + doneChan chan struct{} + evtChan chan frizzle.Event +} + +// InitSource initializes a kafka Source +func InitSource(config *viper.Viper) (*Source, error) { + if !config.IsSet("kafka_brokers") || !config.IsSet("kafka_topics") || !config.IsSet("kafka_consumer_group") { + return nil, errors.New("brokers, topics and consumer_group must be set for kafka Source") + } + + startOffset := "earliest" + if config.GetBool("kafka_consume_latest_first") { + startOffset = "latest" + } + brokers := strings.Join(config.GetStringSlice("kafka_brokers"), ",") + + // TODO: Performance optimization in librdkafka + // https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md + // Key values: + // - queued.min.messages + // - fetch.message.max.bytes and related + c, err := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": brokers, // expects CSV + "group.id": config.GetString("kafka_consumer_group"), + "session.timeout.ms": kafkaSessionTimeoutMS, + "go.events.channel.enable": true, // support c.Events() + "go.events.channel.size": kafkaEventChannelSize, + "go.application.rebalance.enable": true, // we handle partition updates (needed for offset management) + "default.topic.config": kafka.ConfigMap{ + "auto.offset.reset": startOffset, + }, + }) + if err != nil { + return nil, err + } + + err = c.SubscribeTopics(config.GetStringSlice("kafka_topics"), nil) + if err != nil { + return nil, err + } + + s := &Source{ + cons: c, + msgChan: make(chan frizzle.Msg), + unAcked: common.NewUnAcked(), + quitChan: make(chan struct{}), + doneChan: make(chan struct{}), + evtChan: make(chan frizzle.Event), + } + go s.consume() + + return s, nil +} + +// consume events from kafka consumer +func (s *Source) consume() { + defer close(s.doneChan) +loop: + for { + select { + case <-s.quitChan: + break loop + case ev := <-s.cons.Events(): + switch e := ev.(type) { + case kafka.AssignedPartitions: + s.cons.Assign(e.Partitions) + s.evtChan <- frizzle.Event(e) + case kafka.RevokedPartitions: + s.cons.Unassign() + s.evtChan <- frizzle.Event(e) + case *kafka.Message: + s.handleMsg(e) + case kafka.PartitionEOF: + s.evtChan <- frizzle.Event(e) + case kafka.Error: + s.evtChan <- frizzle.Event(e) + default: + s.evtChan <- frizzle.Event(e) + } + } + } +} + +func (s *Source) handleMsg(k *kafka.Message) { + id := generateID() + m := frizzle.NewSimpleMsg(id, k.Value, k.Timestamp) + s.unAcked.Add(m) + s.msgChan <- m +} + +// Events reports async Events that occur during processing +func (s *Source) Events() <-chan frizzle.Event { + return (<-chan frizzle.Event)(s.evtChan) +} + +// Receive returns a channel for receiving Msgs +func (s *Source) Receive() <-chan frizzle.Msg { + return (<-chan frizzle.Msg)(s.msgChan) +} + +// Ack a Msg +func (s *Source) Ack(m frizzle.Msg) error { + return s.unAcked.Remove(m) +} + +// Fail a Msg +func (s *Source) Fail(m frizzle.Msg) error { + return s.unAcked.Remove(m) +} + +// UnAcked Msgs list +func (s *Source) UnAcked() []frizzle.Msg { + return s.unAcked.List() +} + +// Stop prevents new Msgs from being written to Receive() channel. It must +// be called before Close() will return. +func (s *Source) Stop() error { + close(s.quitChan) + return nil +} + +// Close cleans up underlying resources. +// It errors if Stop() has not been called and/or if there are +// unAcked Msgs. +func (s *Source) Close() error { + // confirm that consume() goroutine finished + select { + case <-s.doneChan: + case <-time.After(stopCloseTimeout): + return errors.New("kafka source: need to call Stop() before Close()") + } + if s.unAcked.Count() > 0 { + return frizzle.ErrUnackedMsgsRemain + } + close(s.msgChan) + close(s.evtChan) + return s.cons.Close() +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..f8110d9 --- /dev/null +++ b/utils.go @@ -0,0 +1,26 @@ +package frafka + +import ( + "github.com/gofrs/uuid" + "github.com/qntfy/frizzle" + "github.com/spf13/viper" +) + +// InitByViper initializes a full Frizzle with a kafka Source and Sink based on a provided Viper +func InitByViper(v *viper.Viper) (frizzle.Frizzle, error) { + src, err := InitSource(v) + if err != nil { + return nil, err + } + sink, err := InitSink(v) + if err != nil { + return nil, err + } + return frizzle.Init(src, sink), nil +} + +// generateID generates a unique ID for a Msg +func generateID() string { + id, _ := uuid.NewV4() + return id.String() +}