-
Notifications
You must be signed in to change notification settings - Fork 2
/
sink.go
140 lines (121 loc) · 3.38 KB
/
sink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package frinesis
import (
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/qntfy/frinesis/batchproducer"
"github.com/qntfy/frizzle"
"github.com/spf13/viper"
"go.uber.org/zap"
)
var (
_ frizzle.Sink = (*Sink)(nil)
_ frizzle.Eventer = (*Sink)(nil)
defaultFlushTimeout = 30 * time.Second
)
// Sink provides a frizzle interface for writing to AWS Kinesis
type Sink struct {
client *kinesis.Kinesis
config *batchproducer.Config
prods map[string]batchproducer.Producer
prodMux sync.RWMutex
flushTimeout time.Duration
evtChan chan frizzle.Event
}
// InitSink initializes a basic sink with no logging
func InitSink(config *viper.Viper) (*Sink, error) {
return InitSinkWithLogger(config, zap.NewNop())
}
// InitSinkWithLogger initializes a basic Sink with a provided logger
func InitSinkWithLogger(config *viper.Viper, logger *zap.Logger) (*Sink, error) {
client, err := ClientFromViper(config)
if err != nil {
return nil, err
}
flushTimeout := defaultFlushTimeout
if config.IsSet("kinesis_flush_timeout") {
flushTimeout = config.GetDuration("kinesis_flush_timeout")
}
// TODO: further optimize the config
cfg := batchproducer.DefaultConfig
cfg.BatchSize = 500
cfg.Logger = logger
s := &Sink{
client: client,
config: &cfg,
prods: make(map[string]batchproducer.Producer),
flushTimeout: flushTimeout,
evtChan: make(chan frizzle.Event),
}
return s, nil
}
// Send a Msg to topic. Initializes a kinesis producer for this topic if this
// Sink hasn't sent to it before.
func (s *Sink) Send(m frizzle.Msg, topic string) error {
s.prodMux.RLock()
prod, ok := s.prods[topic]
s.prodMux.RUnlock()
if !ok {
var err error
if prod, err = s.addTopicProducer(topic); err != nil {
return err
}
}
return prod.Add(m.Data(), generateID())
}
func (s *Sink) addTopicProducer(topic string) (batchproducer.Producer, error) {
s.prodMux.Lock()
defer s.prodMux.Unlock()
prod, ok := s.prods[topic]
if ok {
// topic producer already exists
return prod, nil
}
prod, err := batchproducer.New(s.client, topic, *s.config)
if err != nil {
return nil, err
}
if err = prod.Start(); err != nil && err != batchproducer.ErrAlreadyStarted {
return nil, err
}
// Pass along async events from batchproducer
go func() {
for e := range prod.Events() {
s.evtChan <- frizzle.Event(e)
}
}()
s.prods[topic] = prod
return prod, nil
}
// Events reports async Events that occur during processing
func (s *Sink) Events() <-chan frizzle.Event {
return (<-chan frizzle.Event)(s.evtChan)
}
// Close the Sink
func (s *Sink) Close() error {
s.prodMux.Lock()
defer s.prodMux.Unlock()
for topic := range s.prods {
_, remaining, err := s.prods[topic].Flush(s.flushTimeout, true)
if err != nil && err != batchproducer.ErrAlreadyStopped {
return err
}
if remaining > 0 {
return fmt.Errorf("topic %s timed out with %d messages still un-sent", topic, remaining)
}
}
return nil
}
// Restart producer go-routines to support Send() calls after Close() without re-initializing
// (intended for use in Lambda where Sink object may be preserved in memory for subsequent run after `Close()`)
func (s *Sink) Restart() error {
s.prodMux.Lock()
defer s.prodMux.Unlock()
for topic := range s.prods {
if err := s.prods[topic].Start(); err != nil && err != batchproducer.ErrAlreadyStarted {
return err
}
}
return nil
}