Skip to content

Commit

Permalink
Implement a jank sink healthcheck
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Sep 13, 2023
1 parent 7e37cbe commit d0403e8
Showing 1 changed file with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ import scala.concurrent.duration.MILLISECONDS
import scala.collection.JavaConverters._

import cats.effect.{Resource, Sync}
// import cats.implicits.catsSyntaxMonadErrorRethrow

// import org.slf4j.LoggerFactory

import com.snowplowanalytics.client.nsq.NSQProducer
import com.snowplowanalytics.snowplow.collector.core.{Sink}
import com.snowplowanalytics.client.nsq.exceptions.NSQException
import java.util.concurrent.TimeoutException

/**
* NSQ Sink for the Scala Stream Collector
Expand All @@ -52,17 +53,29 @@ class NsqSink[F[_]: Sync] private (
implicit lazy val ec: ExecutionContextExecutorService =
concurrent.ExecutionContext.fromExecutorService(executorService)

override def isHealthy: F[Boolean] = Sync[F].pure(true) //TODO: Figure out this implementation
var healthStatus = true

override def isHealthy: F[Boolean] = Sync[F].pure(healthStatus) //TODO: Figure out this implementation

private val producer = new NSQProducer().addAddress(nsqConfig.host, nsqConfig.port).start()

def produceData(topicName: String, events: List[Array[Byte]]): Unit =
try {
producer.produceMulti(topicName, events.asJava)
} catch {
case e @ (_: NSQException | _: TimeoutException) => {
healthStatus = false
throw e
}
}

/**
* Store raw events to the topic
* @param events The list of events to send
* @param key The partition key (unused)
*/
override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] =
Sync[F].delay(producer.produceMulti(topicName, events.asJava))
Sync[F].delay(produceData(topicName, events))

def shutdown(): Unit =
producer.shutdown()
Expand All @@ -83,7 +96,6 @@ object NsqSink {
val acquire = Sync[F].delay(
new NsqSink(maxBytes, nsqConfig, topicName, executorService)
)
//.rethrow

val release = (sink: NsqSink[F]) => (Sync[F].delay(sink.shutdown()))

Expand Down

0 comments on commit d0403e8

Please sign in to comment.