Skip to content

Commit

Permalink
Nsq doesn't have a backoff policy implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Sep 13, 2023
1 parent 5e8f7e3 commit 7e37cbe
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class NsqSink[F[_]: Sync] private (
implicit lazy val ec: ExecutionContextExecutorService =
concurrent.ExecutionContext.fromExecutorService(executorService)

override def isHealthy: F[Boolean] = Sync[F].pure(true)
override def isHealthy: F[Boolean] = Sync[F].pure(true) //TODO: Figure out this implementation

private val producer = new NSQProducer().addAddress(nsqConfig.host, nsqConfig.port).start()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,12 @@ import com.snowplowanalytics.snowplow.collector.core.Config

final case class NsqSinkConfig(
maxBytes: Int,
// backoffPolicy: NsqSinkConfig.BackoffPolicyConfig, // TODO: Figure out if this is used/needed/should be included as optional
threadPoolSize: Int,
host: String,
port: Int
) extends Config.Sink

object NsqSinkConfig {

final case class BackoffPolicyConfig(minBackoff: Long, maxBackoff: Long, maxRetries: Int)

implicit val configDecoder: Decoder[NsqSinkConfig] = deriveDecoder[NsqSinkConfig]
implicit val backoffPolicyDecoder: Decoder[BackoffPolicyConfig] = deriveDecoder[BackoffPolicyConfig]
implicit val configDecoder: Decoder[NsqSinkConfig] = deriveDecoder[NsqSinkConfig]
}

0 comments on commit 7e37cbe

Please sign in to comment.