Skip to content

Commit

Permalink
Init draft
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Aug 10, 2023
1 parent 7b35c99 commit c71eb38
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 20 deletions.
30 changes: 12 additions & 18 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -194,33 +194,27 @@ lazy val sqsDistroless = project
.dependsOn(core % "test->test;compile->compile")

lazy val pubsubSettings =
allSettings ++ buildInfoSettings ++ Defaults.itSettings ++ scalifiedSettings ++ Seq(
allSettings ++ buildInfoSettings ++ Seq(
moduleName := "snowplow-stream-collector-google-pubsub",
Docker / packageName := "scala-stream-collector-pubsub",
libraryDependencies ++= Seq(
Dependencies.Libraries.pubsub,
Dependencies.Libraries.protobuf,
// integration tests dependencies
Dependencies.Libraries.specs2It,
Dependencies.Libraries.specs2CEIt,
),
IntegrationTest / test := (IntegrationTest / test).dependsOn(Docker / publishLocal).value,
IntegrationTest / testOnly := (IntegrationTest / testOnly).dependsOn(Docker / publishLocal).evaluated
Dependencies.Libraries.catsRetry,
Dependencies.Libraries.fs2PubSub
)
)

lazy val pubsub = project
.settings(pubsubSettings)
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)
.dependsOn(core % "test->test;compile->compile;it->it")
.configs(IntegrationTest)
.dependsOn(http4s % "test->test;compile->compile")

lazy val pubsubDistroless = project
.in(file("distroless/pubsub"))
.settings(sourceDirectory := (pubsub / sourceDirectory).value)
.settings(pubsubSettings)
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
.dependsOn(core % "test->test;compile->compile;it->it")
.configs(IntegrationTest)
//lazy val pubsubDistroless = project
// .in(file("distroless/pubsub"))
// .settings(sourceDirectory := (pubsub / sourceDirectory).value)
// .settings(pubsubSettings)
// .enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
// .dependsOn(core % "test->test;compile->compile;it->it")
// .configs(IntegrationTest)

lazy val kafkaSettings =
allSettings ++ buildInfoSettings ++ Seq(
Expand Down
9 changes: 7 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ object Dependencies {
val decline = "2.4.1"
val circe = "0.13.0"
val circeConfig = "0.8.0" // force circe 0.13.0
val fs2PubSub = "0.22.0"
val catsRetry = "3.1.0"

// Scala (test only)
val specs2 = "4.11.0"
val specs2CE = "0.4.1"
val testcontainers = "0.40.10"
val catsRetry = "2.1.0"
val catsRetryIT = "2.1.0"
val http4sIT = "0.21.33"
}

Expand Down Expand Up @@ -103,13 +106,15 @@ object Dependencies {
val decline = "com.monovore" %% "decline" % V.decline
val circeGeneric = "io.circe" %% "circe-generic" % V.circe
val circeConfig = "io.circe" %% "circe-config" % V.circeConfig
val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry
val fs2PubSub = "com.permutive" %% "fs2-google-pubsub-grpc" % V.fs2PubSub

// Scala (test only)
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
val specs2It = "org.specs2" %% "specs2-core" % V.specs2 % IntegrationTest
val specs2CEIt = "com.codecommit" %% "cats-effect-testing-specs2" % V.specs2CE % IntegrationTest
val testcontainersIt = "com.dimafeng" %% "testcontainers-scala-core" % V.testcontainers % IntegrationTest
val catsRetryIt = "com.github.cb372" %% "cats-retry" % V.catsRetry % IntegrationTest
val catsRetryIt = "com.github.cb372" %% "cats-retry" % V.catsRetryIT % IntegrationTest
val http4sClientIt = "org.http4s" %% "http4s-blaze-client" % V.http4sIT % IntegrationTest
val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % V.akka % Test
val akkaHttpTestkit = "com.typesafe.akka" %% "akka-http-testkit" % V.akkaHttp % Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright (c) 2013-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.sinks

import cats.Parallel
import cats.effect.implicits.genSpawnOps
import cats.effect.{Async, Resource, Sync}
import cats.implicits._
import com.google.api.gax.core.NoCredentialsProvider
import com.google.api.gax.grpc.GrpcTransportChannel
import com.google.api.gax.retrying.RetrySettings
import com.google.api.gax.rpc.{ApiException, FixedHeaderProvider, FixedTransportChannelProvider}
import com.google.cloud.pubsub.v1.Publisher
import com.permutive.pubsub.producer.Model.{ProjectId, Topic}
import com.permutive.pubsub.producer.PubsubProducer
import com.permutive.pubsub.producer.encoder.MessageEncoder
import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerConfig}
import com.snowplowanalytics.snowplow.collectors.scalastream.{Config, Sink, generated}
import io.grpc.ManagedChannelBuilder
import org.threeten.bp.Duration
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import retry.RetryPolicies
import retry.syntax.all._

import scala.concurrent.duration.{DurationLong, FiniteDuration}
import scala.util._

class GooglePubSubSinkHttp4s[F[_]: Async: Parallel: Logger] private (
val maxBytes: Int,
producer: PubsubProducer[F, Array[Byte]],
retryInterval: FiniteDuration,
topicName: String
) extends Sink[F] {

override def isHealthy: F[Boolean] = Sync[F].pure(true) //TODO

override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] =
produce(events).start.void

private def produce(events: List[Array[Byte]]): F[Unit] =
events.parTraverse_ { event =>
producer
.produce(event)
.retryingOnAllErrors(
policy = RetryPolicies.constantDelay(retryInterval),
onError = (error, _) => Logger[F].error(createErrorMessage(error))
)
}

private def createErrorMessage(error: Throwable): String =
error match {
case apiEx: ApiException =>
val retryable = if (apiEx.isRetryable) "retryable" else "non-retryable"
s"Publishing message to $topicName failed with code ${apiEx.getStatusCode} and $retryable error: ${apiEx.getMessage}"
case t => s"Publishing message to $topicName failed with error: ${t.getMessage}"
}
}

object GooglePubSubSinkHttp4s {
private val UserAgent = s"snowplow/stream-collector-${generated.BuildInfo.version}"

implicit private def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]

implicit val byteArrayEncoder: MessageEncoder[Array[Byte]] =
new MessageEncoder[Array[Byte]] {
def encode(a: Array[Byte]): Either[Throwable, Array[Byte]] =
a.asRight
}

def create[F[_]: Async: Parallel](
maxBytes: Int,
sinkConfig: Config.Sink.PubSub,
bufferConfig: Config.Buffer,
topicName: String
): Resource[F, GooglePubSubSinkHttp4s[F]] = {
val config = PubsubProducerConfig[F](
batchSize = bufferConfig.recordLimit,
requestByteThreshold = Some(bufferConfig.byteLimit),
delayThreshold = bufferConfig.timeLimit.millis,
onFailedTerminate = err => Logger[F].error(err)("PubSub sink termination error"),
customizePublisher = Some(customizePublisher(sinkConfig))
)

GooglePubsubProducer.of[F, Array[Byte]](ProjectId(sinkConfig.googleProjectId), Topic(topicName), config).map {
producer =>
new GooglePubSubSinkHttp4s(
maxBytes,
producer,
sinkConfig.retryInterval,
topicName
)
}
}

private def customizePublisher(sinkConfig: Config.Sink.PubSub)(builder: Publisher.Builder) = {
val custom = builder
.setRetrySettings(retrySettings(sinkConfig.backoffPolicy))
.setHeaderProvider(FixedHeaderProvider.create("User-Agent", UserAgent))
createCustomProviders().foreach {
case (channelProvider, credentialsProvider) =>
custom.setChannelProvider(channelProvider).setCredentialsProvider(credentialsProvider)
}
custom
}

private def createCustomProviders(): Option[(FixedTransportChannelProvider, NoCredentialsProvider)] =
sys.env.get("PUBSUB_EMULATOR_HOST").map { hostPort =>
val channel = ManagedChannelBuilder.forTarget(hostPort).usePlaintext().build()
val channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel))
val credentialsProvider = NoCredentialsProvider.create()
(channelProvider, credentialsProvider)
}

private def retrySettings(backoffPolicy: Config.Sink.PubSubBackoffPolicy): RetrySettings =
RetrySettings
.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(backoffPolicy.minBackoff))
.setMaxRetryDelay(Duration.ofMillis(backoffPolicy.maxBackoff))
.setRetryDelayMultiplier(backoffPolicy.multiplier)
.setTotalTimeout(Duration.ofMillis(backoffPolicy.totalBackoff))
.setInitialRpcTimeout(Duration.ofMillis(backoffPolicy.initialRpcTimeout))
.setRpcTimeoutMultiplier(backoffPolicy.rpcTimeoutMultiplier)
.setMaxRpcTimeout(Duration.ofMillis(backoffPolicy.maxRpcTimeout))
.build()

}

0 comments on commit c71eb38

Please sign in to comment.