From ab01d159685e3be80e0f4a5586d8f3afdf680c17 Mon Sep 17 00:00:00 2001 From: eschastlivtsev <58555129+eschastlivtsev@users.noreply.github.com> Date: Thu, 3 Aug 2023 19:55:51 +0400 Subject: [PATCH] feat(uat): add connection state check to Paho Python client (#389) --- .../client-python-paho/scripts/build.bat | 29 ++++--- .../src/mqtt_lib/mqtt_connection.py | 83 ++++++++++++------- 2 files changed, 72 insertions(+), 40 deletions(-) diff --git a/uat/custom-components/client-python-paho/scripts/build.bat b/uat/custom-components/client-python-paho/scripts/build.bat index c27db7b1c..a2071ce94 100644 --- a/uat/custom-components/client-python-paho/scripts/build.bat +++ b/uat/custom-components/client-python-paho/scripts/build.bat @@ -6,15 +6,20 @@ @echo off set start_dir=%cd% cd %~dp0../src -pip install virtualenv -python -m venv dev-env -call dev-env\Scripts\activate.bat -pip install pyinstaller -pip install -r requirements.txt -python -m grpc_tools.protoc -I../../../proto --python_out=../src/grpc_client_server/grpc_generated --pyi_out=../src/grpc_client_server/grpc_generated --grpc_python_out=../src/grpc_client_server/grpc_generated ../../../proto/mqtt_client_control.proto -python fix_generated.py -pyinstaller client-python-paho.spec -move dist\client-python-paho.exe ..\ -rmdir /s /q dist -rmdir /s /q build -cd %start_dir% +pip install virtualenv || (cd %start_dir% & exit /b 1) +python -m venv dev-env || (cd %start_dir% & exit /b 2) +call dev-env\Scripts\activate.bat || (cd %start_dir% & exit /b 3) +pip install pyinstaller || (cd %start_dir% & deactivate & exit /b 4) +pip install -r requirements.txt || (cd %start_dir% & deactivate & exit /b 5) + +python -m grpc_tools.protoc -I../../../proto --python_out=../src/grpc_client_server/grpc_generated ^ +--pyi_out=../src/grpc_client_server/grpc_generated --grpc_python_out=../src/grpc_client_server/grpc_generated ^ +../../../proto/mqtt_client_control.proto || (cd %start_dir% & deactivate & exit /b 6) + +python fix_generated.py || (cd %start_dir% & deactivate & exit /b 7) +pyinstaller client-python-paho.spec || (cd %start_dir% & deactivate & rmdir /s /q dist & rmdir /s /q build & exit /b 8) +move dist\client-python-paho.exe ..\ || (cd %start_dir% & deactivate & rmdir /s /q dist & rmdir /s /q build & exit /b 9) +rmdir /s /q dist || (cd %start_dir% & deactivate & rmdir /s /q build & exit /b 10) +rmdir /s /q build || (cd %start_dir% & deactivate & exit /b 11) +cd %start_dir% || (deactivate & exit /b 12) +deactivate || exit /b 13 diff --git a/uat/custom-components/client-python-paho/src/mqtt_lib/mqtt_connection.py b/uat/custom-components/client-python-paho/src/mqtt_lib/mqtt_connection.py index d4cb29d34..63bd40997 100644 --- a/uat/custom-components/client-python-paho/src/mqtt_lib/mqtt_connection.py +++ b/uat/custom-components/client-python-paho/src/mqtt_lib/mqtt_connection.py @@ -232,6 +232,9 @@ def __init__( self.__asyncio_helper = None # pylint: disable=unused-private-member self.__loop = asyncio.get_running_loop() + self.__is_closing = False + self.__is_connected = False + def set_connection_id(self, connection_id: int): """ Connection id setter @@ -302,26 +305,26 @@ async def start(self, timeout: int) -> ConnectResult: result = await asyncio.wait_for(self.__on_connect_future, remaining_timeout) conn_ack_info = self.__convert_to_conn_ack(result) - connected = True if result.reason_code == mqtt.MQTT_ERR_SUCCESS: + self.__is_connected = True self.__logger.info( "MQTT connection ID %s connected, client id %s", self.__connection_id, self.__connection_params.client_id, ) else: - connected = False + self.__is_connected = False self.__logger.info( "MQTT connection ID %s failed with error: %s", self.__connection_id, result.error_string ) - return ConnectResult(connected=connected, conn_ack_info=conn_ack_info, error=result.error_string) + return ConnectResult(connected=self.__is_connected, conn_ack_info=conn_ack_info, error=result.error_string) except asyncio.TimeoutError: self.__logger.exception("Exception occurred during connect") - return ConnectResult(connected=False, conn_ack_info=None, error="Connect timeout error") + return ConnectResult(connected=self.__is_connected, conn_ack_info=None, error="Connect timeout error") except (PahoException, socket.timeout) as error: self.__logger.exception("Exception occurred during connect") - return ConnectResult(connected=False, conn_ack_info=None, error=str(error)) + return ConnectResult(connected=self.__is_connected, conn_ack_info=None, error=str(error)) except Exception as error: self.__logger.exception("Exception occurred during connect") raise MQTTException from error @@ -361,32 +364,41 @@ async def disconnect(self, timeout: int, reason: int, mqtt_properties: List[Mqtt mqtt_properties - list of Mqtt5Properties reason - disconnect reason code """ - self.__logger.info("Disconnect MQTT connection with reason code %i", reason) - self.__on_disconnect_future = self.__loop.create_future() - - try: - user_properties = self.__convert_to_user_properties(mqtt_properties, "DISCONNECT") - if self.__protocol == mqtt.MQTTv5: - reason_code_obj = ReasonCodes(PacketTypes.DISCONNECT, identifier=reason) - properties = Properties(PacketTypes.DISCONNECT) - properties.UserProperty = user_properties - self.__client.disconnect(reasoncode=reason_code_obj, properties=properties) - else: - self.__client.disconnect() + if not self.__is_closing: + self.__is_closing = True + self.__logger.info("Disconnect MQTT connection with reason code %i", reason) + self.__on_disconnect_future = self.__loop.create_future() - result = await asyncio.wait_for(self.__on_disconnect_future, timeout) - except asyncio.TimeoutError as error: - raise MQTTException("Couldn't disconnect from MQTT broker") from error - finally: - self.__on_disconnect_future = None - - if result.reason_code != mqtt.MQTT_ERR_SUCCESS: - raise MQTTException(f"Couldn't disconnect from MQTT broker - rc {result.reason_code}") + try: + if self.__is_connected is True: + self.__is_connected = False + user_properties = self.__convert_to_user_properties(mqtt_properties, "DISCONNECT") + if self.__protocol == mqtt.MQTTv5: + reason_code_obj = ReasonCodes(PacketTypes.DISCONNECT, identifier=reason) + properties = Properties(PacketTypes.DISCONNECT) + properties.UserProperty = user_properties + self.__client.disconnect(reasoncode=reason_code_obj, properties=properties) + else: + self.__client.disconnect() + + result = await asyncio.wait_for(self.__on_disconnect_future, timeout) + + if result.reason_code != mqtt.MQTT_ERR_SUCCESS: + raise MQTTException(f"Couldn't disconnect from MQTT broker - rc {result.reason_code}") + else: + self.__logger.warning("DISCONNECT was not sent on the dead connection") + + except asyncio.TimeoutError as error: + raise MQTTException("Couldn't disconnect from MQTT broker") from error + finally: + self.__on_disconnect_future = None def __on_disconnect(self, client, userdata, reason_code, properties=None): # pylint: disable=unused-argument """ Paho MQTT disconnect callback """ + self.__is_connected = False + mqtt_reason_code = reason_code if hasattr(reason_code, "value"): @@ -395,7 +407,11 @@ def __on_disconnect(self, client, userdata, reason_code, properties=None): # py mqtt_result = MqttResult(reason_code=mqtt_reason_code, properties=properties) disconnect_info = self.__convert_to_disconnect(mqtt_result=mqtt_result) - self.__grpc_client.on_mqtt_disconnect(self.__connection_id, disconnect_info, None) + + if self.__is_closing: + self.__logger.warning("DISCONNECT event ignored due to shutdown initiated") + else: + self.__grpc_client.on_mqtt_disconnect(self.__connection_id, disconnect_info, None) try: if self.__on_disconnect_future is not None: @@ -403,6 +419,10 @@ def __on_disconnect(self, client, userdata, reason_code, properties=None): # py except asyncio.InvalidStateError: pass + self.__logger.info( + "MQTT connectionId %i disconnected error '%s'", self.__connection_id, disconnect_info.reasonString + ) + async def publish(self, timeout: int, message: MqttPubMessage) -> MqttPublishReply: """ Publish MQTT message. @@ -564,7 +584,11 @@ def __on_message(self, client, userdata, message): # pylint: disable=unused-arg """ self.__logger.debug("onMessage message %s from topic '%s'", message.payload, message.topic) mqtt_message = self.__convert_to_mqtt_message(message=message) - self.__grpc_client.on_receive_mqtt_message(self.__connection_id, mqtt_message) + + if self.__is_closing: + self.__logger.info("PUBLISH event is ignored due to shutdown initiated") + else: + self.__grpc_client.on_receive_mqtt_message(self.__connection_id, mqtt_message) async def unsubscribe( self, timeout: int, filters: List[str], mqtt_properties: List[Mqtt5Properties] @@ -690,9 +714,12 @@ def __create_client(self, connection_params: ConnectionParams) -> mqtt.Client: def state_check(self): """Check if MQTT is connected""" - if (self.__client is None) or not self.__client.is_connected(): + if not self.__is_connected: raise MQTTException("MQTT client is not in connected state") + if self.__is_closing: + raise MQTTException("MQTT connection is closing") + def __create_publish_properties(self, message: MqttPubMessage): """ Create MQTT5 properties