forked from sendgridlabs/go-kinesis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kinesis.go
433 lines (378 loc) · 12.8 KB
/
kinesis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
// Package kinesis provide GOlang API for http://aws.amazon.com/kinesis/
package kinesis
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"os"
)
const (
ActionKey = "Action"
RegionEnvName = "AWS_REGION_NAME"
// Regions
USEast1 = "us-east-1"
USWest2 = "us-west-2"
EUWest1 = "eu-west-1"
EUCentral1 = "eu-central-1"
APSouthEast1 = "ap-southeast-1"
APSouthEast2 = "ap-southeast-2"
APNortheast1 = "ap-northeast-1"
kinesisURL = "https://kinesis.%s.amazonaws.com"
)
// NewRegionFromEnv creates a region from the an expected environment variable
func NewRegionFromEnv() string {
return os.Getenv(RegionEnvName)
}
// Structure for kinesis client
type Kinesis struct {
client *Client
endpoint string
region string
version string
}
// KinesisClient interface implemented by Kinesis
type KinesisClient interface {
CreateStream(StreamName string, ShardCount int) error
DeleteStream(StreamName string) error
DescribeStream(args *RequestArgs) (resp *DescribeStreamResp, err error)
GetRecords(args *RequestArgs) (resp *GetRecordsResp, err error)
GetShardIterator(args *RequestArgs) (resp *GetShardIteratorResp, err error)
ListStreams(args *RequestArgs) (resp *ListStreamsResp, err error)
MergeShards(args *RequestArgs) error
PutRecord(args *RequestArgs) (resp *PutRecordResp, err error)
PutRecords(args *RequestArgs) (resp *PutRecordsResp, err error)
SplitShard(args *RequestArgs) error
}
// New returns an initialized AWS Kinesis client using the canonical live “production” endpoint
// for AWS Kinesis, i.e. https://kinesis.{region}.amazonaws.com
func New(auth Auth, region string) *Kinesis {
endpoint := fmt.Sprintf(kinesisURL, region)
return NewWithEndpoint(auth, region, endpoint)
}
// NewWithClient returns an initialized AWS Kinesis client using the canonical live “production” endpoint
// for AWS Kinesis, i.e. https://kinesis.{region}.amazonaws.com but with the ability to create a custom client
// with specific configurations like a timeout
func NewWithClient(region string, client *Client) *Kinesis {
endpoint := fmt.Sprintf(kinesisURL, region)
return &Kinesis{client: client, version: "20131202", region: region, endpoint: endpoint}
}
// NewWithEndpoint returns an initialized AWS Kinesis client using the specified endpoint.
// This is generally useful for testing, so a local Kinesis server can be used.
func NewWithEndpoint(auth Auth, region string, endpoint string) *Kinesis {
// TODO: remove trailing slash on endpoint if there is one? does it matter?
// TODO: validate endpoint somehow?
return &Kinesis{client: NewClient(auth), version: "20131202", region: region, endpoint: endpoint}
}
// Create params object for request
func makeParams(action string) map[string]string {
params := make(map[string]string)
params[ActionKey] = action
return params
}
// RequestArgs store params for request
type RequestArgs struct {
params map[string]interface{}
Records []Record
}
// NewFilter creates a new Filter.
func NewArgs() *RequestArgs {
return &RequestArgs{
params: make(map[string]interface{}),
}
}
// Add appends a filtering parameter with the given name and value(s).
func (f *RequestArgs) Add(name string, value interface{}) {
f.params[name] = value
}
func (f *RequestArgs) AddData(value []byte) {
f.params["Data"] = value
}
// Error represent error from Kinesis API
type Error struct {
// HTTP status code (200, 403, ...)
StatusCode int
// error code ("UnsupportedOperation", ...)
Code string
// The human-oriented error message
Message string
RequestId string
}
// Return error message from error object
func (err *Error) Error() string {
if err.Code == "" {
return err.Message
}
return fmt.Sprintf("%s (%s)", err.Message, err.Code)
}
type jsonErrors struct {
Code string `json:"__type"`
Message string
}
func buildError(r *http.Response) error {
// Reading the body into a []byte because we might need to put it into an error
// message after having the JSON decoding fail to produce a message.
body, ioerr := ioutil.ReadAll(r.Body)
if ioerr != nil {
return fmt.Errorf("Could not read response body: %s", ioerr)
}
errors := jsonErrors{}
json.NewDecoder(bytes.NewReader(body)).Decode(&errors)
var err Error
err.Message = errors.Message
err.Code = errors.Code
err.StatusCode = r.StatusCode
if err.Message == "" {
err.Message = fmt.Sprintf("%s: %s", r.Status, body)
}
return &err
}
// Query by AWS API
func (kinesis *Kinesis) query(params map[string]string, data interface{}, resp interface{}) error {
jsonData, err := json.Marshal(data)
if err != nil {
return err
}
// request
request, err := http.NewRequest(
"POST",
kinesis.endpoint,
bytes.NewReader(jsonData),
)
if err != nil {
return err
}
// headers
request.Header.Set("Content-Type", "application/x-amz-json-1.1")
request.Header.Set("X-Amz-Target", fmt.Sprintf("Kinesis_%s.%s", kinesis.version, params[ActionKey]))
request.Header.Set("User-Agent", "Golang Kinesis")
// response
response, err := kinesis.client.Do(request)
if err != nil {
return err
}
defer response.Body.Close()
if response.StatusCode != 200 {
return buildError(response)
}
if resp == nil {
return nil
}
return json.NewDecoder(response.Body).Decode(resp)
}
// CreateStream adds a new Amazon Kinesis stream to your AWS account
// StreamName is a name of stream, ShardCount is number of shards
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html
func (kinesis *Kinesis) CreateStream(StreamName string, ShardCount int) error {
params := makeParams("CreateStream")
requestParams := struct {
StreamName string
ShardCount int
}{
StreamName,
ShardCount,
}
err := kinesis.query(params, requestParams, nil)
if err != nil {
return err
}
return nil
}
// DeleteStream deletes a stream and all of its shards and data from your AWS account
// StreamName is a name of stream
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeleteStream.html
func (kinesis *Kinesis) DeleteStream(StreamName string) error {
params := makeParams("DeleteStream")
requestParams := struct {
StreamName string
}{
StreamName,
}
err := kinesis.query(params, requestParams, nil)
if err != nil {
return err
}
return nil
}
// MergeShards merges two adjacent shards in a stream and combines them into a single shard to reduce the stream's capacity to ingest and transport data
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_MergeShards.html
func (kinesis *Kinesis) MergeShards(args *RequestArgs) error {
params := makeParams("MergeShards")
err := kinesis.query(params, args.params, nil)
if err != nil {
return err
}
return nil
}
// SplitShard splits a shard into two new shards in the stream, to increase the stream's capacity to ingest and transport data
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_SplitShard.html
func (kinesis *Kinesis) SplitShard(args *RequestArgs) error {
params := makeParams("SplitShard")
err := kinesis.query(params, args.params, nil)
if err != nil {
return err
}
return nil
}
// ListStreamsResp stores the information that provides by ListStreams API call
type ListStreamsResp struct {
HasMoreStreams bool
StreamNames []string
}
// ListStreams returns an array of the names of all the streams that are associated with the AWS account making the ListStreams request
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreams.html
func (kinesis *Kinesis) ListStreams(args *RequestArgs) (resp *ListStreamsResp, err error) {
params := makeParams("ListStreams")
resp = &ListStreamsResp{}
err = kinesis.query(params, args.params, resp)
if err != nil {
return nil, err
}
return
}
// DescribeStreamShards stores the information about list of shards inside DescribeStreamResp
type DescribeStreamShards struct {
AdjacentParentShardId string
HashKeyRange struct {
EndingHashKey string
StartingHashKey string
}
ParentShardId string
SequenceNumberRange struct {
EndingSequenceNumber string
StartingSequenceNumber string
}
ShardId string
}
// DescribeStreamResp stores the information that provides by DescribeStream API call
type DescribeStreamResp struct {
StreamDescription struct {
HasMoreShards bool
Shards []DescribeStreamShards
StreamARN string
StreamName string
StreamStatus string
}
}
// DescribeStream returns the following information about the stream: the current status of the stream,
// the stream Amazon Resource Name (ARN), and an array of shard objects that comprise the stream.
// For each shard object there is information about the hash key and sequence number ranges that
// the shard spans, and the IDs of any earlier shards that played in a role in a MergeShards or
// SplitShard operation that created the shard
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html
func (kinesis *Kinesis) DescribeStream(args *RequestArgs) (resp *DescribeStreamResp, err error) {
params := makeParams("DescribeStream")
resp = &DescribeStreamResp{}
err = kinesis.query(params, args.params, resp)
if err != nil {
return nil, err
}
return
}
// GetShardIteratorResp stores the information that provides by GetShardIterator API call
type GetShardIteratorResp struct {
ShardIterator string
}
// GetShardIterator returns a shard iterator
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
func (kinesis *Kinesis) GetShardIterator(args *RequestArgs) (resp *GetShardIteratorResp, err error) {
params := makeParams("GetShardIterator")
resp = &GetShardIteratorResp{}
err = kinesis.query(params, args.params, resp)
if err != nil {
return nil, err
}
return
}
// GetNextRecordsRecords stores the information that provides by GetNextRecordsResp
type GetRecordsRecords struct {
Data []byte
PartitionKey string
SequenceNumber string
}
func (r GetRecordsRecords) GetData() []byte {
return r.Data
}
// GetNextRecordsResp stores the information that provides by GetNextRecords API call
type GetRecordsResp struct {
NextShardIterator string
Records []GetRecordsRecords
}
// GetRecords returns one or more data records from a shard
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
func (kinesis *Kinesis) GetRecords(args *RequestArgs) (resp *GetRecordsResp, err error) {
params := makeParams("GetRecords")
resp = &GetRecordsResp{}
err = kinesis.query(params, args.params, resp)
if err != nil {
return nil, err
}
return
}
// PutRecordResp stores the information that provides by PutRecord API call
type PutRecordResp struct {
SequenceNumber string
ShardId string
}
// PutRecord puts a data record into an Amazon Kinesis stream from a producer.
// args must contain a single record added with AddRecord.
// More info: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
func (kinesis *Kinesis) PutRecord(args *RequestArgs) (resp *PutRecordResp, err error) {
params := makeParams("PutRecord")
if _, ok := args.params["Data"]; !ok && len(args.Records) == 0 {
return nil, errors.New("PutRecord requires its args param to contain a record added with either AddRecord or AddData.")
} else if ok && len(args.Records) > 0 {
return nil, errors.New("PutRecord requires its args param to contain a record added with either AddRecord or AddData but not both.")
} else if len(args.Records) > 1 {
return nil, errors.New("PutRecord does not support more than one record.")
}
if len(args.Records) > 0 {
args.AddData(args.Records[0].Data)
args.Add("PartitionKey", args.Records[0].PartitionKey)
}
resp = &PutRecordResp{}
err = kinesis.query(params, args.params, resp)
if err != nil {
return nil, err
}
return
}
// PutRecords puts multiple data records into an Amazon Kinesis stream from a producer
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
func (kinesis *Kinesis) PutRecords(args *RequestArgs) (resp *PutRecordsResp, err error) {
params := makeParams("PutRecords")
resp = &PutRecordsResp{}
args.Add("Records", args.Records)
err = kinesis.query(params, args.params, resp)
if err != nil {
return nil, err
}
return
}
// PutRecordsResp stores the information that provides by PutRecord API call
type PutRecordsResp struct {
FailedRecordCount int
Records []PutRecordsRespRecord
}
// RecordResp stores individual Record information provided by PutRecords API call
type PutRecordsRespRecord struct {
ErrorCode string
ErrorMessage string
SequenceNumber string
ShardId string
}
// Add data and partition for sending multiple Records to Kinesis in one API call
func (f *RequestArgs) AddRecord(value []byte, partitionKey string) {
r := Record{
Data: value,
PartitionKey: partitionKey,
}
f.Records = append(f.Records, r)
}
// Record stores the Data and PartitionKey for PutRecord or PutRecords calls to Kinesis API
type Record struct {
Data []byte
PartitionKey string
}