diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala index e2c1c00a9..4a49e43c3 100644 --- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala @@ -28,12 +28,13 @@ import scala.concurrent.duration.MILLISECONDS import scala.collection.JavaConverters._ import cats.effect.{Resource, Sync} -// import cats.implicits.catsSyntaxMonadErrorRethrow // import org.slf4j.LoggerFactory import com.snowplowanalytics.client.nsq.NSQProducer import com.snowplowanalytics.snowplow.collector.core.{Sink} +import com.snowplowanalytics.client.nsq.exceptions.NSQException +import java.util.concurrent.TimeoutException /** * NSQ Sink for the Scala Stream Collector @@ -52,17 +53,29 @@ class NsqSink[F[_]: Sync] private ( implicit lazy val ec: ExecutionContextExecutorService = concurrent.ExecutionContext.fromExecutorService(executorService) - override def isHealthy: F[Boolean] = Sync[F].pure(true) //TODO: Figure out this implementation + var healthStatus = true + + override def isHealthy: F[Boolean] = Sync[F].pure(healthStatus) //TODO: Figure out this implementation private val producer = new NSQProducer().addAddress(nsqConfig.host, nsqConfig.port).start() + def produceData(topicName: String, events: List[Array[Byte]]): Unit = + try { + producer.produceMulti(topicName, events.asJava) + } catch { + case e @ (_: NSQException | _: TimeoutException) => { + healthStatus = false + throw e + } + } + /** * Store raw events to the topic * @param events The list of events to send * @param key The partition key (unused) */ override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] = - Sync[F].delay(producer.produceMulti(topicName, events.asJava)) + Sync[F].delay(produceData(topicName, events)) def shutdown(): Unit = producer.shutdown() @@ -83,7 +96,6 @@ object NsqSink { val acquire = Sync[F].delay( new NsqSink(maxBytes, nsqConfig, topicName, executorService) ) - //.rethrow val release = (sink: NsqSink[F]) => (Sync[F].delay(sink.shutdown()))