Skip to content

Commit

Permalink
Upgrade scala?
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Nov 3, 2023
1 parent 6d75e67 commit ee9e062
Show file tree
Hide file tree
Showing 18 changed files with 48 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
package com.snowplowanalytics.snowplow.collector.core

import java.nio.file.{Files, Path}

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.typesafe.config.{Config => TypesafeConfig, ConfigFactory}

import scala.collection.JavaConverters._

import com.typesafe.config.{ConfigFactory, Config => TypesafeConfig}
import io.circe.Decoder
import io.circe.config.syntax.CirceConfigOps

import cats.implicits._
import cats.data.EitherT

import cats.effect.{ExitCode, Sync}

import scala.jdk.CollectionConverters._

object ConfigParser {

implicit private def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]
implicit private def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]

def fromPath[F[_]: Sync, SinkConfig: Decoder](
configPath: Option[Path]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import javax.net.ssl.SSLContext

object HttpServer {

implicit private def logger[F[_]: Async] = Slf4jLogger.getLogger[F]
implicit private def logger[F[_]: Async]: Logger[F] = Slf4jLogger.getLogger[F]

def build[F[_]: Async](
app: HttpApp[F],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object Run {

type TelemetryInfo[F[_], SinkConfig] = Config.Streams[SinkConfig] => F[Telemetry.TelemetryInfo]

implicit private def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]
implicit private def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]

def fromCli[F[_]: Async: Tracking, SinkConfig: Decoder](
appInfo: AppInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.util.UUID
import org.apache.commons.codec.binary.Base64

import scala.concurrent.duration._
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import cats.effect.{Clock, Sync}
import cats.implicits._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class RoutesSpec extends Specification {
case Method.GET =>
cookieParams.pixelExpected shouldEqual true
cookieParams.contentType shouldEqual None
case other =>
ko(s"Invalid http method - $other")
}
cookieParams.doNotTrack shouldEqual false
response.status must beEqualTo(Status.Ok)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.snowplowanalytics.snowplow.collector.core

import scala.concurrent.duration._
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import org.specs2.mutable.Specification

Expand Down Expand Up @@ -404,7 +404,7 @@ class ServiceSpec extends Specification {
`Access-Control-Allow-Headers`(ci"Content-Type", ci"SP-Anonymous"),
`Access-Control-Max-Age`.Cache(3600).asInstanceOf[`Access-Control-Max-Age`]
)
service.preflightResponse(Request[IO]()).unsafeRunSync.headers shouldEqual expected
service.preflightResponse(Request[IO]()).unsafeRunSync().headers shouldEqual expected
}
}

Expand Down Expand Up @@ -700,7 +700,7 @@ class ServiceSpec extends Specification {
)
cookie.secure must beTrue
cookie.httpOnly must beTrue
cookie.sameSite must beSome(SameSite.None)
cookie.sameSite must beSome[SameSite](SameSite.None)
cookie.extension must beNone
service.cookieHeader(
headers = Headers.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import cats.Applicative

import org.http4s.SameSite

import com.snowplowanalytics.snowplow.collector.core.Config._
import com.snowplowanalytics.snowplow.collector.core.Config.{Sink => SinkConfig, _}

object TestUtils {
val appName = "collector-test"
Expand Down Expand Up @@ -75,7 +75,7 @@ object TestUtils {
),
cors = CORS(60.minutes),
streams = Streams(
good = Sink(
good = SinkConfig(
name = "raw",
Buffer(
3145728,
Expand All @@ -84,7 +84,7 @@ object TestUtils {
),
AnyRef
),
bad = Sink(
bad = SinkConfig(
name = "bad-1",
Buffer(
3145728,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import cats.effect._
import org.apache.kafka.clients.consumer._
import java.util.Properties
import java.time.Duration
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.utils._
import com.snowplowanalytics.snowplow.collectors.scalastream.it.CollectorOutput

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ class CookieSpec extends Specification with Localstack with CatsEffect {
cookie.name must beEqualTo("greatName")
cookie.expires match {
case Some(expiry) =>
expiry.epochSecond should beCloseTo((now + 42.days).toSeconds, 10)
expiry.epochSecond should beCloseTo((now + 42.days).toSeconds, 10L)
case None =>
ko(s"Cookie [$cookie] doesn't contain the expiry date")
}
cookie.secure should beTrue
cookie.httpOnly should beTrue
cookie.sameSite should beSome(SameSite.Strict)
cookie.sameSite should beSome[SameSite](SameSite.Strict)
case _ => ko(s"There is not 1 cookie but ${resp.cookies.size}")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis.containe
import org.specs2.execute.PendingUntilFixed
import org.specs2.mutable.Specification

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.concurrent.duration._

class DoNotTrackCookieSpec extends Specification with Localstack with CatsEffect with PendingUntilFixed {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it.kinesis

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer

import cats.effect.{IO, Resource}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory
import java.nio.ByteBuffer
import java.util.UUID
import java.util.concurrent.ScheduledExecutorService
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutorService, Future}
Expand Down Expand Up @@ -92,7 +92,7 @@ class KinesisSink[F[_]: Sync] private (

def flush(): Unit = {
val eventsToSend = synchronized {
val evts = storedEvents.result
val evts = storedEvents.result()
storedEvents.clear()
byteCount = 0
evts
Expand Down Expand Up @@ -367,7 +367,7 @@ class KinesisSink[F[_]: Sync] private (

private def checkKinesisHealth(): Unit = {
val healthRunnable = new Runnable {
override def run() {
override def run(): Unit = {
log.info(s"Starting background check for Kinesis stream $streamName")
while (!kinesisHealthy) {
Try {
Expand All @@ -392,7 +392,7 @@ class KinesisSink[F[_]: Sync] private (

private def checkSqsHealth(): Unit = maybeSqs.foreach { sqs =>
val healthRunnable = new Runnable {
override def run() {
override def run(): Unit = {
log.info(s"Starting background check for SQS buffer ${sqs.sqsBufferName}")
while (!sqsHealthy) {
Try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ package com.snowplowanalytics.snowplow.collectors.scalastream
package sinks

import java.util.concurrent.TimeoutException
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import cats.effect.{Resource, Sync}
import cats.implicits._
import com.snowplowanalytics.client.nsq.NSQProducer
Expand Down Expand Up @@ -42,11 +42,15 @@ class NsqSink[F[_]: Sync] private (
override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] =
Sync[F].blocking(producer.produceMulti(topicName, events.asJava)).onError {
case _: NSQException | _: TimeoutException =>
Sync[F].delay(healthStatus = false)
} *> Sync[F].delay(healthStatus = true)
setHealthStatus(false)
} *> setHealthStatus(true)

// Synchronously shuts down the underlying NSQProducer (see the NSQProducer
// import above). Side-effecting and NOT suspended in F — callers are expected
// to invoke this during resource finalization.
// NOTE(review): presumably safe to call once at teardown; idempotence of
// NSQProducer.shutdown is not visible here — confirm against the client docs.
def shutdown(): Unit =
producer.shutdown()

// Suspends the write to the mutable `healthStatus` field in F via
// Sync[F].delay, so the mutation only happens when the returned effect is
// run (it is sequenced after storeRawEvents' produce call above: false on
// NSQException/TimeoutException, true on success).
// NOTE(review): `healthStatus` is declared in the class header outside this
// view — assumed to be a plain var read by a health endpoint; confirm.
private def setHealthStatus(status: Boolean): F[Unit] = Sync[F].delay {
healthStatus = status
}
}

object NsqSink {
Expand Down
24 changes: 11 additions & 13 deletions project/BuildSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,17 @@ import sbtdynver.DynVerPlugin.autoImport._
object BuildSettings {

lazy val commonSettings = Seq(
organization := "com.snowplowanalytics",
name := "snowplow-stream-collector",
description := "Scala Stream Collector for Snowplow raw events",
scalaVersion := "2.12.10",
scalacOptions ++= Seq("-Ypartial-unification", "-Ywarn-macros:after"),
javacOptions := Seq("-source", "11", "-target", "11"),
resolvers ++= resolutionRepos
)

lazy val resolutionRepos = Seq(
"Snowplow Analytics Maven repo".at("http://maven.snplow.com/releases/").withAllowInsecureProtocol(true),
// For uaParser utils
"user-agent-parser repo".at("https://clojars.org/repo/")
organization := "com.snowplowanalytics",
name := "snowplow-stream-collector",
description := "Scala Stream Collector for Snowplow raw events",
scalaVersion := "2.13.12",
scalacOptions ++= Seq("-Ywarn-macros:after"),
javacOptions := Seq("-source", "11", "-target", "11"),
resolvers ++= Seq(
"Snowplow Analytics Maven repo".at("http://maven.snplow.com/releases/").withAllowInsecureProtocol(true),
// For uaParser utils
"user-agent-parser repo".at("https://clojars.org/repo/")
)
)

lazy val coreHttp4sSettings = commonSettings ++ sbtAssemblySettings ++ Defaults.itSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.it.pubsub

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import com.google.api.gax.grpc.GrpcTransportChannel
import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannelProvider}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.BuilderOps._
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util._

object PubSubHealthCheck {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ object PubSubSink {
sinkConfig.name
)

private def createProducer[F[_]: Async: Parallel](
private def createProducer[F[_]: Async](
sinkConfig: PubSubSinkConfig,
topicName: String,
bufferConfig: Config.Buffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Random, Success, Try}
import scala.concurrent.{ExecutionContextExecutorService, Future}
import scala.concurrent.duration.MILLISECONDS
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import cats.syntax.either._

Expand Down Expand Up @@ -82,7 +82,7 @@ class SqsSink[F[_]: Sync] private (

def flush(): Unit = {
val eventsToSend = synchronized {
val evts = storedEvents.result
val evts = storedEvents.result()
storedEvents.clear()
byteCount = 0
evts
Expand Down Expand Up @@ -242,7 +242,7 @@ class SqsSink[F[_]: Sync] private (

private def checkSqsHealth(): Unit = {
val healthRunnable = new Runnable {
override def run() {
override def run(): Unit =
while (!sqsHealthy) {
Try {
client.getQueueUrl(queueName)
Expand All @@ -255,7 +255,6 @@ class SqsSink[F[_]: Sync] private (
Thread.sleep(1000L)
}
}
}
}
executorService.execute(healthRunnable)
}
Expand Down

0 comments on commit ee9e062

Please sign in to comment.