Add CE3 based Kinesis sink #338
Conversation
Force-pushed from 6f6d48e to 697d803.
This is looking good.
However, the old collector had quite an intricate interplay between the kinesis and sqs queues. Here you have chosen to base your sink on the Enrich implementation, not the old collector implementation. That might be a good decision, but I don't know yet! We might find it's difficult to patch the SQS buffer into this implementation.
Time will tell....
Dependencies.Libraries.kinesis,
Dependencies.Libraries.sts,
Dependencies.Libraries.sqs,
// Dependencies.Libraries.sts,
We'll need sts as a runtime dependency, because that's how authorization works when it runs on EKS.
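A minimal sketch of what the sbt wiring for that could look like. The artifact name is the standard v1 sdk sts module; the `awsSdkVersion` val is illustrative and not taken from this PR:

```scala
// sts is never called directly, but the default credentials chain loads it
// reflectively to assume the IAM role for service accounts (IRSA) on EKS,
// so it only needs to be on the runtime classpath.
val sts = "com.amazonaws" % "aws-java-sdk-sts" % awsSdkVersion % Runtime
```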
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.kinesis.model.{PutRecordsRequest, PutRecordsRequestEntry, PutRecordsResult}
Here you're using v1 of the aws sdk, i.e. classes that start with `com.amazonaws`. I know that's what enrich does, and I know that's what the old collector does. But I think it would be great to update to v2 of the aws sdk (classes that start with `software.amazon`).
The newer kinesis sdk has an async client which returns a `CompletableFuture` when you make a request. You could adapt the `CompletableFuture` to an `F` using `Async`. This would be slightly nicer than using the old blocking client and wrapping the request in `Async.blocking(...)`.
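A sketch of that adapter, assuming cats-effect 3. The `client.putRecords(request)` usage in the comment is hypothetical (it assumes the v2 `KinesisAsyncClient`, which is not in this PR); the adapter itself is demonstrated with a plain `CompletableFuture`:

```scala
import java.util.concurrent.CompletableFuture
import cats.effect.{Async, IO}
import cats.effect.unsafe.implicits.global

// Lift a CompletableFuture-returning call into F. Wrapping the call in
// delay means the request only fires when the effect actually runs.
def liftCF[F[_]: Async, A](mk: => CompletableFuture[A]): F[A] =
  Async[F].fromCompletableFuture(Async[F].delay(mk))

// e.g. with the hypothetical v2 client: liftCF[IO, PutRecordsResponse](client.putRecords(request))
val demo: Int = liftCF[IO, Int](CompletableFuture.completedFuture(42)).unsafeRunSync()
```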
private def getProvider(awsConfig: KinesisSinkConfig.AWSConfig): AWSCredentialsProvider = {
  def isDefault(key: String): Boolean = key == "default"

  def isIam(key: String): Boolean = key == "iam"

  def isEnv(key: String): Boolean = key == "env"

  (awsConfig.accessKey, awsConfig.secretKey) match {
    case (a, s) if isDefault(a) && isDefault(s) =>
      new DefaultAWSCredentialsProviderChain()
    case (a, s) if isDefault(a) || isDefault(s) =>
      throw new IllegalArgumentException("accessKey and secretKey must both be set to 'default' or neither")
    case (a, s) if isIam(a) && isIam(s) =>
      InstanceProfileCredentialsProvider.getInstance()
    case (a, s) if isIam(a) || isIam(s) =>
      throw new IllegalArgumentException("accessKey and secretKey must both be set to 'iam' or neither")
    case (a, s) if isEnv(a) && isEnv(s) =>
      new EnvironmentVariableCredentialsProvider()
    case (a, s) if isEnv(a) || isEnv(s) =>
      throw new IllegalArgumentException("accessKey and secretKey must both be set to 'env' or neither")
    case _ =>
      new AWSStaticCredentialsProvider(
        new BasicAWSCredentials(awsConfig.accessKey, awsConfig.secretKey)
      )
  }
}
I would vote to not do any of this. These days we always use the default credentials chain. I don't think anyone benefits from having extra credentials methods available. The default credentials chain is well documented by AWS, and it allows several different methods of authentication.
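Following that suggestion, the whole method could collapse to a one-liner, sketched here against the v1 sdk the PR currently uses:

```scala
import com.amazonaws.auth.{AWSCredentialsProvider, DefaultAWSCredentialsProviderChain}

// Drop the accessKey/secretKey switching entirely: the default chain already
// covers env vars, instance profiles, web identity tokens, profiles, etc.
def getProvider: AWSCredentialsProvider = new DefaultAWSCredentialsProviderChain()
```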
good <- KinesisSink.create[IO](config.sink, config.buffer, config.good)
bad <- KinesisSink.create[IO](config.sink, config.buffer, config.bad)
By doing this you create two kinesis clients, one for each sink. That's probably OK. Is it worth trying to share a single kinesis client across both sinks?
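One possible shape for sharing a client, reusing the PR's `KinesisClient.create` but with a hypothetical `KinesisSink.createWithClient` helper that accepts an already-built client instead of constructing its own:

```scala
// Sketch only: createWithClient does not exist in this PR. The idea is that
// the Resource for the client is acquired once and both sinks borrow it, so
// its lifetime automatically outlives both sinks.
for {
  client <- KinesisClient.create[IO](config.sink)
  good   <- KinesisSink.createWithClient[IO](client, config.buffer, config.good)
  bad    <- KinesisSink.createWithClient[IO](client, config.buffer, config.bad)
} yield (good, bad)
```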
case cn @ "cn-north-1" => s"https://kinesis.$cn.amazonaws.com.cn"
case cn @ "cn-northwest-1" => s"https://kinesis.$cn.amazonaws.com.cn"
I know this is copied from some other Snowplow code..... but I don't like it. It feels like we are re-inventing what the aws sdk already does automatically.
I understand that we need an optional `customEndpoint` to enable localstack. But I would make it so that if `customEndpoint` is not set, then we simply do not call `.withEndpointConfiguration` on the client builder.
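That could look something like the following sketch against the v1 builder (the `mkClient` name and `String` parameters are illustrative, not from the PR):

```scala
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder}

def mkClient(region: String, customEndpoint: Option[String]): AmazonKinesis = {
  val builder = AmazonKinesisClientBuilder.standard()
  customEndpoint match {
    // localstack (or similar): point at the custom endpoint explicitly
    case Some(endpoint) => builder.withEndpointConfiguration(new EndpointConfiguration(endpoint, region))
    // normal case: let the sdk resolve the regional endpoint itself,
    // including the cn-* partitions
    case None           => builder.withRegion(region)
  }
  builder.build()
}
```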
.chunks
.parEvalMapUnbounded(events => writeToKinesis(events, config, streamName, kinesis, bufferConfig))
I think this is not quite right, but the reason is hard to explain!! I will have a go at explaining.....
If you call `.parEvalMapUnbounded` then it executes the IO immediately, as soon as there is something available upstream. And if you call `.chunks` on a queue then it emits the current contents of the queue, which might only be 1 event.
So I think here when you call `writeToKinesis` you are calling it with a very small chunk of events. Even if the collector is receiving a lot of events, you do not allow the queue to fill up significantly, because it calls `writeToKinesis` as soon as the queue has any events in it.
This is a problem, because writing to kinesis is more efficient if we can build up big batches before writing.
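One way to get bigger batches is fs2's `groupWithin`, which accumulates until a size or time limit is hit. The sketch below uses a stand-in `writeBatch` instead of the PR's `writeToKinesis`, and the 500-record / 1-second limits are illustrative (500 matches the kinesis PutRecords per-request limit, but neither value is from the PR):

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.{Chunk, Stream}

// Stand-in for writeToKinesis: just reports the batch size it was given.
def writeBatch(events: Chunk[String]): IO[Int] = IO.pure(events.size)

// groupWithin emits a chunk once it holds 500 records OR 1 second has
// passed, whichever comes first, so kinesis sees large batches under load
// while latency stays bounded when traffic is light.
val batchSizes: List[Int] =
  Stream.emits(List.fill(1200)("event"))
    .covary[IO]
    .groupWithin(500, 1.second)
    .evalMap(writeBatch)
    .compile
    .toList
    .unsafeRunSync()
```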
for {
  eventsBuffer        <- Resource.eval(Queue.unbounded[F, Option[Event]])
  kinesisClient       <- KinesisClient.create(config, streamName)
  kinesisWriteOutcome <- WritingToKinesisTask.run[F](config, buffer, streamName, eventsBuffer, kinesisClient)
  sink                <- Resource.make(createSink(config, eventsBuffer))(stopSink(eventsBuffer, kinesisWriteOutcome))
} yield sink
👍
Closing in favour of #339