Skip to content

Commit

Permalink
Mark sinks as blocking where applicable
Browse files Browse the repository at this point in the history
TODO
  • Loading branch information
peel committed Mar 15, 2024
1 parent c64c7ae commit 23692eb
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 3 deletions.
2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
jre
metals
sbt
# pkgs.nodePackages.snyk
pkgs.snyk
pkgs.kubernetes-helm
# (pkgs.wrapHelm pkgs.kubernetes-helm {plugins = [pkgs.kubernetes-helmPlugins.helm-diff];})
# pkgs.google-cloud-sdk.withExtraComponents( with pkgs.google-cloud-sdk.components [ gke-gcloud-auth-plugin ]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class KafkaSink[F[_]: Sync](
* @param events The list of events to send
* @param key The partition key to use
*/
override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = Sync[F].delay {
override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = Sync[F].blocking {
log.debug(s"Writing ${events.size} Thrift records to Kafka topic $topicName at key $key")
events.foreach { event =>
kafkaProducer.send(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class PrintingSink[F[_]: Sync](

override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] =
events.traverse_ { event =>
Sync[F].delay {
Sync[F].blocking {
stream.println(encoder.encodeToString(event))
}
}
Expand Down

0 comments on commit 23692eb

Please sign in to comment.