-
Notifications
You must be signed in to change notification settings - Fork 0
/
clientoptions.go
158 lines (137 loc) · 3.63 KB
/
clientoptions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package wkafka
import (
"time"
"github.com/cenkalti/backoff/v4"
"github.com/twmb/franz-go/pkg/kgo"
)
// DefaultBatchCount is the default batch count for the batch consumer,
// used when no batch count is set.
var DefaultBatchCount = 100
// defaultBackoff builds an exponential backoff configured with a 2 second
// initial interval, a 7 second maximum interval and a 30 second maximum
// elapsed time.
func defaultBackoff() backoff.BackOff {
	const (
		initialInterval = 2 * time.Second
		maxInterval     = 7 * time.Second
		maxElapsedTime  = 30 * time.Second
	)

	expBackoff := backoff.NewExponentialBackOff(
		backoff.WithInitialInterval(initialInterval),
		backoff.WithMaxInterval(maxInterval),
		backoff.WithMaxElapsedTime(maxElapsedTime),
	)

	return expBackoff
}
// options collects every setting that the Option functions can change.
type options struct {
	// AppName is the application name; set by WithClientInfo and WithAppName.
	AppName string
	// ConsumerEnabled is set to true by WithConsumer.
	ConsumerEnabled bool
	// ConsumerDLQEnabled is not written by any option visible here.
	// NOTE(review): presumably toggles the dead letter queue consumer — confirm.
	ConsumerDLQEnabled bool
	// ConsumerConfig points to the copy of the config given to WithConsumer.
	ConsumerConfig *ConsumerConfig
	// Consumer consumer

	// ClientID is the kafka client_id; set by WithClientID, WithClientInfo
	// and WithAppName.
	ClientID string
	// KGOOptions accumulates the kgo options passed to WithKGOOptions.
	KGOOptions []kgo.Opt
	// KGOOptionsDLQ accumulates the kgo options passed to WithKGOOptionsDLQ,
	// intended for the DLQ client.
	KGOOptionsDLQ []kgo.Opt
	// AutoTopicCreation is set by WithAutoTopicCreation.
	AutoTopicCreation bool

	// Logger is set by WithLogger, or to LogNoop{} by WithNoLogger(true).
	Logger Logger
	// Meter is set by WithMeter.
	Meter Meter

	// Ping is set by WithPing (ping kafka brokers on client creation).
	Ping bool
	// PingRetry is set by WithPingRetry (retry the ping on client creation).
	PingRetry bool
	// PingBackoff is set by WithPingBackoff.
	// NOTE(review): presumably paces the ping retries — confirm at the call site.
	PingBackoff backoff.BackOff
}
// apply runs every given Option against o, in the order provided, so a later
// option that assigns the same field wins over an earlier one.
//
// Nil options are skipped: calling a nil Option would panic, which makes
// conditionally-built option lists needlessly fragile.
func (o *options) apply(opts ...Option) {
	for _, opt := range opts {
		if opt == nil {
			continue
		}

		opt(o)
	}
}
// Option is a function that modifies the client options.
// Values are created with the With* functions of this package.
type Option func(*options)
// WithClientID sets the client_id used in the kafka server.
// When not set, the DefaultClientID variable is used.
//
// Most of the time there is no need to set it!
func WithClientID(clientID string) Option {
	var setClientID Option = func(cfg *options) {
		cfg.ClientID = clientID
	}

	return setClientID
}
// WithMeter sets the Meter stored in the client options.
func WithMeter(m Meter) Option {
	return Option(func(cfg *options) {
		cfg.Meter = m
	})
}
// WithClientInfo sets the client_id used in the kafka server and the app name.
// Not usable together with the WithClientID option.
//
// The client_id has the form:
//   - appname:version@hostname
func WithClientInfo(appName, version string) Option {
	return func(cfg *options) {
		// idHostname is read when the option is applied, not when it is built.
		clientID := appName + ":" + version + "@" + idHostname

		cfg.AppName = appName
		cfg.ClientID = clientID
	}
}
// WithAppName sets the app name and derives the client_id from it
// as appname@hostname.
// Default is using the idProgname variable.
//
// Use WithClientInfo instead if you want to set both version and app name.
func WithAppName(appName string) Option {
	var setAppName Option = func(cfg *options) {
		// idHostname is read when the option is applied, not when it is built.
		cfg.AppName = appName
		cfg.ClientID = appName + "@" + idHostname
	}

	return setAppName
}
// WithAutoTopicCreation enables or disables auto topic creation for both the
// producer and the consumer.
//
// Default is enabled.
//
//   - Producer fails if the topic does not exist and auto topic creation is disabled.
//   - Consumer waits and does not fail when it is disabled.
func WithAutoTopicCreation(v bool) Option {
	return Option(func(cfg *options) {
		cfg.AutoTopicCreation = v
	})
}
// WithKGOOptions appends the given kgo options to the ones already collected.
func WithKGOOptions(opts ...kgo.Opt) Option {
	var addKGOOptions Option = func(cfg *options) {
		cfg.KGOOptions = append(cfg.KGOOptions, opts...)
	}

	return addKGOOptions
}
// WithKGOOptionsDLQ appends the given kgo options to the ones collected for
// the DLQ client.
//
// If empty then it will use the same options as the main client.
func WithKGOOptionsDLQ(opts ...kgo.Opt) Option {
	return Option(func(cfg *options) {
		cfg.KGOOptionsDLQ = append(cfg.KGOOptionsDLQ, opts...)
	})
}
// WithConsumer enables the consumer and configures the client to use the
// provided consumer config.
//   - The config is shallow copied; to modify the skip map safely use the skip function.
func WithConsumer(cfg ConsumerConfig) Option {
	// cfg is this function's own copy; every application of the returned
	// option stores a pointer to that same copy.
	consumerCfg := &cfg

	return func(target *options) {
		target.ConsumerEnabled = true
		target.ConsumerConfig = consumerCfg
	}
}
// WithLogger makes the client use the provided logger.
//   - For zerolog, logz.AdapterKV{Log: logger} can be used.
//   - Default is using zerolog's global logger.
func WithLogger(logger Logger) Option {
	var setLogger Option = func(cfg *options) {
		cfg.Logger = logger
	}

	return setLogger
}
// WithNoLogger disables the logger when v is true by installing LogNoop{}.
// A false value leaves the configured logger untouched.
func WithNoLogger(v bool) Option {
	return func(cfg *options) {
		if !v {
			return
		}

		cfg.Logger = LogNoop{}
	}
}
// WithPing controls whether kafka brokers are pinged on client creation.
//   - Default is enabled.
func WithPing(v bool) Option {
	return Option(func(cfg *options) {
		cfg.Ping = v
	})
}
// WithPingRetry controls whether the ping of kafka brokers on client creation
// is retried.
func WithPingRetry(v bool) Option {
	var setPingRetry Option = func(cfg *options) {
		cfg.PingRetry = v
	}

	return setPingRetry
}
// WithPingBackoff sets the backoff.BackOff stored as the client's PingBackoff
// option.
//
// NOTE(review): presumably this paces the retries enabled by WithPingRetry —
// confirm at the call site.
func WithPingBackoff(b backoff.BackOff) Option {
	return Option(func(cfg *options) {
		cfg.PingBackoff = b
	})
}