diff --git a/flake.nix b/flake.nix index 7258c456f..c39955ebc 100644 --- a/flake.nix +++ b/flake.nix @@ -35,7 +35,7 @@ jre metals sbt - # pkgs.nodePackages.snyk + pkgs.snyk pkgs.kubernetes-helm # (pkgs.wrapHelm pkgs.kubernetes-helm {plugins = [pkgs.kubernetes-helmPlugins.helm-diff];}) # pkgs.google-cloud-sdk.withExtraComponents( with pkgs.google-cloud-sdk.components [ gke-gcloud-auth-plugin ]); diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala index a3023fd36..abcc9c865 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala @@ -40,7 +40,7 @@ class KafkaSink[F[_]: Sync]( * @param events The list of events to send * @param key The partition key to use */ - override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = Sync[F].delay { + override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = Sync[F].blocking { log.debug(s"Writing ${events.size} Thrift records to Kafka topic $topicName at key $key") events.foreach { event => kafkaProducer.send( diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/PrintingSink.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/PrintingSink.scala index 83abb72a5..b894918b9 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/PrintingSink.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/PrintingSink.scala @@ -20,7 +20,7 @@ class PrintingSink[F[_]: Sync]( override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = events.traverse_ { event => - Sync[F].delay { + Sync[F].blocking { stream.println(encoder.encodeToString(event)) } }