Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(uat): add connection state check to Paho Python client #389

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions uat/custom-components/client-python-paho/scripts/build.bat
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@
@echo off
set start_dir=%cd%
cd %~dp0../src
pip install virtualenv
python -m venv dev-env
call dev-env\Scripts\activate.bat
pip install pyinstaller
pip install -r requirements.txt
python -m grpc_tools.protoc -I../../../proto --python_out=../src/grpc_client_server/grpc_generated --pyi_out=../src/grpc_client_server/grpc_generated --grpc_python_out=../src/grpc_client_server/grpc_generated ../../../proto/mqtt_client_control.proto
python fix_generated.py
pyinstaller client-python-paho.spec
move dist\client-python-paho.exe ..\
rmdir /s /q dist
rmdir /s /q build
cd %start_dir%
pip install virtualenv || (cd %start_dir% & exit /b 1)
python -m venv dev-env || (cd %start_dir% & exit /b 2)
call dev-env\Scripts\activate.bat || (cd %start_dir% & exit /b 3)
pip install pyinstaller || (cd %start_dir% & deactivate & exit /b 4)
pip install -r requirements.txt || (cd %start_dir% & deactivate & exit /b 5)

python -m grpc_tools.protoc -I../../../proto --python_out=../src/grpc_client_server/grpc_generated ^
--pyi_out=../src/grpc_client_server/grpc_generated --grpc_python_out=../src/grpc_client_server/grpc_generated ^
../../../proto/mqtt_client_control.proto || (cd %start_dir% & deactivate & exit /b 6)

python fix_generated.py || (cd %start_dir% & deactivate & exit /b 7)
pyinstaller client-python-paho.spec || (cd %start_dir% & deactivate & rmdir /s /q dist & rmdir /s /q build & exit /b 8)
move dist\client-python-paho.exe ..\ || (cd %start_dir% & deactivate & rmdir /s /q dist & rmdir /s /q build & exit /b 9)
rmdir /s /q dist || (cd %start_dir% & deactivate & rmdir /s /q build & exit /b 10)
rmdir /s /q build || (cd %start_dir% & deactivate & exit /b 11)
cd %start_dir% || (deactivate & exit /b 12)
deactivate || exit /b 13
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ def __init__(
self.__asyncio_helper = None # pylint: disable=unused-private-member
self.__loop = asyncio.get_running_loop()

self.__is_closing = False
self.__is_connected = False

def set_connection_id(self, connection_id: int):
"""
Connection id setter
Expand Down Expand Up @@ -302,26 +305,26 @@ async def start(self, timeout: int) -> ConnectResult:
result = await asyncio.wait_for(self.__on_connect_future, remaining_timeout)
conn_ack_info = self.__convert_to_conn_ack(result)

connected = True
if result.reason_code == mqtt.MQTT_ERR_SUCCESS:
self.__is_connected = True
self.__logger.info(
"MQTT connection ID %s connected, client id %s",
self.__connection_id,
self.__connection_params.client_id,
)
else:
connected = False
self.__is_connected = False
self.__logger.info(
"MQTT connection ID %s failed with error: %s", self.__connection_id, result.error_string
)

return ConnectResult(connected=connected, conn_ack_info=conn_ack_info, error=result.error_string)
return ConnectResult(connected=self.__is_connected, conn_ack_info=conn_ack_info, error=result.error_string)
except asyncio.TimeoutError:
self.__logger.exception("Exception occurred during connect")
return ConnectResult(connected=False, conn_ack_info=None, error="Connect timeout error")
return ConnectResult(connected=self.__is_connected, conn_ack_info=None, error="Connect timeout error")
except (PahoException, socket.timeout) as error:
self.__logger.exception("Exception occurred during connect")
return ConnectResult(connected=False, conn_ack_info=None, error=str(error))
return ConnectResult(connected=self.__is_connected, conn_ack_info=None, error=str(error))
except Exception as error:
self.__logger.exception("Exception occurred during connect")
raise MQTTException from error
Expand Down Expand Up @@ -361,32 +364,41 @@ async def disconnect(self, timeout: int, reason: int, mqtt_properties: List[Mqtt
mqtt_properties - list of Mqtt5Properties
reason - disconnect reason code
"""
self.__logger.info("Disconnect MQTT connection with reason code %i", reason)
self.__on_disconnect_future = self.__loop.create_future()

try:
user_properties = self.__convert_to_user_properties(mqtt_properties, "DISCONNECT")
if self.__protocol == mqtt.MQTTv5:
reason_code_obj = ReasonCodes(PacketTypes.DISCONNECT, identifier=reason)
properties = Properties(PacketTypes.DISCONNECT)
properties.UserProperty = user_properties
self.__client.disconnect(reasoncode=reason_code_obj, properties=properties)
else:
self.__client.disconnect()
if not self.__is_closing:
self.__is_closing = True
self.__logger.info("Disconnect MQTT connection with reason code %i", reason)
self.__on_disconnect_future = self.__loop.create_future()

result = await asyncio.wait_for(self.__on_disconnect_future, timeout)
except asyncio.TimeoutError as error:
raise MQTTException("Couldn't disconnect from MQTT broker") from error
finally:
self.__on_disconnect_future = None

if result.reason_code != mqtt.MQTT_ERR_SUCCESS:
raise MQTTException(f"Couldn't disconnect from MQTT broker - rc {result.reason_code}")
try:
if self.__is_connected is True:
self.__is_connected = False
user_properties = self.__convert_to_user_properties(mqtt_properties, "DISCONNECT")
if self.__protocol == mqtt.MQTTv5:
reason_code_obj = ReasonCodes(PacketTypes.DISCONNECT, identifier=reason)
properties = Properties(PacketTypes.DISCONNECT)
properties.UserProperty = user_properties
self.__client.disconnect(reasoncode=reason_code_obj, properties=properties)
else:
self.__client.disconnect()

result = await asyncio.wait_for(self.__on_disconnect_future, timeout)

if result.reason_code != mqtt.MQTT_ERR_SUCCESS:
raise MQTTException(f"Couldn't disconnect from MQTT broker - rc {result.reason_code}")
else:
self.__logger.warning("DISCONNECT was not sent on the dead connection")

except asyncio.TimeoutError as error:
raise MQTTException("Couldn't disconnect from MQTT broker") from error
finally:
self.__on_disconnect_future = None

def __on_disconnect(self, client, userdata, reason_code, properties=None): # pylint: disable=unused-argument
"""
Paho MQTT disconnect callback
"""
self.__is_connected = False

mqtt_reason_code = reason_code

if hasattr(reason_code, "value"):
Expand All @@ -395,14 +407,22 @@ def __on_disconnect(self, client, userdata, reason_code, properties=None): # py
mqtt_result = MqttResult(reason_code=mqtt_reason_code, properties=properties)

disconnect_info = self.__convert_to_disconnect(mqtt_result=mqtt_result)
self.__grpc_client.on_mqtt_disconnect(self.__connection_id, disconnect_info, None)

if self.__is_closing:
self.__logger.warning("DISCONNECT event ignored due to shutdown initiated")
else:
self.__grpc_client.on_mqtt_disconnect(self.__connection_id, disconnect_info, None)

try:
if self.__on_disconnect_future is not None:
self.__on_disconnect_future.set_result(mqtt_result)
except asyncio.InvalidStateError:
pass

self.__logger.info(
"MQTT connectionId %i disconnected error '%s'", self.__connection_id, disconnect_info.reasonString
)

async def publish(self, timeout: int, message: MqttPubMessage) -> MqttPublishReply:
"""
Publish MQTT message.
Expand Down Expand Up @@ -564,7 +584,11 @@ def __on_message(self, client, userdata, message): # pylint: disable=unused-arg
"""
self.__logger.debug("onMessage message %s from topic '%s'", message.payload, message.topic)
mqtt_message = self.__convert_to_mqtt_message(message=message)
self.__grpc_client.on_receive_mqtt_message(self.__connection_id, mqtt_message)

if self.__is_closing:
self.__logger.info("PUBLISH event is ignored due to shutdown initiated")
else:
self.__grpc_client.on_receive_mqtt_message(self.__connection_id, mqtt_message)

async def unsubscribe(
self, timeout: int, filters: List[str], mqtt_properties: List[Mqtt5Properties]
Expand Down Expand Up @@ -690,9 +714,12 @@ def __create_client(self, connection_params: ConnectionParams) -> mqtt.Client:

def state_check(self):
"""Check if MQTT is connected"""
if (self.__client is None) or not self.__client.is_connected():
if not self.__is_connected:
raise MQTTException("MQTT client is not in connected state")

if self.__is_closing:
raise MQTTException("MQTT connection is closing")

def __create_publish_properties(self, message: MqttPubMessage):
"""
Create MQTT5 properties
Expand Down
Loading