Skip to content

Commit

Permalink
Add more connection state
Browse files Browse the repository at this point in the history
It's now possible to known when client is connecting/connected
and disconnecting/disconnected.
The difference between the two state is mostly whether the connection
is opened or not.
This will allow to avoid a reconnection while a reconnection is ongoing.
  • Loading branch information
PierreF committed Jan 20, 2024
1 parent 78dc877 commit a0b311b
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 28 deletions.
47 changes: 25 additions & 22 deletions src/paho/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,6 @@ class _OutPacket(TypedDict):
CONNACK_REFUSED_BAD_USERNAME_PASSWORD = ConnackCode.CONNACK_REFUSED_BAD_USERNAME_PASSWORD
CONNACK_REFUSED_NOT_AUTHORIZED = ConnackCode.CONNACK_REFUSED_NOT_AUTHORIZED

# Connection state
mqtt_cs_new = ConnectionState.MQTT_CS_NEW
mqtt_cs_connected = ConnectionState.MQTT_CS_CONNECTED
mqtt_cs_disconnecting = ConnectionState.MQTT_CS_DISCONNECTING
mqtt_cs_connect_async = ConnectionState.MQTT_CS_CONNECT_ASYNC

# Message state
mqtt_ms_invalid = MessageState.MQTT_MS_INVALID
mqtt_ms_publish = MessageState.MQTT_MS_PUBLISH
Expand Down Expand Up @@ -685,7 +679,7 @@ def __init__(
self._reconnect_on_failure = reconnect_on_failure
self._ping_t = 0.0
self._last_mid = 0
self._state = mqtt_cs_new
self._state = ConnectionState.MQTT_CS_NEW
self._out_messages: collections.OrderedDict[
int, MQTTMessage
] = collections.OrderedDict()
Expand Down Expand Up @@ -1326,7 +1320,7 @@ def connect_async(
self._bind_port = bind_port
self._clean_start = clean_start
self._connect_properties = properties
self._state = mqtt_cs_connect_async
self._state = ConnectionState.MQTT_CS_CONNECT_ASYNC

def reconnect_delay_set(self, min_delay: int = 1, max_delay: int = 120) -> None:
""" Configure the exponential reconnect delay
Expand Down Expand Up @@ -1367,7 +1361,7 @@ def reconnect(self) -> MQTTErrorCode:
self._last_msg_out = time_func()

self._ping_t = 0.0
self._state = mqtt_cs_new
self._state = ConnectionState.MQTT_CS_CONNECTING

self._sock_close()

Expand Down Expand Up @@ -1462,13 +1456,13 @@ def _loop(self, timeout: float = 1.0) -> MQTTErrorCode:
# call _loop(). We still want to break that loop by returning an
# rc != MQTT_ERR_SUCCESS and we don't want state to change from
# mqtt_cs_disconnecting.
if self._state != ConnectionState.MQTT_CS_DISCONNECTING:
if self._state not in (ConnectionState.MQTT_CS_DISCONNECTING, ConnectionState.MQTT_CS_DISCONNECTED):
self._state = ConnectionState.MQTT_CS_CONNECTION_LOST
return MQTTErrorCode.MQTT_ERR_CONN_LOST
except ValueError:
# Can occur if we just reconnected but rlist/wlist contain a -1 for
# some reason.
if self._state != ConnectionState.MQTT_CS_DISCONNECTING:
if self._state not in (ConnectionState.MQTT_CS_DISCONNECTING, ConnectionState.MQTT_CS_DISCONNECTED):
self._state = ConnectionState.MQTT_CS_CONNECTION_LOST
return MQTTErrorCode.MQTT_ERR_CONN_LOST
except Exception:
Expand Down Expand Up @@ -1655,7 +1649,7 @@ def is_connected(self) -> bool:
True if connection exists
False if connection is closed
"""
return self._state == mqtt_cs_connected
return self._state == ConnectionState.MQTT_CS_CONNECTED

def disconnect(
self,
Expand All @@ -1669,10 +1663,11 @@ def disconnect(
properties: (MQTT v5.0 only) a Properties instance setting the MQTT v5.0 properties
to be included. Optional - if not set, no properties are sent.
"""
self._state = mqtt_cs_disconnecting

if self._sock is None:
self._state = ConnectionState.MQTT_CS_DISCONNECTED
return MQTT_ERR_NO_CONN
else:
self._state = ConnectionState.MQTT_CS_DISCONNECTING

return self._send_disconnect(reasoncode, properties)

Expand Down Expand Up @@ -1941,7 +1936,8 @@ def loop_misc(self) -> MQTTErrorCode:
# This hasn't happened in the keepalive time so we should disconnect.
self._sock_close()

if self._state == mqtt_cs_disconnecting:
if self._state in (ConnectionState.MQTT_CS_DISCONNECTING, ConnectionState.MQTT_CS_DISCONNECTED):
self._state = ConnectionState.MQTT_CS_DISCONNECTED
rc = MQTTErrorCode.MQTT_ERR_SUCCESS
else:
self._state = ConnectionState.MQTT_CS_CONNECTION_LOST
Expand Down Expand Up @@ -2066,7 +2062,7 @@ def loop_forever(
if self._thread_terminate is True:
break

if self._state == mqtt_cs_connect_async:
if self._state == ConnectionState.MQTT_CS_CONNECT_ASYNC:
try:
self.reconnect()
except (OSError, WebsocketConnectionError):
Expand Down Expand Up @@ -2095,7 +2091,7 @@ def loop_forever(

def should_exit() -> bool:
return (
self._state == mqtt_cs_disconnecting or
self._state in (ConnectionState.MQTT_CS_DISCONNECTING, ConnectionState.MQTT_CS_DISCONNECTED) or
run is False or # noqa: B023 (uses the run variable from the outer scope on purpose)
self._thread_terminate is True
)
Expand Down Expand Up @@ -2748,7 +2744,8 @@ def _loop_rc_handle(
if rc:
self._sock_close()

if self._state == mqtt_cs_disconnecting:
if self._state in (ConnectionState.MQTT_CS_DISCONNECTING, ConnectionState.MQTT_CS_DISCONNECTED):
self._state = ConnectionState.MQTT_CS_DISCONNECTED
rc = MQTTErrorCode.MQTT_ERR_SUCCESS

self._do_on_disconnect(rc, properties)
Expand Down Expand Up @@ -2918,6 +2915,11 @@ def _packet_write(self) -> MQTTErrorCode:

self._do_on_disconnect(MQTTErrorCode.MQTT_ERR_SUCCESS)
self._sock_close()
# Only change to disconnected if the disconnection was wanted
# by the client (== state was disconnecting). If the broker disconnected
# use unilaterally don't change the state and client may reconnect.
if self._state == ConnectionState.MQTT_CS_DISCONNECTING:
self._state = ConnectionState.MQTT_CS_DISCONNECTED
return MQTTErrorCode.MQTT_ERR_SUCCESS

else:
Expand Down Expand Up @@ -2954,7 +2956,7 @@ def _check_keepalive(self) -> None:
last_msg_in = self._last_msg_in

if self._sock is not None and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive):
if self._state == mqtt_cs_connected and self._ping_t == 0:
if self._state == ConnectionState.MQTT_CS_CONNECTED and self._ping_t == 0:
try:
self._send_pingreq()
except Exception:
Expand All @@ -2967,7 +2969,8 @@ def _check_keepalive(self) -> None:
else:
self._sock_close()

if self._state == mqtt_cs_disconnecting:
if self._state in (ConnectionState.MQTT_CS_DISCONNECTING, ConnectionState.MQTT_CS_DISCONNECTED):
self._state = ConnectionState.MQTT_CS_DISCONNECTED
rc = MQTTErrorCode.MQTT_ERR_SUCCESS
else:
rc = MQTTErrorCode.MQTT_ERR_KEEPALIVE
Expand Down Expand Up @@ -3565,7 +3568,7 @@ def _handle_connack(self) -> MQTTErrorCode:
return self.reconnect()

if result == 0:
self._state = mqtt_cs_connected
self._state = ConnectionState.MQTT_CS_CONNECTED
self._reconnect_delay = None

if self._protocol == MQTTv5:
Expand Down Expand Up @@ -4089,7 +4092,7 @@ def _reconnect_wait(self) -> None:
target_time = now + self._reconnect_delay

remaining = target_time - now
while (self._state != mqtt_cs_disconnecting
while (self._state not in (ConnectionState.MQTT_CS_DISCONNECTING, ConnectionState.MQTT_CS_DISCONNECTED)
and not self._thread_terminate
and remaining > 0):

Expand Down
14 changes: 8 additions & 6 deletions src/paho/mqtt/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ class ConnackCode(enum.IntEnum):
CONNACK_REFUSED_NOT_AUTHORIZED = 5


class ConnectionState(enum.IntEnum):
MQTT_CS_NEW = 0
MQTT_CS_CONNECTED = 1
MQTT_CS_DISCONNECTING = 2
MQTT_CS_CONNECT_ASYNC = 3
MQTT_CS_CONNECTION_LOST = 4
class ConnectionState(enum.Enum):
MQTT_CS_NEW = enum.auto()
MQTT_CS_CONNECT_ASYNC = enum.auto()
MQTT_CS_CONNECTING = enum.auto()
MQTT_CS_CONNECTED = enum.auto()
MQTT_CS_CONNECTION_LOST = enum.auto()
MQTT_CS_DISCONNECTING = enum.auto()
MQTT_CS_DISCONNECTED = enum.auto()


class MessageState(enum.IntEnum):
Expand Down

0 comments on commit a0b311b

Please sign in to comment.