From 21be18a3689a5d86517c53abc8acf0ee682a3ca8 Mon Sep 17 00:00:00 2001 From: Gerrit Beine Date: Sun, 7 Jul 2024 16:40:05 +0200 Subject: [PATCH 1/4] Introduce a warning timeout for MQTT topics The feature allows to define a parameter 'timeout' for each topic. The timeout is specified in seconds. Instead of notifying targets when a message arrives for a topic, mqttwarn will trigger the targets if no message is received on the topic for the specified period of time. --- mqttwarn/context.py | 6 +++++ mqttwarn/core.py | 31 +++++++++++++++++++++--- mqttwarn/topic.py | 58 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 3 deletions(-) create mode 100644 mqttwarn/topic.py diff --git a/mqttwarn/context.py b/mqttwarn/context.py index cfed68e1..d3e58b24 100644 --- a/mqttwarn/context.py +++ b/mqttwarn/context.py @@ -51,6 +51,12 @@ def get_qos(self, section: str) -> int: qos = int(self.config.get(section, "qos")) return qos + def get_timeout(self, section: str) -> int: + timeout = -1 + if self.config.has_option(section, "timeout"): + timeout = int(self.config.get(section, "timeout")) + return timeout + def get_config(self, section: str, name: str) -> t.Any: value = None if self.config.has_option(section, name): diff --git a/mqttwarn/core.py b/mqttwarn/core.py index ccca83d2..9adebded 100644 --- a/mqttwarn/core.py +++ b/mqttwarn/core.py @@ -35,6 +35,7 @@ timeout, truncate, ) +from mqttwarn.topic import TopicTimeout try: import json @@ -76,6 +77,9 @@ # Instances of PeriodicThread objects ptlist: t.Dict[str, PeriodicThread] = {} +# Instances of TopicTimeout objects +topic_timeout_list: t.Dict[str, TopicTimeout] = {} + # Instances of loaded service plugins service_plugins: t.Dict[str, t.Dict[str, t.Any]] = dict() @@ -131,6 +135,7 @@ def on_connect(mosq: MqttClient, userdata: t.Dict[str, str], flags: t.Dict[str, for section in context.get_sections(): topic = context.get_topic(section) qos = context.get_qos(section) + topic_timeout = context.get_timeout(section) if topic in subscribed: continue @@ -138,6 +143,10 @@ def on_connect(mosq: MqttClient, userdata: t.Dict[str, str], flags: t.Dict[str, logger.debug("Subscribing to %s (qos=%d)" % (topic, qos)) mqttc.subscribe(topic, qos) subscribed.append(topic) + if topic_timeout > 0: + logger.debug("Setting up timeout thread for %s (timeout=%d)" % (topic, topic_timeout)) + topic_timeout_list[topic] = TopicTimeout(timeout=topic_timeout, topic=topic, on_timeout=message_to_targets_handler) + topic_timeout_list[topic].start() if cf.lwt is not None: mqttc.publish(cf.lwt, cf.lwt_alive, qos=0, retain=True) @@ -160,6 +169,9 @@ def on_disconnect(mosq: MqttClient, userdata: t.Dict[str, str], result_code: int """ Handle disconnections from the broker """ + for topic, thread in topic_timeout_list.items(): + thread.stop() + if result_code == 0: logger.info("Clean disconnection from broker") else: @@ -192,6 +204,22 @@ def on_message_handler(mosq: MqttClient, userdata: t.Dict[str, str], msg: MQTTMe logger.debug("Skipping retained message on %s" % topic) return + if topic in topic_timeout_list: + logger.debug("Message received, restarting timeout on %s" % topic) + topic_timeout_list[topic].restart() + return + + message_to_targets_handler(topic, payload) + + +# End of MQTT broker callbacks + + +def message_to_targets_handler(topic: str, payload: t.AnyStr): + """ + Identify targets for message and send the message to these targets + """ + # Try to find matching settings for this topic for section in context.get_sections(): # Get the topic for this section (usually the section name but optionally overridden) @@ -209,9 +237,6 @@ def on_message_handler(mosq: MqttClient, userdata: t.Dict[str, str], msg: MQTTMe send_to_targets(section, topic, payload) -# End of MQTT broker callbacks - - def send_failover(reason: str, message: t.AnyStr): # Make sure we dump this event to the log logger.warning(message) diff --git a/mqttwarn/topic.py b/mqttwarn/topic.py new file mode 100644 index 00000000..6199c7ae --- /dev/null +++ b/mqttwarn/topic.py @@ -0,0 +1,58 @@ +# (c) 2014-2023 The mqttwarn developers + +import logging +import threading +import time +import typing as t + +logger = logging.getLogger(__name__) + + +class TopicTimeout(threading.Thread): + """ + A thread handling timeouts on mqtt topics + """ + + def __init__( + self, + topic: t.Optional[str] = None, + timeout: t.Optional[int] = 1, + on_timeout: t.Optional[t.Callable] = None + ): + threading.Thread.__init__(self) + self.topic = topic + self._timeout = timeout + self._on_timeout = on_timeout + self._restart_event = threading.Event(); + self._stop_event = threading.Event() + + def run(self): + # The outer loop runs until the thread receives a stop signal + # See: https://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread + # The outer loop is used to reset the timeout after a message was received + while not self._stop_event.is_set(): + timeout = self._timeout + # The inner loop runs until a stop signal is received or a message is received + # It uses the same logic as the outer loop for the signal handling + while True: + if self._stop_event.is_set(): + break + if self._restart_event.is_set(): + self._restart_event = threading.Event(); + break + time.sleep(1) + timeout = timeout - 1; + logger.debug("%s waiting... %i" % (self.name, timeout)) + if timeout == 0: + logger.info("%s Timeout!!!" % self.name) + message = "Timeout for topic %s after %i" % (self.topic, self._timeout) + self._on_timeout(self.topic, message.encode('UTF-8')) + break + + def restart(self): + logger.debug("Restarting timeout thread for %s (timeout %i)" % (self.topic, self._timeout)) + self._restart_event.set() + + def stop(self): + logger.debug("Stopping timeout thread for %s" % (self.topic)) + self._stop_event.set() From d593dd114bb6e7ec4872a393cfcfe15babcce3e2 Mon Sep 17 00:00:00 2001 From: Gerrit Beine Date: Sun, 7 Jul 2024 17:04:18 +0200 Subject: [PATCH 2/4] Fixed log levels and messages --- mqttwarn/topic.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/mqttwarn/topic.py b/mqttwarn/topic.py index 6199c7ae..34184926 100644 --- a/mqttwarn/topic.py +++ b/mqttwarn/topic.py @@ -21,17 +21,18 @@ def __init__( ): threading.Thread.__init__(self) self.topic = topic - self._timeout = timeout + self.timeout = timeout self._on_timeout = on_timeout self._restart_event = threading.Event(); self._stop_event = threading.Event() def run(self): + logger.debug("Starting thread %s for topic %s" % (self.name, self.topic)) # The outer loop runs until the thread receives a stop signal # See: https://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread # The outer loop is used to reset the timeout after a message was received while not self._stop_event.is_set(): - timeout = self._timeout + timeout = self.timeout # The inner loop runs until a stop signal is received or a message is received # It uses the same logic as the outer loop for the signal handling while True: @@ -40,17 +41,17 @@ def run(self): if self._restart_event.is_set(): self._restart_event = threading.Event(); break + logger.debug("%s waiting... %i" % (self.name, timeout)) time.sleep(1) timeout = timeout - 1; - logger.debug("%s waiting... %i" % (self.name, timeout)) if timeout == 0: - logger.info("%s Timeout!!!" % self.name) - message = "Timeout for topic %s after %i" % (self.topic, self._timeout) + logger.debug("%s timeout for topic %s" % (self.name, self.topic)) + message = "Timeout for topic %s after %i" % (self.topic, self.timeout) self._on_timeout(self.topic, message.encode('UTF-8')) break def restart(self): - logger.debug("Restarting timeout thread for %s (timeout %i)" % (self.topic, self._timeout)) + logger.debug("Restarting timeout thread for %s (timeout %i)" % (self.topic, self.timeout)) self._restart_event.set() def stop(self): From a5969d40acf3c383d19da3159a4697d7881f82d7 Mon Sep 17 00:00:00 2001 From: Gerrit Beine Date: Mon, 8 Jul 2024 18:32:24 +0200 Subject: [PATCH 3/4] Minor improvements Reduced invasiveness - removed message_to_targets_handler by passing send_to_targets as on_timeout callback - introduced notify_only_on_timeout switch for allowing notifications for arriving messages, too --- mqttwarn/context.py | 6 ++++++ mqttwarn/core.py | 22 +++++++++------------- mqttwarn/topic.py | 8 ++++++-- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/mqttwarn/context.py b/mqttwarn/context.py index d3e58b24..33679906 100644 --- a/mqttwarn/context.py +++ b/mqttwarn/context.py @@ -57,6 +57,12 @@ def get_timeout(self, section: str) -> int: timeout = int(self.config.get(section, "timeout")) return timeout + def get_notify_only_on_timeout(self, section: str) -> int: + notify_only_on_timeout = False + if self.config.has_option(section, "notify_only_on_timeout"): + notify_only_on_timeout = bool(self.config.get(section, "notify_only_on_timeout")) + return notify_only_on_timeout + def get_config(self, section: str, name: str) -> t.Any: value = None if self.config.has_option(section, name): diff --git a/mqttwarn/core.py b/mqttwarn/core.py index 9adebded..abb344a2 100644 --- a/mqttwarn/core.py +++ b/mqttwarn/core.py @@ -136,6 +136,7 @@ def on_connect(mosq: MqttClient, userdata: t.Dict[str, str], flags: t.Dict[str, topic = context.get_topic(section) qos = context.get_qos(section) topic_timeout = context.get_timeout(section) + notify_only_on_timeout = context.get_notify_only_on_timeout(section) if topic in subscribed: continue @@ -145,7 +146,9 @@ def on_connect(mosq: MqttClient, userdata: t.Dict[str, str], flags: t.Dict[str, subscribed.append(topic) if topic_timeout > 0: logger.debug("Setting up timeout thread for %s (timeout=%d)" % (topic, topic_timeout)) - topic_timeout_list[topic] = TopicTimeout(timeout=topic_timeout, topic=topic, on_timeout=message_to_targets_handler) + topic_timeout_list[topic] = TopicTimeout(timeout=topic_timeout, topic=topic, section=section, + notify_only_on_timeout=notify_only_on_timeout, + on_timeout=send_to_targets) topic_timeout_list[topic].start() if cf.lwt is not None: @@ -207,18 +210,8 @@ def on_message_handler(mosq: MqttClient, userdata: t.Dict[str, str], msg: MQTTMe if topic in topic_timeout_list: logger.debug("Message received, restarting timeout on %s" % topic) topic_timeout_list[topic].restart() - return - - message_to_targets_handler(topic, payload) - - -# End of MQTT broker callbacks - - -def message_to_targets_handler(topic: str, payload: t.AnyStr): - """ - Identify targets for message and send the message to these targets - """ + if topic_timeout_list[topic].notify_only_on_timeout: + return # Try to find matching settings for this topic for section in context.get_sections(): @@ -237,6 +230,9 @@ def message_to_targets_handler(topic: str, payload: t.AnyStr): send_to_targets(section, topic, payload) +# End of MQTT broker callbacks + + def send_failover(reason: str, message: t.AnyStr): # Make sure we dump this event to the log logger.warning(message) diff --git a/mqttwarn/topic.py b/mqttwarn/topic.py index 34184926..a3279751 100644 --- a/mqttwarn/topic.py +++ b/mqttwarn/topic.py @@ -17,11 +17,15 @@ def __init__( self, topic: t.Optional[str] = None, timeout: t.Optional[int] = 1, + section: t.Optional[str] = None, + notify_only_on_timeout: t.Optional[bool] = False, on_timeout: t.Optional[t.Callable] = None ): threading.Thread.__init__(self) self.topic = topic self.timeout = timeout + self.section = section + self.notify_only_on_timeout = notify_only_on_timeout self._on_timeout = on_timeout self._restart_event = threading.Event(); self._stop_event = threading.Event() @@ -43,11 +47,11 @@ def run(self): break logger.debug("%s waiting... %i" % (self.name, timeout)) time.sleep(1) - timeout = timeout - 1; + timeout = timeout - 1 if timeout == 0: logger.debug("%s timeout for topic %s" % (self.name, self.topic)) message = "Timeout for topic %s after %i" % (self.topic, self.timeout) - self._on_timeout(self.topic, message.encode('UTF-8')) + self._on_timeout(self.section, self.topic, message.encode('UTF-8')) break def restart(self): From 93acdd37dd5ba0060ff52870c9e1b6d614b105cf Mon Sep 17 00:00:00 2001 From: Gerrit Beine Date: Thu, 19 Sep 2024 21:47:29 +0200 Subject: [PATCH 4/4] Add support for timeouts on wildcard topics --- mqttwarn/core.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/mqttwarn/core.py b/mqttwarn/core.py index abb344a2..d3c011b8 100644 --- a/mqttwarn/core.py +++ b/mqttwarn/core.py @@ -207,11 +207,12 @@ def on_message_handler(mosq: MqttClient, userdata: t.Dict[str, str], msg: MQTTMe logger.debug("Skipping retained message on %s" % topic) return - if topic in topic_timeout_list: - logger.debug("Message received, restarting timeout on %s" % topic) - topic_timeout_list[topic].restart() - if topic_timeout_list[topic].notify_only_on_timeout: - return + for match_topic in topic_timeout_list: + if paho.topic_matches_sub(match_topic, topic): + logger.debug("Message received, restarting timeout on %s" % match_topic) + topic_timeout_list[match_topic].restart() + if topic_timeout_list[match_topic].notify_only_on_timeout: + return # Try to find matching settings for this topic for section in context.get_sections():