Add separate good/bad sink configurations
pondzix committed Oct 24, 2023
1 parent d27f617 commit 081f466
Showing 16 changed files with 529 additions and 106 deletions.
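
In short, the single shared sink block with top-level good = "good" and bad = "bad" stream names is replaced by two self-contained sink configurations, good { ... } and bad { ... }, each carrying its own name, brokers, retries, maxBytes and buffer settings. A condensed sketch of the new Kafka layout, drawn from the extended example below (the values are the defaults shown there):

streams {
  good {
    name = "good"
    brokers = "localhost:9092,another.host:9092"
    retries = 10
    maxBytes = 1000000
    buffer {
      byteLimit = 3145728
      recordLimit = 500
      timeLimit = 5000
    }
  }
  bad {
    name = "bad"
    brokers = "localhost:9092,another.host:9092"
    retries = 10
    maxBytes = 1000000
    buffer {
      byteLimit = 3145728
      recordLimit = 500
      timeLimit = 5000
    }
  }
}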
82 changes: 56 additions & 26 deletions examples/config.kafka.extended.hocon
@@ -163,28 +163,17 @@ collector {
}

streams {
# Events which have successfully been collected will be stored in the good stream/topic
good = "good"

# Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic.
# The collector can currently produce two flavours of bad row:
# - a size_violation if an event is larger than the Kinesis (1MB) or SQS (256KB) limits;
# - a generic_error if a request's querystring cannot be parsed because of illegal characters
bad = "bad"

# Whether to use the incoming event's IP address as the partition key for the good stream/topic
# Note: NSQ does not make use of the partition key.
useIpAddressAsPartitionKey = false

# Enable the chosen sink by uncommenting the appropriate configuration
sink {
# Choose one of kinesis, sqs, google-pub-sub, kafka, nsq, or stdout.
# To use stdout, comment out or remove everything in the "collector.streams.sink" section except
# "enabled", which should be set to "stdout".
enabled = kafka

# Or Kafka
# Events which have successfully been collected will be stored in the good stream/topic
good {

name = "good"
brokers = "localhost:9092,another.host:9092"

## Number of retries to perform before giving up on sending a record
retries = 10
# The kafka producer has a variety of possible configuration options defined at
@@ -193,6 +182,7 @@
# "bootstrap.servers" = brokers
# "buffer.memory" = buffer.byteLimit
# "linger.ms" = buffer.timeLimit

#producerConf {
# acks = all
# "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
@@ -203,18 +193,58 @@
# If a record is bigger, a size violation bad row is emitted instead
# Default: 1 MB
maxBytes = 1000000

# Incoming events are stored in a buffer before being sent to Kafka.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
}
}

# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
# Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic.
# The collector can currently produce two flavours of bad row:
# - a size_violation if an event is larger than the Kinesis (1MB) or SQS (256KB) limits;
# - a generic_error if a request's querystring cannot be parsed because of illegal characters
bad {

name = "bad"
brokers = "localhost:9092,another.host:9092"

## Number of retries to perform before giving up on sending a record
retries = 10
# The kafka producer has a variety of possible configuration options defined at
# https://kafka.apache.org/documentation/#producerconfigs
# Some values are set to other values from this config by default:
# "bootstrap.servers" = brokers
# "buffer.memory" = buffer.byteLimit
# "linger.ms" = buffer.timeLimit

#producerConf {
# acks = all
# "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
# "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
#}

# Optional. Maximum number of bytes that a single record can contain.
# If a record is bigger, a size violation bad row is emitted instead
# Default: 1 MB
maxBytes = 1000000

# Incoming events are stored in a buffer before being sent to Kafka.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
}
}
}

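Both the good and bad sinks in the extended example above keep a commented-out producerConf block. Enabled, it would look roughly like the sketch below (using the acks and serializer values from those comments; any other producer option from https://kafka.apache.org/documentation/#producerconfigs could be added the same way):

good {
  name = "good"
  brokers = "localhost:9092,another.host:9092"
  producerConf {
    acks = all
    "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
    "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
  }
}
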
14 changes: 8 additions & 6 deletions examples/config.kafka.minimal.hocon
@@ -3,11 +3,13 @@ collector {
port = 8080

streams {
good = "good"
bad = "bad"

sink {
brokers = "localhost:9092,another.host:9092"
good {
name = "good"
brokers = "localhost:9092,another.host:9092"
}
bad {
name = "bad"
brokers = "localhost:9092,another.host:9092"
}
}
}
}
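
Because each topic now has its own sink block, the two no longer have to share settings. A hypothetical variation of the minimal config, with the good and bad sinks pointed at different brokers (the hostnames here are illustrative, not defaults):

collector {
  port = 8080

  streams {
    good {
      name = "good"
      brokers = "kafka-good.internal:9092"
    }
    bad {
      name = "bad"
      brokers = "kafka-bad.internal:9092"
    }
  }
}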