diff --git a/examples/config.kafka.extended.hocon b/examples/config.kafka.extended.hocon index 9d4a6e50c..7349deec5 100644 --- a/examples/config.kafka.extended.hocon +++ b/examples/config.kafka.extended.hocon @@ -163,28 +163,17 @@ collector { } streams { - # Events which have successfully been collected will be stored in the good stream/topic - good = "good" - - # Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic. - # The collector can currently produce two flavours of bad row: - # - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits; - # - a generic_error if a request's querystring cannot be parsed because of illegal characters - bad = "bad" # Whether to use the incoming event's ip as the partition key for the good stream/topic # Note: Nsq does not make use of partition key. useIpAddressAsPartitionKey = false - # Enable the chosen sink by uncommenting the appropriate configuration - sink { - # Choose between kinesis, sqs, google-pub-sub, kafka, nsq, or stdout. - # To use stdout, comment or remove everything in the "collector.streams.sink" section except - # "enabled" which should be set to "stdout". - enabled = kafka - - # Or Kafka + # Events which have successfully been collected will be stored in the good stream/topic + good { + + name = "good" brokers = "localhost:9092,another.host:9092" + ## Number of retries to perform before giving up on sending a record retries = 10 # The kafka producer has a variety of possible configuration options defined at @@ -193,6 +182,7 @@ collector { # "bootstrap.servers" = brokers # "buffer.memory" = buffer.byteLimit # "linger.ms" = buffer.timeLimit + #producerConf { # acks = all # "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer" @@ -203,18 +193,58 @@ collector { # If a record is bigger, a size violation bad row is emitted instead # Default: 1 MB maxBytes = 1000000 + + # Incoming events are stored in a buffer before being sent to Kafka. + # The buffer is emptied whenever: + # - the number of stored records reaches record-limit or + # - the combined size of the stored records reaches byte-limit or + # - the time in milliseconds since the buffer was last emptied reaches time-limit + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } } - # Incoming events are stored in a buffer before being sent to Kinesis/Kafka. - # Note: Buffering is not supported by NSQ. - # The buffer is emptied whenever: - # - the number of stored records reaches record-limit or - # - the combined size of the stored records reaches byte-limit or - # - the time in milliseconds since the buffer was last emptied reaches time-limit - buffer { - byteLimit = 3145728 - recordLimit = 500 - timeLimit = 5000 + # Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic. 
+ # The collector can currently produce two flavours of bad row: + # - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits; + # - a generic_error if a request's querystring cannot be parsed because of illegal characters + bad { + + name = "bad" + brokers = "localhost:9092,another.host:9092" + + ## Number of retries to perform before giving up on sending a record + retries = 10 + # The kafka producer has a variety of possible configuration options defined at + # https://kafka.apache.org/documentation/#producerconfigs + # Some values are set to other values from this config by default: + # "bootstrap.servers" = brokers + # "buffer.memory" = buffer.byteLimit + # "linger.ms" = buffer.timeLimit + + #producerConf { + # acks = all + # "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer" + # "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer" + #} + + # Optional. Maximum number of bytes that a single record can contain. + # If a record is bigger, a size violation bad row is emitted instead + # Default: 1 MB + maxBytes = 1000000 + + # Incoming events are stored in a buffer before being sent to Kafka. + # The buffer is emptied whenever: + # - the number of stored records reaches record-limit or + # - the combined size of the stored records reaches byte-limit or + # - the time in milliseconds since the buffer was last emptied reaches time-limit + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } } } diff --git a/examples/config.kafka.minimal.hocon b/examples/config.kafka.minimal.hocon index 29ca6ff67..1547b5c1e 100644 --- a/examples/config.kafka.minimal.hocon +++ b/examples/config.kafka.minimal.hocon @@ -3,11 +3,13 @@ collector { port = 8080 streams { - good = "good" - bad = "bad" - - sink { - brokers = "localhost:9092,another.host:9092" + good { + name = "good" + brokers = "localhost:9092,another.host:9092" + } + bad { + name = "bad" + brokers = "localhost:9092,another.host:9092" } } -} +} \ No newline at end of file diff --git a/examples/legacy/config.kafka.extended.hocon b/examples/legacy/config.kafka.extended.hocon new file mode 100644 index 000000000..9d4a6e50c --- /dev/null +++ b/examples/legacy/config.kafka.extended.hocon @@ -0,0 +1,295 @@ +# Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved. +# +# This program is licensed to you under the Apache License Version 2.0, and +# you may not use this file except in compliance with the Apache License +# Version 2.0. You may obtain a copy of the Apache License Version 2.0 at +# http://www.apache.org/licenses/LICENSE-2.0. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the Apache License Version 2.0 is distributed on an "AS +# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the Apache License Version 2.0 for the specific language +# governing permissions and limitations there under. + +# This file (config.hocon.sample) contains a template with +# configuration options for the Scala Stream Collector. +# +# To use, copy this to 'application.conf' and modify the configuration options. + +# 'collector' contains configuration options for the main Scala collector. +collector { + # The collector runs as a web service specified on the following interface and port. 
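Note: for orientation, the new-style good block above is expected to decode to the value sketched below, built from the Sink and Buffer case classes introduced in Config.scala later in this patch. This is only an illustration mirroring the expectedConfig assertion in KafkaConfigSpec at the end of the patch; the imports assume the existing collector-core and kafka sink packages.

    import com.snowplowanalytics.snowplow.collector.core.Config
    import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSinkConfig

    object ExpectedGoodSinkSketch {
      // Decoded form of the good { ... } block from config.kafka.extended.hocon,
      // matching KafkaConfigSpec.expectedConfig further down in this patch
      val good: Config.Sink[KafkaSinkConfig] = Config.Sink(
        name = "good",
        buffer = Config.Buffer(byteLimit = 3145728, recordLimit = 500, timeLimit = 5000),
        config = KafkaSinkConfig(
          maxBytes = 1000000,
          brokers = "localhost:9092,another.host:9092",
          retries = 10,
          producerConf = None
        )
      )
    }

The bad block decodes to the same shape with name = "bad".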
+ interface = "0.0.0.0" + port = 8080 + + # optional SSL/TLS configuration + ssl { + enable = false + # whether to redirect HTTP to HTTPS + redirect = false + port = 443 + } + + # The collector responds with a cookie to requests with a path that matches the 'vendor/version' protocol. + # The expected values are: + # - com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2 + # - r/tp2 for redirects + # - com.snowplowanalytics.iglu/v1 for the Iglu Webhook + # Any path that matches the 'vendor/version' protocol will result in a cookie response, for use by custom webhooks + # downstream of the collector. + # But you can also map any valid (i.e. two-segment) path to one of the three defaults. + # Your custom path must be the key and the value must be one of the corresponding default paths. Both must be full + # valid paths starting with a leading slash. + # Pass in an empty map to avoid mapping. + paths { + # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2" + # "/com.acme/redirect" = "/r/tp2" + # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1" + } + + # Configure the P3P policy header. + p3p { + policyRef = "/w3c/p3p.xml" + CP = "NOI DSP COR NID PSA OUR IND COM NAV STA" + } + + # Cross domain policy configuration. + # If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml + # route. + crossDomain { + enabled = false + # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com + domains = [ "*" ] + # Whether to only grant access to HTTPS or both HTTPS and HTTP sources + secure = true + } + + # The collector returns a cookie to clients for user identification + # with the following domain and expiration. + cookie { + enabled = true + expiration = 365 days + # Network cookie name + name = sp + # The domain is optional and will make the cookie accessible to other + # applications on the domain. Comment out these lines to tie cookies to + # the collector's full domain. + # The domain is determined by matching the domains from the Origin header of the request + # to the list below. The first match is used. If no matches are found, the fallback domain will be used, + # if configured. + # If you specify a main domain, all subdomains on it will be matched. + # If you specify a subdomain, only that subdomain will be matched. + # Examples: + # domain.com will match domain.com, www.domain.com and secure.client.domain.com + # client.domain.com will match secure.client.domain.com but not domain.com or www.domain.com + #domains = [ + # "acme1.com" # e.g. "domain.com" -> any origin domain ending with this will be matched and domain.com will be returned + # ... more domains + #] + # ... more domains + # If specified, the fallback domain will be used if none of the Origin header hosts matches the list of + # cookie domains configured above. (For example, if there is no Origin header.) + #fallbackDomain = "acme1.com" + secure = true + httpOnly = true + # The sameSite is optional. You can choose to not specify the attribute, or you can use `Strict`, + # `Lax` or `None` to limit the cookie sent context. + # Strict: the cookie will only be sent along with "same-site" requests. + # Lax: the cookie will be sent with same-site requests, and with cross-site top-level navigation. + # None: the cookie will be sent with same-site and cross-site requests. 
+ sameSite = "None" + } + + # If you have a do not track cookie in place, the Scala Stream Collector can respect it by + # completely bypassing the processing of an incoming request carrying this cookie, the collector + # will simply reply by a 200 saying "do not track". + # The cookie name and value must match the configuration below, where the names of the cookies must + # match entirely and the value could be a regular expression. + doNotTrackCookie { + enabled = false + name = "" + value = "" + } + + # When enabled and the cookie specified above is missing, performs a redirect to itself to check + # if third-party cookies are blocked using the specified name. If they are indeed blocked, + # fallbackNetworkId is used instead of generating a new random one. + cookieBounce { + enabled = false + # The name of the request parameter which will be used on redirects checking that third-party + # cookies work. + name = "n3pc" + # Network user id to fallback to when third-party cookies are blocked. + fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000" + # Optionally, specify the name of the header containing the originating protocol for use in the + # bounce redirect location. Use this if behind a load balancer that performs SSL termination. + # The value of this header must be http or https. Example, if behind an AWS Classic ELB. + #forwardedProtocolHeader = "X-Forwarded-Proto" + } + + # When enabled, redirect prefix `r/` will be enabled and its query parameters resolved. + # Otherwise the request prefixed with `r/` will be dropped with `404 Not Found` + # Custom redirects configured in `paths` can still be used. + enableDefaultRedirect = false + + # Domains which are valid for collector redirects. If empty (the default) then redirects are + # allowed to any domain. + redirectDomains = [ + # "acme1.com" + ] + + # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder + # token. All instances of that token are replaced withe the network ID. If the placeholder isn't + # specified, the default value is `${SP_NUID}`. + redirectMacro { + enabled = false + } + + # Customize response handling for requests for the root path ("/"). + # Useful if you need to redirect to web content or privacy policies regarding the use of this collector. + rootResponse { + enabled = false + statusCode = 302 + headers = { + } + body = "" + } + + # Configuration related to CORS preflight requests + cors { + # The Access-Control-Max-Age response header indicates how long the results of a preflight + # request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h. + accessControlMaxAge = 60 minutes + } + + streams { + # Events which have successfully been collected will be stored in the good stream/topic + good = "good" + + # Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic. + # The collector can currently produce two flavours of bad row: + # - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits; + # - a generic_error if a request's querystring cannot be parsed because of illegal characters + bad = "bad" + + # Whether to use the incoming event's ip as the partition key for the good stream/topic + # Note: Nsq does not make use of partition key. 
+ useIpAddressAsPartitionKey = false + + # Enable the chosen sink by uncommenting the appropriate configuration + sink { + # Choose between kinesis, sqs, google-pub-sub, kafka, nsq, or stdout. + # To use stdout, comment or remove everything in the "collector.streams.sink" section except + # "enabled" which should be set to "stdout". + enabled = kafka + + # Or Kafka + brokers = "localhost:9092,another.host:9092" + ## Number of retries to perform before giving up on sending a record + retries = 10 + # The kafka producer has a variety of possible configuration options defined at + # https://kafka.apache.org/documentation/#producerconfigs + # Some values are set to other values from this config by default: + # "bootstrap.servers" = brokers + # "buffer.memory" = buffer.byteLimit + # "linger.ms" = buffer.timeLimit + #producerConf { + # acks = all + # "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer" + # "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer" + #} + + # Optional. Maximum number of bytes that a single record can contain. + # If a record is bigger, a size violation bad row is emitted instead + # Default: 1 MB + maxBytes = 1000000 + } + + # Incoming events are stored in a buffer before being sent to Kinesis/Kafka. + # Note: Buffering is not supported by NSQ. + # The buffer is emptied whenever: + # - the number of stored records reaches record-limit or + # - the combined size of the stored records reaches byte-limit or + # - the time in milliseconds since the buffer was last emptied reaches time-limit + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } + } + + # Telemetry sends heartbeat events to external pipeline. + # Unless disable parameter set to true, this feature will be enabled. Deleting whole section will not disable it. + # Schema URI: iglu:com.snowplowanalytics.oss/oss_context/jsonschema/1-0-1 + # + telemetry { + disable = false + interval = 60 minutes + + # Connection properties for the receiving pipeline + method = POST + url = telemetry-g.snowplowanalytics.com + port = 443 + secure = true + } + # HTTP metrics reported by collector + monitoring.metrics.statsd { + enabled = false + # StatsD metric reporting protocol configuration + hostname = localhost + port = 8125 + # Required, how frequently to report metrics + period = "10 seconds" + # Optional, override the default metric prefix + # "prefix": "snowplow.collector" + } + + # Configures how long the colletor should pause after receiving a sigterm before starting the graceful shutdown. + # During this period the collector continues to accept new connections and respond to requests. + preTerminationPeriod = 10 seconds + + # During the preTerminationPeriod, the collector can be configured to return 503s on the /health endpoint + # Can be helpful for removing the collector from a load balancer's targets. + preTerminationUnhealthy = false + + # The akka server's deadline for closing connections during graceful shutdown + terminationDeadline = 10 seconds + + experimental { + # Enable an experimental feature to send some "warm-up" requests to the collector's own /health endpoint during startup. + # We have found from experiment this can cut down the number of 502s returned from a load balancer in front of the collector in Kubernetes deployments. 
+ # More details in https://github.com/snowplow/stream-collector/issues/249 + warmup { + enable = false + numRequests = 2000 + maxConnections = 2000 + maxCycles = 3 + } + } +} + +# Akka has a variety of possible configuration options defined at +# http://doc.akka.io/docs/akka/current/scala/general/configuration.html +akka { + loglevel = WARNING # 'OFF' for no logging, 'DEBUG' for all logging. + loggers = ["akka.event.slf4j.Slf4jLogger"] + + # akka-http is the server the Stream collector uses and has configurable options defined at + # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html + http.server { + # To obtain the hostname in the collector, the 'remote-address' header + # should be set. By default, this is disabled, and enabling it + # adds the 'Remote-Address' header to every request automatically. + remote-address-header = on + + raw-request-uri-header = on + + # Define the maximum request length (the default is 2048) + parsing { + max-uri-length = 32768 + uri-parsing-mode = relaxed + } + + max-connections = 2048 + } +} diff --git a/examples/legacy/config.kafka.minimal.hocon b/examples/legacy/config.kafka.minimal.hocon new file mode 100644 index 000000000..26139c820 --- /dev/null +++ b/examples/legacy/config.kafka.minimal.hocon @@ -0,0 +1,13 @@ +collector { + interface = "0.0.0.0" + port = 8080 + + streams { + good = "good" + bad = "bad" + + sink { + brokers = "localhost:9092,another.host:9092" + } + } +} \ No newline at end of file diff --git a/http4s/src/main/resources/reference.conf b/http4s/src/main/resources/reference.conf index 929d36685..96dfd594f 100644 --- a/http4s/src/main/resources/reference.conf +++ b/http4s/src/main/resources/reference.conf @@ -51,12 +51,6 @@ streams { useIpAddressAsPartitionKey = false - - buffer { - byteLimit = 3145728 - recordLimit = 500 - timeLimit = 5000 - } } telemetry { diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala index 23b614458..22ee2e25f 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala @@ -12,7 +12,7 @@ import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.ceTracking import com.snowplowanalytics.snowplow.collector.core.model.Sinks -abstract class App[SinkConfig <: Config.Sink: Decoder](appInfo: AppInfo) +abstract class App[SinkConfig: Decoder](appInfo: AppInfo) extends CommandIOApp( name = App.helpCommand(appInfo), header = "Snowplow application that collects tracking events", diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala index cf5bacdcb..86567becc 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala @@ -86,16 +86,12 @@ object Config { ) case class Streams[+SinkConfig]( - good: String, - bad: String, - useIpAddressAsPartitionKey: Boolean, - sink: SinkConfig, - buffer: Buffer + good: Sink[SinkConfig], + bad: Sink[SinkConfig], + useIpAddressAsPartitionKey: Boolean ) - trait Sink { - val maxBytes: Int - } + final case class Sink[+SinkConfig](name: String, buffer: Buffer, config: SinkConfig) case class Buffer( byteLimit: Long, @@ -166,15 +162,57 @@ object Config { implicit val redirectMacro = 
deriveDecoder[RedirectMacro] implicit val rootResponse = deriveDecoder[RootResponse] implicit val cors = deriveDecoder[CORS] - implicit val buffer = deriveDecoder[Buffer] - implicit val streams = deriveDecoder[Streams[SinkConfig]] implicit val statsd = deriveDecoder[Statsd] implicit val metrics = deriveDecoder[Metrics] implicit val monitoring = deriveDecoder[Monitoring] implicit val ssl = deriveDecoder[SSL] implicit val telemetry = deriveDecoder[Telemetry] implicit val networking = deriveDecoder[Networking] + implicit val sinkConfig = newDecoder[SinkConfig].or(legacyDecoder[SinkConfig]) + implicit val streams = deriveDecoder[Streams[SinkConfig]] + deriveDecoder[Config[SinkConfig]] } + implicit private val buffer: Decoder[Buffer] = deriveDecoder[Buffer] + + /** + * streams { + * good { + * name: "good-name" + * buffer {...} + * // rest of the sink config... + * } + * bad { + * name: "bad-name" + * buffer {...} + * // rest of the sink config... + * } + * } + */ + private def newDecoder[SinkConfig: Decoder]: Decoder[Sink[SinkConfig]] = + Decoder.instance { cursor => // cursor is at 'good'/'bad' section level + for { + sinkName <- cursor.get[String]("name") + config <- cursor.as[SinkConfig] + buffer <- cursor.get[Buffer]("buffer") + } yield Sink(sinkName, buffer, config) + } + + /** + * streams { + * good = "good-name" + * bad = "bad-name" + * buffer {...} //shared by good and bad + * sink {...} //shared by good and bad + * } + */ + private def legacyDecoder[SinkConfig: Decoder]: Decoder[Sink[SinkConfig]] = + Decoder.instance { cursor => //cursor is at 'good'/'bad' section level + for { + sinkName <- cursor.as[String] + config <- cursor.up.get[SinkConfig]("sink") //up first to the 'streams' section + buffer <- cursor.up.get[Buffer]("buffer") //up first to the 'streams' section + } yield Sink(sinkName, buffer, config) + } } diff --git a/http4s/src/test/resources/test-config.hocon b/http4s/src/test/resources/test-config.hocon index 71202d62f..8d2e06598 100644 --- a/http4s/src/test/resources/test-config.hocon +++ b/http4s/src/test/resources/test-config.hocon @@ -10,6 +10,12 @@ collector { foo = "hello" bar = "world" } + + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } } ssl { diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParserSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParserSpec.scala index 8106ab345..f4162a36e 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParserSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParserSpec.scala @@ -1,13 +1,10 @@ package com.snowplowanalytics.snowplow.collector.core import java.nio.file.Paths - import org.specs2.mutable.Specification - import cats.effect.IO - import cats.effect.testing.specs2.CatsEffect - +import com.snowplowanalytics.snowplow.collector.core.Config.Buffer import io.circe.generic.semiauto._ class ConfigParserSpec extends Specification with CatsEffect { @@ -20,11 +17,25 @@ class ConfigParserSpec extends Specification with CatsEffect { val path = Paths.get(getClass.getResource(("/test-config.hocon")).toURI()) val expectedStreams = Config.Streams[SinkConfig]( - "good", - "bad", - TestUtils.testConfig.streams.useIpAddressAsPartitionKey, - SinkConfig("hello", "world"), - TestUtils.testConfig.streams.buffer + good = Config.Sink( + name = "good", + buffer = Buffer( + 3145728, + 500, + 5000 + ), + SinkConfig("hello", "world") + ), + bad = Config.Sink( + name = 
"bad", + buffer = Buffer( + 3145728, + 500, + 5000 + ), + SinkConfig("hello", "world") + ), + TestUtils.testConfig.streams.useIpAddressAsPartitionKey ) val expected = TestUtils .testConfig diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala index 647871ee4..3647ec7d3 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala @@ -75,15 +75,25 @@ object TestUtils { ), cors = CORS(60.minutes), streams = Streams( - "raw", - "bad-1", - false, - AnyRef, - Buffer( - 3145728, - 500, - 5000 - ) + good = Sink( + name = "raw", + Buffer( + 3145728, + 500, + 5000 + ), + AnyRef + ), + bad = Sink( + name = "bad-1", + Buffer( + 3145728, + 500, + 5000 + ), + AnyRef + ), + useIpAddressAsPartitionKey = false ), monitoring = Monitoring( Metrics( diff --git a/kafka/src/it/resources/collector.hocon b/kafka/src/it/resources/collector.hocon index 78fd2c372..2468a977b 100644 --- a/kafka/src/it/resources/collector.hocon +++ b/kafka/src/it/resources/collector.hocon @@ -3,10 +3,13 @@ collector { port = ${PORT} streams { - good = ${TOPIC_GOOD} - bad = ${TOPIC_BAD} - - sink { + good { + name = ${TOPIC_GOOD} + brokers = ${BROKER} + maxBytes = ${MAX_BYTES} + } + bad { + name = ${TOPIC_BAD} brokers = ${BROKER} maxBytes = ${MAX_BYTES} } diff --git a/kafka/src/main/resources/application.conf b/kafka/src/main/resources/application.conf index 80182aeec..275fd19d1 100644 --- a/kafka/src/main/resources/application.conf +++ b/kafka/src/main/resources/application.conf @@ -1,12 +1,19 @@ collector { streams { + + //New object-like style + good = ${collector.streams.sink} + bad = ${collector.streams.sink} + + //Legacy style sink { - enabled = kafka threadPoolSize = 10 retries = 10 maxBytes = 1000000 + buffer = ${collector.streams.buffer} } + //Legacy style buffer { byteLimit = 3145728 recordLimit = 500 diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala index bd20c16e8..30db6f05c 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala @@ -23,18 +23,8 @@ object KafkaCollector extends App[KafkaSinkConfig](BuildInfo) { override def mkSinks(config: Config.Streams[KafkaSinkConfig]): Resource[IO, Sinks[IO]] = for { - good <- KafkaSink.create[IO]( - config.sink.maxBytes, - config.good, - config.sink, - config.buffer - ) - bad <- KafkaSink.create[IO]( - config.sink.maxBytes, - config.bad, - config.sink, - config.buffer - ) + good <- KafkaSink.create[IO](config.good) + bad <- KafkaSink.create[IO](config.bad) } yield Sinks(good, bad) override def telemetryInfo(config: Config.Streams[KafkaSinkConfig]): IO[Telemetry.TelemetryInfo] = diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala index 8c20858f1..88037674b 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala @@ -64,14 
+64,11 @@ class KafkaSink[F[_]: Sync]( object KafkaSink { def create[F[_]: Sync]( - maxBytes: Int, - topicName: String, - kafkaConfig: KafkaSinkConfig, - bufferConfig: Config.Buffer + sinkConfig: Config.Sink[KafkaSinkConfig] ): Resource[F, KafkaSink[F]] = for { - kafkaProducer <- createProducer(kafkaConfig, bufferConfig) - kafkaSink = new KafkaSink(maxBytes, kafkaProducer, topicName) + kafkaProducer <- createProducer(sinkConfig.config, sinkConfig.buffer) + kafkaSink = new KafkaSink(sinkConfig.config.maxBytes, kafkaProducer, sinkConfig.name) } yield kafkaSink /** diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSinkConfig.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSinkConfig.scala index 676a5259d..ee4ede0cb 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSinkConfig.scala +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSinkConfig.scala @@ -3,14 +3,12 @@ package com.snowplowanalytics.snowplow.collectors.scalastream.sinks import io.circe.Decoder import io.circe.generic.semiauto._ -import com.snowplowanalytics.snowplow.collector.core.Config - final case class KafkaSinkConfig( maxBytes: Int, brokers: String, retries: Int, producerConf: Option[Map[String, String]] -) extends Config.Sink +) object KafkaSinkConfig { implicit val configDecoder: Decoder[KafkaSinkConfig] = deriveDecoder[KafkaSinkConfig] diff --git a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala index 703fd1563..c775f36d1 100644 --- a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala +++ b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala @@ -43,6 +43,20 @@ class KafkaConfigSpec extends Specification with CatsEffect { expectedResult = Right(KafkaConfigSpec.expectedConfig) ) } + + //TODO it's just temporary to show that old config style is still working + "be able to parse extended kafka config- legacy " in { + assert( + resource = "/legacy/config.kafka.extended.hocon", + expectedResult = Right(KafkaConfigSpec.expectedConfig) + ) + } + "be able to parse minimal kafka config - legacy" in { + assert( + resource = "/legacy/config.kafka.minimal.hocon", + expectedResult = Right(KafkaConfigSpec.expectedConfig) + ) + } } private def assert(resource: String, expectedResult: Either[ExitCode, Config[KafkaSinkConfig]]) = { @@ -107,20 +121,35 @@ object KafkaConfigSpec { redirectDomains = Set.empty, preTerminationPeriod = 10.seconds, streams = Config.Streams( - good = "good", - bad = "bad", - useIpAddressAsPartitionKey = false, - buffer = Config.Buffer( - byteLimit = 3145728, - recordLimit = 500, - timeLimit = 5000 + good = Config.Sink( + name = "good", + buffer = Config.Buffer( + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000 + ), + config = KafkaSinkConfig( + maxBytes = 1000000, + brokers = "localhost:9092,another.host:9092", + retries = 10, + producerConf = None + ) ), - sink = KafkaSinkConfig( - maxBytes = 1000000, - brokers = "localhost:9092,another.host:9092", - retries = 10, - producerConf = None - ) + bad = Config.Sink( + name = "bad", + buffer = Config.Buffer( + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000 + ), + config = KafkaSinkConfig( + maxBytes = 1000000, + brokers = 
"localhost:9092,another.host:9092", + retries = 10, + producerConf = None + ) + ), + useIpAddressAsPartitionKey = false ), telemetry = Config.Telemetry( disable = false,