From 219268c59d1bd4f4e0454dd36009c412249d193f Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Tue, 19 Sep 2023 16:28:44 +0200 Subject: [PATCH] Update nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala Co-authored-by: Benjamin BENOIST --- .../sinks/NsqSink.scala | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala index e893ead5e..ce5e3216d 100644 --- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala @@ -53,15 +53,11 @@ class NsqSink[F[_]: Sync] private ( private val producer = new NSQProducer().addAddress(nsqConfig.host, nsqConfig.port).start() - def produceData(topicName: String, events: List[Array[Byte]]): Unit = - try { - producer.produceMulti(topicName, events.asJava) - } catch { - case e @ (_: NSQException | _: TimeoutException) => { - healthStatus = false - throw e - } - } +def produceData(topicName: String, events: List[Array[Byte]]): F[Unit] = + Sync[F].blocking(producer.produceMulti(topicName, events.asJava)) + .onError { case _: NSQException | _: TimeoutException => + Sync[F].delay(healthStatus = false) + } *> Sync[F].delay(healthStatus = true) /** * Store raw events to the topic