Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a warning timeout for MQTT topics #697

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions mqttwarn/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ def get_qos(self, section: str) -> int:
qos = int(self.config.get(section, "qos"))
return qos

def get_timeout(self, section: str) -> int:
timeout = -1
if self.config.has_option(section, "timeout"):
timeout = int(self.config.get(section, "timeout"))
return timeout

def get_notify_only_on_timeout(self, section: str) -> int:
notify_only_on_timeout = False
if self.config.has_option(section, "notify_only_on_timeout"):
notify_only_on_timeout = bool(self.config.get(section, "notify_only_on_timeout"))
return notify_only_on_timeout

def get_config(self, section: str, name: str) -> t.Any:
value = None
if self.config.has_option(section, name):
Expand Down
22 changes: 22 additions & 0 deletions mqttwarn/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
timeout,
truncate,
)
from mqttwarn.topic import TopicTimeout

try:
import json
Expand Down Expand Up @@ -76,6 +77,9 @@
# Instances of PeriodicThread objects
ptlist: t.Dict[str, PeriodicThread] = {}

# Instances of TopicTimeout objects
topic_timeout_list: t.Dict[str, TopicTimeout] = {}

# Instances of loaded service plugins
service_plugins: t.Dict[str, t.Dict[str, t.Any]] = dict()

Expand Down Expand Up @@ -131,13 +135,21 @@ def on_connect(mosq: MqttClient, userdata: t.Dict[str, str], flags: t.Dict[str,
for section in context.get_sections():
topic = context.get_topic(section)
qos = context.get_qos(section)
topic_timeout = context.get_timeout(section)
notify_only_on_timeout = context.get_notify_only_on_timeout(section)

if topic in subscribed:
continue

logger.debug("Subscribing to %s (qos=%d)" % (topic, qos))
mqttc.subscribe(topic, qos)
subscribed.append(topic)
if topic_timeout > 0:
logger.debug("Setting up timeout thread for %s (timeout=%d)" % (topic, topic_timeout))
topic_timeout_list[topic] = TopicTimeout(timeout=topic_timeout, topic=topic, section=section,
notify_only_on_timeout=notify_only_on_timeout,
on_timeout=send_to_targets)
topic_timeout_list[topic].start()

if cf.lwt is not None:
mqttc.publish(cf.lwt, cf.lwt_alive, qos=0, retain=True)
Expand All @@ -160,6 +172,9 @@ def on_disconnect(mosq: MqttClient, userdata: t.Dict[str, str], result_code: int
"""
Handle disconnections from the broker
"""
for topic, thread in topic_timeout_list.items():
thread.stop()

if result_code == 0:
logger.info("Clean disconnection from broker")
else:
Expand Down Expand Up @@ -192,6 +207,13 @@ def on_message_handler(mosq: MqttClient, userdata: t.Dict[str, str], msg: MQTTMe
logger.debug("Skipping retained message on %s" % topic)
return

for match_topic in topic_timeout_list:
if paho.topic_matches_sub(match_topic, topic):
logger.debug("Message received, restarting timeout on %s" % match_topic)
topic_timeout_list[match_topic].restart()
if topic_timeout_list[match_topic].notify_only_on_timeout:
return

# Try to find matching settings for this topic
for section in context.get_sections():
# Get the topic for this section (usually the section name but optionally overridden)
Expand Down
63 changes: 63 additions & 0 deletions mqttwarn/topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# (c) 2014-2023 The mqttwarn developers

import logging
import threading
import time
import typing as t

logger = logging.getLogger(__name__)


class TopicTimeout(threading.Thread):
"""
A thread handling timeouts on mqtt topics
"""

def __init__(
self,
topic: t.Optional[str] = None,
timeout: t.Optional[int] = 1,
section: t.Optional[str] = None,
notify_only_on_timeout: t.Optional[bool] = False,
on_timeout: t.Optional[t.Callable] = None
):
threading.Thread.__init__(self)
self.topic = topic
self.timeout = timeout
self.section = section
self.notify_only_on_timeout = notify_only_on_timeout
self._on_timeout = on_timeout
self._restart_event = threading.Event();
self._stop_event = threading.Event()

def run(self):
logger.debug("Starting thread %s for topic %s" % (self.name, self.topic))
# The outer loop runs until the thread receives a stop signal
# See: https://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread
# The outer loop is used to reset the timeout after a message was received
while not self._stop_event.is_set():
timeout = self.timeout
# The inner loop runs until a stop signal is received or a message is received
# It uses the same logic as the outer loop for the signal handling
while True:
if self._stop_event.is_set():
break
if self._restart_event.is_set():
self._restart_event = threading.Event();
break
logger.debug("%s waiting... %i" % (self.name, timeout))
time.sleep(1)
timeout = timeout - 1
if timeout == 0:
logger.debug("%s timeout for topic %s" % (self.name, self.topic))
message = "Timeout for topic %s after %i" % (self.topic, self.timeout)
self._on_timeout(self.section, self.topic, message.encode('UTF-8'))
break

def restart(self):
logger.debug("Restarting timeout thread for %s (timeout %i)" % (self.topic, self.timeout))
self._restart_event.set()

def stop(self):
logger.debug("Stopping timeout thread for %s" % (self.topic))
self._stop_event.set()
Loading