Skip to content

Commit

Permalink
feat(uat): add connection state check to Paho Python client (#389)
Browse files Browse the repository at this point in the history
  • Loading branch information
eschastlivtsev authored Aug 3, 2023
1 parent 6f5bee9 commit ab01d15
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 40 deletions.
29 changes: 17 additions & 12 deletions uat/custom-components/client-python-paho/scripts/build.bat
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@
@echo off
set start_dir=%cd%
cd %~dp0../src
pip install virtualenv
python -m venv dev-env
call dev-env\Scripts\activate.bat
pip install pyinstaller
pip install -r requirements.txt
python -m grpc_tools.protoc -I../../../proto --python_out=../src/grpc_client_server/grpc_generated --pyi_out=../src/grpc_client_server/grpc_generated --grpc_python_out=../src/grpc_client_server/grpc_generated ../../../proto/mqtt_client_control.proto
python fix_generated.py
pyinstaller client-python-paho.spec
move dist\client-python-paho.exe ..\
rmdir /s /q dist
rmdir /s /q build
cd %start_dir%
pip install virtualenv || (cd %start_dir% & exit /b 1)
python -m venv dev-env || (cd %start_dir% & exit /b 2)
call dev-env\Scripts\activate.bat || (cd %start_dir% & exit /b 3)
pip install pyinstaller || (cd %start_dir% & deactivate & exit /b 4)
pip install -r requirements.txt || (cd %start_dir% & deactivate & exit /b 5)

python -m grpc_tools.protoc -I../../../proto --python_out=../src/grpc_client_server/grpc_generated ^
--pyi_out=../src/grpc_client_server/grpc_generated --grpc_python_out=../src/grpc_client_server/grpc_generated ^
../../../proto/mqtt_client_control.proto || (cd %start_dir% & deactivate & exit /b 6)

python fix_generated.py || (cd %start_dir% & deactivate & exit /b 7)
pyinstaller client-python-paho.spec || (cd %start_dir% & deactivate & rmdir /s /q dist & rmdir /s /q build & exit /b 8)
move dist\client-python-paho.exe ..\ || (cd %start_dir% & deactivate & rmdir /s /q dist & rmdir /s /q build & exit /b 9)
rmdir /s /q dist || (cd %start_dir% & deactivate & rmdir /s /q build & exit /b 10)
rmdir /s /q build || (cd %start_dir% & deactivate & exit /b 11)
cd %start_dir% || (deactivate & exit /b 12)
deactivate || exit /b 13
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ def __init__(
self.__asyncio_helper = None # pylint: disable=unused-private-member
self.__loop = asyncio.get_running_loop()

self.__is_closing = False
self.__is_connected = False

def set_connection_id(self, connection_id: int):
"""
Connection id setter
Expand Down Expand Up @@ -302,26 +305,26 @@ async def start(self, timeout: int) -> ConnectResult:
result = await asyncio.wait_for(self.__on_connect_future, remaining_timeout)
conn_ack_info = self.__convert_to_conn_ack(result)

connected = True
if result.reason_code == mqtt.MQTT_ERR_SUCCESS:
self.__is_connected = True
self.__logger.info(
"MQTT connection ID %s connected, client id %s",
self.__connection_id,
self.__connection_params.client_id,
)
else:
connected = False
self.__is_connected = False
self.__logger.info(
"MQTT connection ID %s failed with error: %s", self.__connection_id, result.error_string
)

return ConnectResult(connected=connected, conn_ack_info=conn_ack_info, error=result.error_string)
return ConnectResult(connected=self.__is_connected, conn_ack_info=conn_ack_info, error=result.error_string)
except asyncio.TimeoutError:
self.__logger.exception("Exception occurred during connect")
return ConnectResult(connected=False, conn_ack_info=None, error="Connect timeout error")
return ConnectResult(connected=self.__is_connected, conn_ack_info=None, error="Connect timeout error")
except (PahoException, socket.timeout) as error:
self.__logger.exception("Exception occurred during connect")
return ConnectResult(connected=False, conn_ack_info=None, error=str(error))
return ConnectResult(connected=self.__is_connected, conn_ack_info=None, error=str(error))
except Exception as error:
self.__logger.exception("Exception occurred during connect")
raise MQTTException from error
Expand Down Expand Up @@ -361,32 +364,41 @@ async def disconnect(self, timeout: int, reason: int, mqtt_properties: List[Mqtt
mqtt_properties - list of Mqtt5Properties
reason - disconnect reason code
"""
self.__logger.info("Disconnect MQTT connection with reason code %i", reason)
self.__on_disconnect_future = self.__loop.create_future()

try:
user_properties = self.__convert_to_user_properties(mqtt_properties, "DISCONNECT")
if self.__protocol == mqtt.MQTTv5:
reason_code_obj = ReasonCodes(PacketTypes.DISCONNECT, identifier=reason)
properties = Properties(PacketTypes.DISCONNECT)
properties.UserProperty = user_properties
self.__client.disconnect(reasoncode=reason_code_obj, properties=properties)
else:
self.__client.disconnect()
if not self.__is_closing:
self.__is_closing = True
self.__logger.info("Disconnect MQTT connection with reason code %i", reason)
self.__on_disconnect_future = self.__loop.create_future()

result = await asyncio.wait_for(self.__on_disconnect_future, timeout)
except asyncio.TimeoutError as error:
raise MQTTException("Couldn't disconnect from MQTT broker") from error
finally:
self.__on_disconnect_future = None

if result.reason_code != mqtt.MQTT_ERR_SUCCESS:
raise MQTTException(f"Couldn't disconnect from MQTT broker - rc {result.reason_code}")
try:
if self.__is_connected is True:
self.__is_connected = False
user_properties = self.__convert_to_user_properties(mqtt_properties, "DISCONNECT")
if self.__protocol == mqtt.MQTTv5:
reason_code_obj = ReasonCodes(PacketTypes.DISCONNECT, identifier=reason)
properties = Properties(PacketTypes.DISCONNECT)
properties.UserProperty = user_properties
self.__client.disconnect(reasoncode=reason_code_obj, properties=properties)
else:
self.__client.disconnect()

result = await asyncio.wait_for(self.__on_disconnect_future, timeout)

if result.reason_code != mqtt.MQTT_ERR_SUCCESS:
raise MQTTException(f"Couldn't disconnect from MQTT broker - rc {result.reason_code}")
else:
self.__logger.warning("DISCONNECT was not sent on the dead connection")

except asyncio.TimeoutError as error:
raise MQTTException("Couldn't disconnect from MQTT broker") from error
finally:
self.__on_disconnect_future = None

def __on_disconnect(self, client, userdata, reason_code, properties=None): # pylint: disable=unused-argument
"""
Paho MQTT disconnect callback
"""
self.__is_connected = False

mqtt_reason_code = reason_code

if hasattr(reason_code, "value"):
Expand All @@ -395,14 +407,22 @@ def __on_disconnect(self, client, userdata, reason_code, properties=None): # py
mqtt_result = MqttResult(reason_code=mqtt_reason_code, properties=properties)

disconnect_info = self.__convert_to_disconnect(mqtt_result=mqtt_result)
self.__grpc_client.on_mqtt_disconnect(self.__connection_id, disconnect_info, None)

if self.__is_closing:
self.__logger.warning("DISCONNECT event ignored due to shutdown initiated")
else:
self.__grpc_client.on_mqtt_disconnect(self.__connection_id, disconnect_info, None)

try:
if self.__on_disconnect_future is not None:
self.__on_disconnect_future.set_result(mqtt_result)
except asyncio.InvalidStateError:
pass

self.__logger.info(
"MQTT connectionId %i disconnected error '%s'", self.__connection_id, disconnect_info.reasonString
)

async def publish(self, timeout: int, message: MqttPubMessage) -> MqttPublishReply:
"""
Publish MQTT message.
Expand Down Expand Up @@ -564,7 +584,11 @@ def __on_message(self, client, userdata, message): # pylint: disable=unused-arg
"""
self.__logger.debug("onMessage message %s from topic '%s'", message.payload, message.topic)
mqtt_message = self.__convert_to_mqtt_message(message=message)
self.__grpc_client.on_receive_mqtt_message(self.__connection_id, mqtt_message)

if self.__is_closing:
self.__logger.info("PUBLISH event is ignored due to shutdown initiated")
else:
self.__grpc_client.on_receive_mqtt_message(self.__connection_id, mqtt_message)

async def unsubscribe(
self, timeout: int, filters: List[str], mqtt_properties: List[Mqtt5Properties]
Expand Down Expand Up @@ -690,9 +714,12 @@ def __create_client(self, connection_params: ConnectionParams) -> mqtt.Client:

def state_check(self):
"""Check if MQTT is connected"""
if (self.__client is None) or not self.__client.is_connected():
if not self.__is_connected:
raise MQTTException("MQTT client is not in connected state")

if self.__is_closing:
raise MQTTException("MQTT connection is closing")

def __create_publish_properties(self, message: MqttPubMessage):
"""
Create MQTT5 properties
Expand Down

0 comments on commit ab01d15

Please sign in to comment.