Frizzle is a magic message (Msg
) bus designed for parallel processing with many goroutines.
Receive()
messages from a configuredSource
- Do your processing, possibly
Send()
eachMsg
on to one or moreSink
destinations Ack()
(orFail()
) theMsg
to notify theSource
that processing completed
Start with the example implementation which shows a simple canonical
implementation of a Processor
on top of Frizzle and most of the core functions.
high level interface
// Frizzle is a Msg bus for rapidly configuring and processing messages between multiple message services.
type Frizzle interface {
Receive() <-chan Msg
Send(m Msg, dest string) error
Ack(Msg) error
Fail(Msg) error
Events() <-chan Event
AddOptions(...Option)
FlushAndClose(timeout time.Duration) error
Close() error
}
func Init(source Source, sink Sink, opts ...Option) Frizzle
The core of the repo is a Friz
struct (returned by Init()
) which implements Frizzle
. The intent is for
separate Source
and Sink
implementations (in separate repos) to be mixed and matched with the glue of
Frizzle
. A processing library can take a Frizzle
input to allow easy re-use with multiple
underlying message technologies.
Friz also implements Source
and Sink
to allow chaining if needed.
If you write a new implementation, we'd love to add it to our list!
A basic interface which can be extended:
// Msg encapsulates an immutable message passed around by Frizzle
type Msg interface {
ID() string
Data() []byte
Timestamp() time.Time
}
A frizzle.SimpleMsg
struct is provided for basic use cases.
// Source defines a stream of incoming Msgs to be Received for processing,
// and reporting whether or not processing was successful.
type Source interface {
Receive() <-chan Msg
Ack(m Msg) error
Fail(m Msg) error
UnAcked() []Msg
Stop() error
Close() error
}
// Sink defines a message service where Msgs can be sent as part of processing.
type Sink interface {
Send(m Msg, dest string) error
Close() error
}
Frizzle supports a variety of Option
parameters for additional functionality to simplify your integration.
These can be included with Init()
or added using a friz.AddOptions()
call. Note that AddOptions()
updates the current friz and does not return anything.
Currently supported options:
Logger(log *zap.Logger)
- Include a logger to report frizzle-internal logging.Stats(stats StatsIncrementer)
- Include a stats client for frizzle-internal metrics reporting. See Stats for what metrics are supported.FailSink(s Sink, dest string)
- Provide a Sink and destination (kafka topic, kinesis stream etc) whereFail()
ed Msgs will be sent automatically.MonitorProcessingRate(pollPeriod time.Duration)
- Log the sum count of Acked and Failed Msgs everypollPeriod
.ReportAsyncErrors()
- Launch a simple go routine to monitor theEvents()
channel. All events are logged atError
orWarn
level; any events that matcherror
interface have a stat recorded. Logging and/or stats are disabled ifLogger()
/Stats()
have not been set, respectively.- This is a most basic handling that does not account for any specific Event types from Source/Sink implementations; developers should write an app specific monitoring routine to parse and handle specific Event cases (for which this can be a helpful starting template).
HandleShutdown(appShutdown func())
- Monitor forSIGINT
andSIGTERM
, callFlushAndClose()
followed by providedappShutdown
when they are received.WithTransformer(ft FrizTransformer)
- Add a transformer to modify the Msg's before they are sent or received. Currently only supports a "Simple Separator" Transformer which adds a specified record separator (such as newline) before sending if it isn't already present, and removes the same separator on receive if it is present.
Since Source and Sink implementations often send and receive Msgs in batch fashion,
They often may find out about any errors (or other important events) asynchronously.
To support this, async events can be recovered via a channel returned by the Friz.Events()
method.
If a Source/Sink does not implement the Eventer
interface this functionality will be ignored.
- Frizzle Events must provide a minimum
String()
interface; when consuming Events a type assertion switch is highly recommended to receive other relevant information.- A
default:
trap for unhandled cases is also highly recommended! - For a reference implementation of the same interface see here
- A
- A Friz's
Events()
channel will be closed after all underlying Source/SinkEvents()
channels are closed.- If a Friz is initialized without any Source/Sinks that implement
Events()
, the channel returned byFriz.Events()
will be closed immediately.
- If a Friz is initialized without any Source/Sinks that implement
In addition to the String()
method required by frizzle, currently only errors are
returned by frinesis (no other event types) so all Events recovered will also conform
to error
interface.
Transformers provide a mechanism to do simple updates to a Msg
prior to a Send()
or Receive()
, which
can be added at initializiation but is otherwise transparent to the processor and Source/Sink.
This can be useful in a case where e.g. you need to apply a transform when running on one messaging platform
but not another, and don't want to expose the core processing code to information about which platform
is in use.
Frizzle supports adding Transformers with a WithTransformer()
Option:
// WithTransformer returns an Option to add the provided FrizTransformer to a Frizzle
func WithTransformer(ft FrizTransformer) Option
// Transform is a type that modifies a Msg
type Transform func(Msg) Msg
// FrizTransformer provides a Transform to apply when a Msg is sent or received
type FrizTransformer interface {
SendTransform() Transform
ReceiveTransform() Transform
}
An example implementation to add and remove a separator suffix on each Msg is included in transform.go. To reduce clutter we generally suggest implementing a new Transform in a separate repo, but we can consider adding high utility ones here.
As of Go 1.11, frizzle uses go mod for dependency management.
$ go get github.com/qntfy/frizzle
$ cd frizzle
$ go build
go test -v --cover ./...
We recommend building Sources and Sinks to initialize using Viper, typically through environment variables (but client can do whatever it wants, just needs to provide the configured Viper object with relevant values). The application might use a prefix such as before the below values.
Variable | Required | Description | Default |
---|---|---|---|
BUFFER_SIZE | source (optional) | size of Input() channel buffer |
500 |
MOCK | optional | mocks don't track sent or unacked Msgs, just return without error | false |
StatsIncrementer
is a simple interface with just Increment(bucket string)
; based on github.com/alexcesaro/statsd
but potentially compatible with a variety of metrics engines. When Stats()
is set, Frizzle records the following metrics.
If a Logger()
has been set, each of the below also generates a Debug level log with the ID() of the Msg.
Bucket | Description |
---|---|
ctr.rcv | count of Msgs received from Source |
ctr.send | count of Msgs sent to Sink |
ctr.ack | count of Msgs Ack'ed by application |
ctr.fail | count of Msgs Fail'ed by application |
ctr.failsink | count of Msgs sent to FailSink |
ctr.error | count of error s from Events() * |
* only recorded if ReportAsyncErrors is running
Contributions welcome! Take a look at open issues. New Source/Sink implementations should be added in separate repos. If you let us know (and link to test demonstrating it conforms to the interface) we are happy to link them here!