Frafka is a Kafka implementation for Frizzle based on confluent-kafka-go.
Frizzle is a magic message (`Msg`) bus designed for parallel processing with many goroutines.

- `Receive()` messages from a configured `Source`
- Do your processing, possibly `Send()` each `Msg` on to one or more `Sink` destinations
- `Ack()` (or `Fail()`) the `Msg` to notify the `Source` that processing completed
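The workflow above might look roughly like the following in client code. This is a hedged sketch, not frizzle's exact API: the method signatures may differ, and `friz`, `handle`, and the `"output"` sink destination are hypothetical names.

```go
// Sketch of the Receive -> process -> Send -> Ack/Fail loop.
// Exact frizzle signatures may differ; consult the frizzle docs.
for msg := range friz.Receive() {
	result, err := handle(msg) // hypothetical application-specific processing
	if err != nil {
		friz.Fail(msg) // notify the Source that processing failed
		continue
	}
	friz.Send(result, "output") // pass along to a configured Sink
	friz.Ack(msg)               // notify the Source that processing completed
}
```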
The underlying kafka library, confluent-kafka-go, has some particularly important nuances:

- alpine builds (e.g. `FROM golang:1.14-alpine`) should run all go commands with `-tags musl`, e.g. `go test -tags musl ./...`
- all builds producing an executable should run with `CGO_ENABLED=1` (not necessary for libraries, however)
Otherwise, should be good to go with:

```sh
go get github.com/qntfy/frafka
cd frafka
go build
```
Create a new sink with `NewSink`:

```go
// error omitted - handle in proper code
sink, _ := frafka.NewSink("broker1:15151,broker2:15151", 16*1024)
```
Frafka has integration tests which require a kafka broker to test against. The `KAFKA_BROKERS` environment variable is used by the tests. simplesteph/kafka-stack-docker-compose has a great, simple docker-compose setup that is currently used in frafka CI.

```sh
curl --silent -L -o kafka.yml https://raw.githubusercontent.com/simplesteph/kafka-stack-docker-compose/v5.1.0/zk-single-kafka-single.yml
DOCKER_HOST_IP=127.0.0.1 docker-compose -f kafka.yml up -d
# takes a while to initialize; can use a tool like wait-for-it.sh in scripting
export KAFKA_BROKERS=127.0.0.1:9092
go test -v --cover ./...
```
Frafka Sources and Sinks are configured using Viper.
```go
func InitSink(config *viper.Viper) (*Sink, error)
func InitSource(config *viper.Viper) (*Source, error)
```
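For example, the Viper object could be populated directly in code. This is a hedged sketch: the lowercase key names assume frafka reads the standard lowercase forms of the environment variables listed below, and the broker, topic, and group values are placeholders.

```go
v := viper.New()
v.Set("kafka_brokers", "broker1:9092 broker2:9092") // space separated
v.Set("kafka_topics", "events")                     // placeholder topic
v.Set("kafka_consumer_group", "my-service")         // placeholder group

source, err := frafka.InitSource(v)
if err != nil {
	log.Fatalf("init source: %v", err)
}
```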
We typically initialize Viper through environment variables (but the client can initialize it however it wants; it just needs to provide a configured Viper object with the relevant values). The application might use a prefix before the values below.
| Variable | Required | Description | Default |
|---|---|---|---|
| KAFKA_BROKERS | required | address(es) of kafka brokers, space separated | |
| KAFKA_TOPICS | source | topic(s) to read from | |
| KAFKA_CONSUMER_GROUP | source | consumer group value for coordinating multiple clients | |
| KAFKA_CONFIG | optional | additional librdkafka client config, format `key1=value1 key2=value2 ...` | |
| KAFKA_CONFIG_FILE | optional | relative or absolute file path to a config file for librdkafka client config (see notes) | |
- `KAFKA_CONFIG` allows setting arbitrary librdkafka configuration such as `retries=10 max.in.flight=1000 delivery.report.only.error=true`
- `KAFKA_CONFIG_FILE` provides another method for arbitrary config, similar to `KAFKA_CONFIG`. `KAFKA_CONFIG` takes priority over `KAFKA_CONFIG_FILE`. The specified file is parsed with Viper, which supports a range of config file formats; for simplicity we recommend using yaml similar to the provided example file (used in tests).
- Required config set via the environment variables listed above (e.g. `KAFKA_BROKERS`) will always take priority over optional values - if `bootstrap.servers` is set in `KAFKA_CONFIG` to a different value, it will be ignored.
- Sensible defaults are set for several additional config values; see the variables in `source.go` and `sink.go` for specifics.
- An earlier version of frafka also supported setting specific optional kafka configs via environment variables, such as compression. This functionality has been removed to simplify config logic and reduce confusion if values are set in multiple places.
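To illustrate the space-separated `key=value` format that `KAFKA_CONFIG` expects, here is a small standalone sketch of a parser for it. This is not frafka's actual parser; the `parseKafkaConfig` helper is hypothetical and exists only to demonstrate the format.

```go
package main

import (
	"fmt"
	"strings"
)

// parseKafkaConfig splits a KAFKA_CONFIG-style string
// ("key1=value1 key2=value2 ...") into a map. Illustrative only;
// it is not frafka's internal implementation.
func parseKafkaConfig(s string) map[string]string {
	cfg := make(map[string]string)
	for _, field := range strings.Fields(s) {
		parts := strings.SplitN(field, "=", 2)
		if len(parts) == 2 {
			cfg[parts[0]] = parts[1]
		}
	}
	return cfg
}

func main() {
	cfg := parseKafkaConfig("retries=10 max.in.flight=1000 delivery.report.only.error=true")
	fmt.Println(cfg["retries"])       // 10
	fmt.Println(cfg["max.in.flight"]) // 1000
}
```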
Some values we commonly set, particularly in a memory-constrained environment (e.g. running a producer/consumer service against a 9-partition topic with average message size under 10KB and less than 200MB of memory available):

- `queued.max.messages.kbytes`: 2048 (up to 16384)
- `auto.offset.reset`: (latest|earliest)
- `receive.message.max.bytes`: 2000000
- `fetch.max.bytes`: 1000000
- `compression.type`: snappy (and possibly a `linger.ms` value depending on throughput/latency requirements) - great to set to reduce network traffic and disk usage on brokers
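Values like these can be supplied through `KAFKA_CONFIG`; the specific settings below are illustrative, not recommendations for every workload.

```sh
export KAFKA_CONFIG="queued.max.messages.kbytes=2048 auto.offset.reset=latest compression.type=snappy"
```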
Since records are sent in batch fashion, Kafka may report errors or other information asynchronously. Events can be recovered via the channels returned by the `Sink.Events()` and `Source.Events()` methods. Partition changes and EOF will be reported as non-error Events; other errors will conform to the `error` interface. Where possible, Events will retain their underlying type from confluent-kafka-go if more information is desired.
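Draining the events channel might look like the following. This is a hedged sketch: `source` is assumed to be an initialized frafka Source, and inspecting the concrete confluent-kafka-go event types beyond the `error` check is left out.

```go
// Drain async events in a background goroutine so the channel is
// always consumed. Errors satisfy the error interface; other events
// (partition changes, EOF) are informational.
go func() {
	for ev := range source.Events() {
		if err, ok := ev.(error); ok {
			log.Printf("kafka async error: %v", err)
			continue
		}
		log.Printf("kafka event: %v", ev)
	}
}()
```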
Contributions welcome! Take a look at open issues.