Skip to content

Commit

Permalink
Update nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.s…
Browse files Browse the repository at this point in the history
…calastream/sinks/NsqSink.scala

Co-authored-by: Benjamin BENOIST <[email protected]>
  • Loading branch information
colmsnowplow and benjben authored Sep 19, 2023
1 parent 130af1c commit 219268c
Showing 1 changed file with 5 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,11 @@ class NsqSink[F[_]: Sync] private (

private val producer = new NSQProducer().addAddress(nsqConfig.host, nsqConfig.port).start()

def produceData(topicName: String, events: List[Array[Byte]]): Unit =
try {
producer.produceMulti(topicName, events.asJava)
} catch {
case e @ (_: NSQException | _: TimeoutException) => {
healthStatus = false
throw e
}
}
def produceData(topicName: String, events: List[Array[Byte]]): F[Unit] =
Sync[F].blocking(producer.produceMulti(topicName, events.asJava))
.onError { case _: NSQException | _: TimeoutException =>
Sync[F].delay(healthStatus = false)
} *> Sync[F].delay(healthStatus = true)

/**
* Store raw events to the topic
Expand Down

0 comments on commit 219268c

Please sign in to comment.