diff --git a/build.sbt b/build.sbt
index 23ea94c6c..1a27bd7b2 100644
--- a/build.sbt
+++ b/build.sbt
@@ -194,33 +194,27 @@ lazy val sqsDistroless = project
   .dependsOn(core % "test->test;compile->compile")
 
 lazy val pubsubSettings =
-  allSettings ++ buildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq(
+  allSettings ++ buildInfoSettings ++ Seq(
     moduleName := "snowplow-stream-collector-google-pubsub",
     Docker / packageName := "scala-stream-collector-pubsub",
     libraryDependencies ++= Seq(
-      Dependencies.Libraries.pubsub,
-      Dependencies.Libraries.protobuf,
-      // integration tests dependencies
-      Dependencies.Libraries.specs2It,
-      Dependencies.Libraries.specs2CEIt,
-    ),
-    IntegrationTest / test := (IntegrationTest / test).dependsOn(Docker / publishLocal).value,
-    IntegrationTest / testOnly := (IntegrationTest / testOnly).dependsOn(Docker / publishLocal).evaluated
+      Dependencies.Libraries.catsRetry,
+      Dependencies.Libraries.fs2PubSub
+    )
   )
 
 lazy val pubsub = project
   .settings(pubsubSettings)
   .enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
-  .dependsOn(core % "test->test;compile->compile;it->it")
-  .configs(IntegrationTest)
+  .dependsOn(http4s % "test->test;compile->compile")
 
-lazy val pubsubDistroless = project
-  .in(file("distroless/pubsub"))
-  .settings(sourceDirectory := (pubsub / sourceDirectory).value)
-  .settings(pubsubSettings)
-  .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
-  .dependsOn(core % "test->test;compile->compile;it->it")
-  .configs(IntegrationTest)
+//lazy val pubsubDistroless = project
+//  .in(file("distroless/pubsub"))
+//  .settings(sourceDirectory := (pubsub / sourceDirectory).value)
+//  .settings(pubsubSettings)
+//  .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
+//  .dependsOn(core % "test->test;compile->compile;it->it")
+//  .configs(IntegrationTest)
 
 lazy val kafkaSettings =
   allSettings ++ buildInfoSettings ++ Seq(
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 11c846652..7c335cb25 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -54,11 +54,14 @@ object Dependencies {
     val decline        = "2.4.1"
     val circe          = "0.13.0"
     val circeConfig    = "0.8.0" // force circe 0.13.0
+    val fs2PubSub      = "0.22.0"
+    val catsRetry      = "3.1.0"
+
     // Scala (test only)
     val specs2         = "4.11.0"
     val specs2CE       = "0.4.1"
     val testcontainers = "0.40.10"
-    val catsRetry      = "2.1.0"
+    val catsRetryIT    = "2.1.0"
     val http4sIT       = "0.21.33"
   }
 
@@ -103,13 +106,15 @@ object Dependencies {
     val decline          = "com.monovore"      %% "decline"                    % V.decline
     val circeGeneric     = "io.circe"          %% "circe-generic"              % V.circe
     val circeConfig      = "io.circe"          %% "circe-config"               % V.circeConfig
+    val catsRetry        = "com.github.cb372"  %% "cats-retry"                 % V.catsRetry
+    val fs2PubSub        = "com.permutive"     %% "fs2-google-pubsub-grpc"     % V.fs2PubSub
 
     // Scala (test only)
     val specs2           = "org.specs2"        %% "specs2-core"                % V.specs2         % Test
     val specs2It         = "org.specs2"        %% "specs2-core"                % V.specs2         % IntegrationTest
     val specs2CEIt       = "com.codecommit"    %% "cats-effect-testing-specs2" % V.specs2CE       % IntegrationTest
     val testcontainersIt = "com.dimafeng"      %% "testcontainers-scala-core"  % V.testcontainers % IntegrationTest
-    val catsRetryIt      = "com.github.cb372"  %% "cats-retry"                 % V.catsRetry      % IntegrationTest
+    val catsRetryIt      = "com.github.cb372"  %% "cats-retry"                 % V.catsRetryIT    % IntegrationTest
     val http4sClientIt   = "org.http4s"        %% "http4s-blaze-client"        % V.http4sIT       % IntegrationTest
     val akkaTestkit      = "com.typesafe.akka" %% "akka-testkit"               % V.akka           % Test
     val akkaHttpTestkit  = "com.typesafe.akka" %% "akka-http-testkit"          % V.akkaHttp       % Test
diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSinkHttp4s.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSinkHttp4s.scala
new file mode 100644
index 000000000..941536b9e
--- /dev/null
+++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSinkHttp4s.scala
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2013-2023 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.collectors.scalastream.sinks
+
+import cats.Parallel
+import cats.effect.implicits.genSpawnOps
+import cats.effect.{Async, Resource, Sync}
+import cats.implicits._
+import com.google.api.gax.core.NoCredentialsProvider
+import com.google.api.gax.grpc.GrpcTransportChannel
+import com.google.api.gax.retrying.RetrySettings
+import com.google.api.gax.rpc.{ApiException, FixedHeaderProvider, FixedTransportChannelProvider}
+import com.google.cloud.pubsub.v1.Publisher
+import com.permutive.pubsub.producer.Model.{ProjectId, Topic}
+import com.permutive.pubsub.producer.PubsubProducer
+import com.permutive.pubsub.producer.encoder.MessageEncoder
+import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerConfig}
+import com.snowplowanalytics.snowplow.collectors.scalastream.{Config, Sink, generated}
+import io.grpc.ManagedChannelBuilder
+import org.threeten.bp.Duration
+import org.typelevel.log4cats.Logger
+import org.typelevel.log4cats.slf4j.Slf4jLogger
+import retry.RetryPolicies
+import retry.syntax.all._
+
+import scala.concurrent.duration.{DurationLong, FiniteDuration}
+import scala.util._
+
+class GooglePubSubSinkHttp4s[F[_]: Async: Parallel: Logger] private (
+  val maxBytes: Int,
+  producer: PubsubProducer[F, Array[Byte]],
+  retryInterval: FiniteDuration,
+  topicName: String
+) extends Sink[F] {
+
+  override def isHealthy: F[Boolean] = Sync[F].pure(true) //TODO
+
+  override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] =
+    produce(events).start.void
+
+  private def produce(events: List[Array[Byte]]): F[Unit] =
+    events.parTraverse_ { event =>
+      producer
+        .produce(event)
+        .retryingOnAllErrors(
+          policy = RetryPolicies.constantDelay(retryInterval),
+          onError = (error, _) => Logger[F].error(createErrorMessage(error))
+        )
+    }
+
+  private def createErrorMessage(error: Throwable): String =
+    error match {
+      case apiEx: ApiException =>
+        val retryable = if (apiEx.isRetryable) "retryable" else "non-retryable"
+        s"Publishing message to $topicName failed with code ${apiEx.getStatusCode} and $retryable error: ${apiEx.getMessage}"
+      case t => s"Publishing message to $topicName failed with error: ${t.getMessage}"
+    }
+}
+
+object GooglePubSubSinkHttp4s {
+  private val UserAgent = s"snowplow/stream-collector-${generated.BuildInfo.version}"
+
+  implicit private def unsafeLogger[F[_]: Sync]: Logger[F] =
+    Slf4jLogger.getLogger[F]
+
+  implicit val byteArrayEncoder: MessageEncoder[Array[Byte]] =
+    new MessageEncoder[Array[Byte]] {
+      def encode(a: Array[Byte]): Either[Throwable, Array[Byte]] =
+        a.asRight
+    }
+
+  def create[F[_]: Async: Parallel](
+    maxBytes: Int,
+    sinkConfig: Config.Sink.PubSub,
+    bufferConfig: Config.Buffer,
+    topicName: String
+  ): Resource[F, GooglePubSubSinkHttp4s[F]] = {
+    val config = PubsubProducerConfig[F](
+      batchSize = bufferConfig.recordLimit,
+      requestByteThreshold = Some(bufferConfig.byteLimit),
+      delayThreshold = bufferConfig.timeLimit.millis,
+      onFailedTerminate = err => Logger[F].error(err)("PubSub sink termination error"),
+      customizePublisher = Some(customizePublisher(sinkConfig))
+    )
+
+    GooglePubsubProducer.of[F, Array[Byte]](ProjectId(sinkConfig.googleProjectId), Topic(topicName), config).map {
+      producer =>
+        new GooglePubSubSinkHttp4s(
+          maxBytes,
+          producer,
+          sinkConfig.retryInterval,
+          topicName
+        )
+    }
+  }
+
+  private def customizePublisher(sinkConfig: Config.Sink.PubSub)(builder: Publisher.Builder) = {
+    val custom = builder
+      .setRetrySettings(retrySettings(sinkConfig.backoffPolicy))
+      .setHeaderProvider(FixedHeaderProvider.create("User-Agent", UserAgent))
+    createCustomProviders().foreach {
+      case (channelProvider, credentialsProvider) =>
+        custom.setChannelProvider(channelProvider).setCredentialsProvider(credentialsProvider)
+    }
+    custom
+  }
+
+  private def createCustomProviders(): Option[(FixedTransportChannelProvider, NoCredentialsProvider)] =
+    sys.env.get("PUBSUB_EMULATOR_HOST").map { hostPort =>
+      val channel = ManagedChannelBuilder.forTarget(hostPort).usePlaintext().build()
+      val channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel))
+      val credentialsProvider = NoCredentialsProvider.create()
+      (channelProvider, credentialsProvider)
+    }
+
+  private def retrySettings(backoffPolicy: Config.Sink.PubSubBackoffPolicy): RetrySettings =
+    RetrySettings
+      .newBuilder()
+      .setInitialRetryDelay(Duration.ofMillis(backoffPolicy.minBackoff))
+      .setMaxRetryDelay(Duration.ofMillis(backoffPolicy.maxBackoff))
+      .setRetryDelayMultiplier(backoffPolicy.multiplier)
+      .setTotalTimeout(Duration.ofMillis(backoffPolicy.totalBackoff))
+      .setInitialRpcTimeout(Duration.ofMillis(backoffPolicy.initialRpcTimeout))
+      .setRpcTimeoutMultiplier(backoffPolicy.rpcTimeoutMultiplier)
+      .setMaxRpcTimeout(Duration.ofMillis(backoffPolicy.maxRpcTimeout))
+      .build()
+
+}
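For context, below is a minimal, standalone sketch (not part of the diff) of the fs2-google-pubsub + cats-retry publishing pattern that the new GooglePubSubSinkHttp4s is built on, stripped of the collector's own Config and Sink types and of the emulator/customizePublisher wiring. The object name PubSubSinkSketch, the project and topic names, the batching thresholds and the retry delay are placeholder assumptions; it is intended to compile against the fs2PubSub and catsRetry dependencies added above on a cats-effect 3 stack, not to mirror the PR's exact wiring.

import cats.effect.{IO, IOApp, Resource}
import com.permutive.pubsub.producer.Model.{ProjectId, Topic}
import com.permutive.pubsub.producer.PubsubProducer
import com.permutive.pubsub.producer.encoder.MessageEncoder
import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerConfig}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import retry.RetryPolicies
import retry.syntax.all._

import scala.concurrent.duration._

object PubSubSinkSketch extends IOApp.Simple {

  // fs2-google-pubsub needs a log4cats Logger, like the sink's unsafeLogger
  implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

  // Pass-through encoder, same idea as byteArrayEncoder in the new sink
  implicit val byteArrayEncoder: MessageEncoder[Array[Byte]] =
    new MessageEncoder[Array[Byte]] {
      def encode(a: Array[Byte]): Either[Throwable, Array[Byte]] = Right(a)
    }

  // Producer resource with batching thresholds playing the role of Config.Buffer
  // (placeholder project, topic and limits)
  val producer: Resource[IO, PubsubProducer[IO, Array[Byte]]] =
    GooglePubsubProducer.of[IO, Array[Byte]](
      ProjectId("my-project"),
      Topic("good"),
      PubsubProducerConfig[IO](
        batchSize = 500,
        requestByteThreshold = Some(1000000L),
        delayThreshold = 500.millis,
        onFailedTerminate = e => Logger[IO].error(e)("PubSub producer terminated")
      )
    )

  // Publish one event, retrying on all errors with a constant delay,
  // mirroring produce() in GooglePubSubSinkHttp4s
  def run: IO[Unit] =
    producer.use { p =>
      p.produce("example-event".getBytes("UTF-8"))
        .retryingOnAllErrors(
          policy = RetryPolicies.constantDelay[IO](10.seconds),
          onError = (e, _) => Logger[IO].error(s"Publishing failed, retrying: ${e.getMessage}")
        )
        .void
    }
}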