-
Notifications
You must be signed in to change notification settings - Fork 32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pubsub sink #327
Pubsub sink #327
Conversation
c71eb38
to
e7251c9
Compare
dbbd023
to
76a3474
Compare
f577884
to
9deaf81
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great @pondzix !
} | ||
} | ||
|
||
private def parse(resource: String): Either[ExitCode, Config[PubSubSinkConfig]] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this be put in the `http4s` module?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I would keep it here for now. When we add more sinks and we indeed keep repeating this piece of code, we can extract to common shared module.
|
||
private def parse(resource: String): Either[ExitCode, Config[PubSubSinkConfig]] = { | ||
val path = Paths.get(getClass.getResource(resource).toURI) | ||
ConfigParser.fromPath[IO, PubSubSinkConfig](Some(path)).value.unsafeRunSync() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use `CatsEffect` to avoid this `unsafeRunSync()`?
object BuilderOps { | ||
|
||
implicit class PublisherBuilderOps(val builder: Publisher.Builder) extends AnyVal { | ||
def setProvidersForEmulator(): Publisher.Builder = | ||
customEmulatorHost().fold(builder) { emulatorHost => | ||
builder | ||
.setChannelProvider(createCustomChannelProvider(emulatorHost)) | ||
.setCredentialsProvider(NoCredentialsProvider.create()) | ||
} | ||
} | ||
|
||
implicit class TopicAdminBuilderOps(val builder: TopicAdminSettings.Builder) extends AnyVal { | ||
def setProvidersForEmulator(): TopicAdminSettings.Builder = | ||
customEmulatorHost().fold(builder) { emulatorHost => | ||
builder | ||
.setTransportChannelProvider(createCustomChannelProvider(emulatorHost)) | ||
.setCredentialsProvider(NoCredentialsProvider.create()) | ||
} | ||
} | ||
|
||
private def customEmulatorHost(): Option[String] = | ||
sys.env.get("PUBSUB_EMULATOR_HOST") | ||
|
||
private def createCustomChannelProvider(emulatorHost: String): FixedTransportChannelProvider = { | ||
val channel = ManagedChannelBuilder.forTarget(emulatorHost).usePlaintext().build() | ||
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice !
} | ||
|
||
object PubSubSink { | ||
private val UserAgent = s"snowplow/stream-collector-${BuildInfo.version}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private val UserAgent = s"snowplow/stream-collector-${BuildInfo.version}" | |
private val UserAgent = s"${BuildInfo.dockerAlias}:${BuildInfo.version}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't that be just `BuildInfo.dockerAlias`? It looks like it already contains the version.
@@ -67,7 +67,7 @@ object ConfigParser { | |||
} | |||
|
|||
private def loadAll(config: TypesafeConfig): TypesafeConfig = | |||
namespaced(ConfigFactory.load(namespaced(config.withFallback(namespaced(ConfigFactory.load()))))) | |||
namespaced(config.withFallback(namespaced(ConfigFactory.load()))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See #334 (comment)
healthCheckTask: F[Unit] | ||
): Resource[F, Unit] = { | ||
val checkThenSleep = healthCheckTask *> Async[F].sleep(sinkConfig.startupCheckInterval) | ||
checkThenSleep.untilM_(isHealthyState.get).background.void |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👌
.../src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala
Outdated
Show resolved
Hide resolved
) extends Sink[F] { | ||
|
||
override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = | ||
produceBatch(events).start.void |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the tracker doesn't batch the events, `events` will always be of size 1. Nevertheless, it seems fine to me to start a `Fiber` for each single event.
) | ||
|
||
private def handlePublishError(error: Throwable): F[Unit] = | ||
isHealthyState.set(false) *> Logger[F].error(createErrorMessage(error)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related to the discussion about logging policy, we'll need to decide if we should be logging errors for individual events or if we need some batching.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. This only becomes an issue if there is an outage and if it lasts for a long time. Ideally we should find a nice pattern in the code for de-duping log messages upon errors. And we should start using it everywhere when we've decided on that pattern.
No need to hold up this current PR though.
f5f9e72
to
656ccd6
Compare
656ccd6
to
cfdaa58
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good :)
|
||
private def createTopicAdminClient[F[_]: Sync](): Resource[F, TopicAdminClient] = { | ||
val builder = TopicAdminSettings.newBuilder().setProvidersForEmulator().build() | ||
Resource.make(Sync[F].delay(TopicAdminClient.create(builder)))(client => Sync[F].delay(client.close())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you know if `client.close()` is blocking or non-blocking?
) | ||
|
||
private def handlePublishError(error: Throwable): F[Unit] = | ||
isHealthyState.set(false) *> Logger[F].error(createErrorMessage(error)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. This only becomes an issue if there is an outage and if it lasts for a long time. Ideally we should find a nice pattern in the code for de-duping log messages upon errors. And we should start using it everywhere when we've decided on that pattern.
No need to hold up this current PR though.
The `core` part of the integration tests, based on CE2, has been copied to the new `http4s` module and ported to CE3.