Skip to content

Commit

Permalink
Fix effects messiness caused by unnecessarily nesting fuctions
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Sep 19, 2023
1 parent 219268c commit ba49ea2
Showing 1 changed file with 6 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import scala.concurrent.ExecutionContextExecutorService
import scala.concurrent.duration.MILLISECONDS

import cats.effect.{Resource, Sync}
import cats.implicits._

import com.snowplowanalytics.client.nsq.NSQProducer
import com.snowplowanalytics.snowplow.collector.core.{Sink}
Expand All @@ -49,23 +50,20 @@ class NsqSink[F[_]: Sync] private (

var healthStatus = true

override def isHealthy: F[Boolean] = Sync[F].pure(healthStatus)
override def isHealthy: F[Boolean] = Sync[F].pure(healthStatus)

private val producer = new NSQProducer().addAddress(nsqConfig.host, nsqConfig.port).start()

def produceData(topicName: String, events: List[Array[Byte]]): F[Unit] =
Sync[F].blocking(producer.produceMulti(topicName, events.asJava))
.onError { case _: NSQException | _: TimeoutException =>
Sync[F].delay(healthStatus = false)
} *> Sync[F].delay(healthStatus = true)

/**
* Store raw events to the topic
* @param events The list of events to send
* @param key The partition key (unused)
*/
override def storeRawEvents(events: List[Array[Byte]], key: String): F[Unit] =
Sync[F].delay(produceData(topicName, events))
Sync[F].blocking(producer.produceMulti(topicName, events.asJava)).onError {
case _: NSQException | _: TimeoutException =>
Sync[F].delay(healthStatus = false)
} *> Sync[F].delay(healthStatus = true)

def shutdown(): Unit =
producer.shutdown()
Expand Down

0 comments on commit ba49ea2

Please sign in to comment.