
WIP: Kafka/AVRO output bot. #1251

Draft · wants to merge 5 commits into develop
9 changes: 9 additions & 0 deletions intelmq/bots/BOTS
@@ -623,6 +623,15 @@
"replacement_char": "_"
}
},
"Kafka": {
"description": "Kafka is the bot responsible to send events to a kafka topic.",
"module": "intelmq.bots.outputs.kafka.output",
"parameters": {
"kafka_broker_list": "127.0.0.1",
"kafka_topic": "intelmq",
"flatten_fields": "extra"
}
},
"File": {
"description": "File is the bot responsible to send events to a file.",
"module": "intelmq.bots.outputs.file.output",
29 changes: 29 additions & 0 deletions intelmq/bots/outputs/kafka/README.md
@@ -0,0 +1,29 @@
# Kafka Output Bot

### Output Bot that sends events to Kafka


Bot parameters:

Kafka Producer
* kafka_broker_list : comma-separated list of Kafka brokers. Defaults to 127.0.0.1:9092.

* kafka_topic : the Kafka topic events are published to. Defaults to intelmq.

* flatten_fields : some downstream consumers (e.g. Elasticsearch) handle queries and
  aggregations better when fields are flat rather than nested JSON, so the fields listed
  here are flattened before sending. Can be a list of strings (field names)
  or a string with field names separated by a comma (,), e.g. `extra,field2` or `['extra', 'field2']`.
  Default: ['extra']. See the example after this list.
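
For illustration, a sketch of what flattening does (field values invented); note that the
bot also replaces every '.' in key names with '_' before sending:

```python
before = {"source.ip": "192.0.2.1", "extra": {"os": "linux", "port": 8080}}
# With flatten_fields = ['extra'], nested keys are hoisted to 'extra.<key>',
# and '.' in key names is then replaced by '_':
after = {"source_ip": "192.0.2.1", "extra_os": "linux", "extra_port": 8080}
```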

AVRO Producer
* enable_avro : set to true to publish events via the AvroProducer instead of plain JSON.
  Defaults to false.

* avro_topic_schema : a file path, pointing to a file containing a dict that maps expected
  values of 'avro_topic_field' to their destination topics. The keys in this dict should
  cover all expected values of that field.

  IMPORTANT: the schema must contain an "other" key, with either None or a topic name as
  the value. If None is declared, events with other values are dropped. See the example
  after this list.

* avro_topic_field : the field used to map an event's intelligence identity to its
  destination topic

* avro_value_schema_file : path to the AVRO value schema file

* avro_key_schema_file : path to the AVRO key schema file

* avro_schema_registry : URL where the schema registry is reachable.
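
For illustration, a hypothetical avro_topic_schema file (field values and topic names are
invented); the "other" key must always be present:

```json
{
    "source_ip": "indicators-ip",
    "source_fqdn": "indicators-domain",
    "other": null
}
```

With this mapping, an event whose avro_topic_field value is source_ip is published to the
indicators-ip topic; unlisted values fall under "other" and, since it is null here, are dropped.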
1 change: 1 addition & 0 deletions intelmq/bots/outputs/kafka/REQUIREMENTS.txt
@@ -0,0 +1 @@
confluent-kafka==0.11.4
Empty file.
126 changes: 126 additions & 0 deletions intelmq/bots/outputs/kafka/output.py
@@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-

from json import dumps, loads
from collections.abc import Mapping

from intelmq.lib.bot import Bot
from intelmq.lib.utils import load_configuration

try:
    from confluent_kafka import Producer
except ImportError:
    Producer = None
try:
    from confluent_kafka import avro
except ImportError:
    avro = None

if avro is not None:
    from confluent_kafka.avro import AvroProducer

Reviewer (on "if avro is not None:"): Can this and the two lines above be removed?

Author: I'm just using this to throw better errors in init() for external dependencies. I can remove them if you'd like.

Reviewer (on "from confluent_kafka.avro import AvroProducer"): Is this for a different library version, or why is it necessary? Please document the reason inline. And this line is not in a try-except block.

Author: Did this again for better error handling as I was writing this. It can probably be removed, as there are multiple Python packages which fit within the kafka namespace; many of them have Producer class definitions, but no avro class definitions. confluent_kafka is a more specific namespace.

Reviewer: But an ImportError is not caught here.
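
One way to meet the reviewer's point is to guard the AvroProducer import with the same
try-except pattern the module already uses for Producer and avro; a sketch, not code from
this PR:

try:
    from confluent_kafka import Producer
except ImportError:
    Producer = None

try:
    # Guarded separately so plain-JSON operation still works when the
    # AVRO extras are not installed.
    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer
except ImportError:
    avro = None
    AvroProducer = None

init() can then raise a descriptive error for whichever dependency is actually missing.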



def replace_keys(obj, key_char='.', replacement='_'):
    """Recursively replace key_char with replacement in all keys of a mapping.

    E.g. {'source.ip': {'a.b': 1}} becomes {'source_ip': {'a_b': 1}}.
    """
    if isinstance(obj, Mapping):
        replacement_obj = {}
        for key, val in obj.items():
            replacement_key = key.replace(key_char, replacement)
            replacement_obj[replacement_key] = replace_keys(val, key_char, replacement)
        return replacement_obj
    return obj


class KafkaOutputBot(Bot):

    def init(self):
        if Producer is None:
            raise ValueError('Missing confluent-kafka module.')

        self.broker_list = getattr(self.parameters,
                                   'kafka_broker_list', '127.0.0.1:9092')
        self.kafka_topic = getattr(self.parameters,
                                   'kafka_topic', 'intelmq')
        self.flatten_fields = getattr(self.parameters,
                                      'flatten_fields', ['extra'])
        self.enable_avro = getattr(self.parameters,
                                   'enable_avro', False)

        # Fields below define an output schema via AVRO
        if self.enable_avro:
            if avro is None:
                raise ValueError('Missing AVRO support: install confluent-kafka[avro].')

            self.avro_value_schema = avro.loads(
                dumps(load_configuration(self.parameters.avro_value_schema_file)))
            self.avro_key_schema = avro.loads(
                dumps(load_configuration(self.parameters.avro_key_schema_file)))

            self.avro_topic_schema = load_configuration(self.parameters.avro_topic_schema)

            self.avro_topic_field = getattr(
                self.parameters, 'avro_topic_field', None)
            self.avro_schema_registry = getattr(
                self.parameters, 'avro_schema_registry', None)

            # One AvroProducer serves all destination topics; the topic is
            # selected per event in process().
            self.producer = AvroProducer({
                'bootstrap.servers': self.broker_list,
                'schema.registry.url': self.avro_schema_registry
            },
                default_key_schema=self.avro_key_schema,
                default_value_schema=self.avro_value_schema)
        else:
            self.producer = Producer({
                'bootstrap.servers': self.broker_list})

        if isinstance(self.flatten_fields, str):
            self.flatten_fields = self.flatten_fields.split(',')

    def process(self):
        event = self.receive_message()
        event_dict = event.to_dict(hierarchical=False)

        for field in self.flatten_fields:
            if field in event_dict:
                val = event_dict[field]
                # if it's a string try to parse it as JSON
                if isinstance(val, str):
                    try:
                        val = loads(val)
                    except ValueError:
                        pass
                if isinstance(val, Mapping):
                    for key, value in val.items():
                        event_dict[field + '.' + key] = value
                    event_dict.pop(field)

        event_dict = replace_keys(event_dict)

        if not self.enable_avro:
            self.producer.produce(self.kafka_topic,
                                  dumps(event_dict).encode('utf-8'),
                                  callback=self.delivery_report)
            self.acknowledge_message()
            self.kafka.flush()
Reviewer: Where is self.kafka defined?
        else:
            try:
                key_field = str(event_dict[self.avro_topic_field]).replace('.', '_')
            except KeyError:
                self.logger.debug('Event %s has no field %s, dropping.',
                                  event_dict, self.avro_topic_field)
                self.acknowledge_message()
                return

            submit_key = {'indicator': event_dict[key_field]}
            event_topic = self.avro_topic_schema[key_field]
            self.logger.debug('Shipping %s to topic %s.', submit_key, event_topic)
            self.producer.produce(topic=event_topic, value=event_dict, key=submit_key)
            self.acknowledge_message()
            self.producer.poll(0)
Reviewer: Better do this check before acknowledging, and raise an error if that happened, so the message does not get lost.

Author: It doesn't look like the AvroProducer has much in terms of error handling. Looking at it now, I don't think that poll() does anything. I think I'm just going to have to catch serialization errors on produce.

Here's a link

Reviewer: If a message could not be successfully delivered, self.acknowledge() should not be called. For error handling (and logging), an Exception should be raised.
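
A minimal sketch of the flow the reviewers describe: produce, flush, check the delivery
result, and only then acknowledge. It assumes the undefined self.kafka above was meant to
be self.producer; the _delivery_error attribute is invented for illustration:

    def delivery_report(self, err, msg):
        # Record the failure; inspected after flush() has run the callbacks.
        if err is not None:
            self._delivery_error = err

    def process(self):
        event_dict = self.receive_message().to_dict(hierarchical=False)
        self._delivery_error = None
        self.producer.produce(self.kafka_topic,
                              dumps(event_dict).encode('utf-8'),
                              callback=self.delivery_report)
        self.producer.flush()  # blocks until outstanding delivery callbacks have run
        if self._delivery_error is not None:
            # Raising keeps the message unacknowledged, so IntelMQ can retry it.
            raise ValueError('Message delivery failed: {}'.format(self._delivery_error))
        self.acknowledge_message()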


    def delivery_report(self, err, msg):
        """Called once for each produced message to flag a delivery failure.
        Triggered by poll() or flush()."""
        if err is not None:
            self.logger.error('Message delivery failed: %s', err)


BOT = KafkaOutputBot