-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka_consumer.py
53 lines (40 loc) · 1.36 KB
/
kafka_consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import logging
import logging.config
from configparser import ConfigParser
from confluent_kafka import Consumer
def run_kafka_consumer():
"""
Create Kafka consumer, subscribe to relevant topic and start consuming messages
"""
# load config
config = ConfigParser()
config.read("app.cfg")
# start kafka consumer
logger.info("Starting Kafka Consumer")
consumer = Consumer({
"bootstrap.servers": config.get("kafka", "bootstrap_servers"),
"group.id": config.get("kafka", "group_id"),
"auto.offset.reset": config.get("kafka", "auto_offset_reset")
})
# subscribe to topic
consumer.subscribe(topics=[config.get("kafka", "topic")])
# consume messages
try:
while True:
msg = consumer.poll(timeout=2.0)
if msg is None:
logging.debug("No message received")
continue
if msg.error():
logging.error(f"Consumer error: {msg.error()}")
continue
else:
logging.info(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
logging.info("Stopping Kafka consumer")
consumer.close()
if __name__ == "__main__":
# start logging
logging.config.fileConfig("logging.ini")
logger = logging.getLogger(__name__)
run_kafka_consumer()