diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala index e893ead5e..ce5e3216d 100644 --- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala @@ -53,15 +53,11 @@ class NsqSink[F[_]: Sync] private ( private val producer = new NSQProducer().addAddress(nsqConfig.host, nsqConfig.port).start() - def produceData(topicName: String, events: List[Array[Byte]]): Unit = - try { - producer.produceMulti(topicName, events.asJava) - } catch { - case e @ (_: NSQException | _: TimeoutException) => { - healthStatus = false - throw e - } - } +def produceData(topicName: String, events: List[Array[Byte]]): F[Unit] = + Sync[F].blocking(producer.produceMulti(topicName, events.asJava)) + .onError { case _: NSQException | _: TimeoutException => + Sync[F].delay(healthStatus = false) + } *> Sync[F].delay(healthStatus = true) /** * Store raw events to the topic