Skip to content

Commit

Permalink
Merge pull request #796 from eclipse/wait_for_publish_hang
Browse files Browse the repository at this point in the history
Fix wait_for_publish that could hang for QoS=0 message
  • Loading branch information
PierreF authored Jan 20, 2024
2 parents 80701e4 + 532ba6f commit 4207e5e
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 17 deletions.
3 changes: 2 additions & 1 deletion ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ v2.0.0 - 2023-xx-xx
- Improve tests & linters. Modernize build (drop setup.py, use pyproject.toml)
- Fix is_connected property to correctly return False when connection is lost
and loop_start/loop_forever isn't used. Closes #525.

- Fix wait_for_publish that could hang with QoS == 0 message on reconnection
or publish during connection. Closes #549.


v1.6.1 - 2021-10-21
Expand Down
35 changes: 19 additions & 16 deletions src/paho/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,9 @@ def timed_out() -> bool:
while not self._published and not timed_out():
self._condition.wait(timeout_tenth)

if self.rc > 0:
raise RuntimeError(f'Message publish failed: {error_string(self.rc)}')

def is_published(self) -> bool:
"""Returns True if the message associated with this object has been
published, else returns False."""
Expand Down Expand Up @@ -1193,17 +1196,24 @@ def reconnect(self) -> MQTTErrorCode:
"pos": 0,
}

self._out_packet = collections.deque()

with self._msgtime_mutex:
self._last_msg_in = time_func()
self._last_msg_out = time_func()

self._ping_t = 0.0
self._state = mqtt_cs_new

self._sock_close()

# Mark all currently outgoing QoS = 0 packets as lost,
# or `wait_for_publish()` could hang forever
for pkt in self._out_packet:
if pkt["command"] & 0xF0 == PUBLISH and pkt["qos"] == 0 and pkt["info"] is not None:
pkt["info"].rc = MQTT_ERR_CONN_LOST
pkt["info"]._set_as_published()

self._out_packet.clear()

with self._msgtime_mutex:
self._last_msg_in = time_func()
self._last_msg_out = time_func()

# Put messages in progress in a valid state.
self._messages_reconnect_reset()

Expand Down Expand Up @@ -1263,11 +1273,9 @@ def _loop(self, timeout: float = 1.0) -> MQTTErrorCode:
if timeout < 0.0:
raise ValueError('Invalid timeout.')

try:
packet = self._out_packet.popleft()
self._out_packet.appendleft(packet)
if self.want_write():
wlist = [self._sock]
except IndexError:
else:
wlist = []

# used to check if there are any bytes left in the (SSL) socket
Expand Down Expand Up @@ -1751,12 +1759,7 @@ def want_write(self) -> bool:
"""Call to determine if there is network data waiting to be written.
Useful if you are calling select() yourself rather than using loop().
"""
try:
packet = self._out_packet.popleft()
self._out_packet.appendleft(packet)
return True
except IndexError:
return False
return len(self._out_packet) > 0

def loop_misc(self) -> MQTTErrorCode:
"""Process miscellaneous network events. Use in place of calling loop() if you
Expand Down
55 changes: 55 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,61 @@ def on_disconnect(*args):
assert not mqttc.is_connected()


class TestPublish:
def test_publish_before_connect(self, fake_broker: FakeBroker) -> None:
mqttc = client.Client(
"test_publish_before_connect",
)

def on_connect(mqttc, obj, flags, rc):
assert rc == 0

mqttc.on_connect = on_connect

mqttc.loop_start()
mqttc.connect("localhost", fake_broker.port)
mqttc.enable_logger()

try:
mi = mqttc.publish("test", "testing")

fake_broker.start()

packet_in = fake_broker.receive_packet(1)
assert not packet_in # Check connection is closed
# re-call fake_broker.start() to take the 2nd connection done by client
# ... this is probably a bug, when using loop_start/loop_forever
# and doing a connect() before, the TCP connection is opened twice.
fake_broker.start()

connect_packet = paho_test.gen_connect(
"test_publish_before_connect", keepalive=60,
proto_ver=client.MQTTv311)
packet_in = fake_broker.receive_packet(1000)
assert packet_in # Check connection was not closed
assert packet_in == connect_packet

connack_packet = paho_test.gen_connack(rc=0)
count = fake_broker.send_packet(connack_packet)
assert count # Check connection was not closed
assert count == len(connack_packet)

with pytest.raises(RuntimeError):
mi.wait_for_publish(1)

mqttc.disconnect()

disconnect_packet = paho_test.gen_disconnect()
packet_in = fake_broker.receive_packet(1000)
assert packet_in # Check connection was not closed
assert packet_in == disconnect_packet

finally:
mqttc.loop_stop()

packet_in = fake_broker.receive_packet(1)
assert not packet_in # Check connection is closed

class TestPublishBroker2Client:

def test_invalid_utf8_topic(self, fake_broker):
Expand Down

0 comments on commit 4207e5e

Please sign in to comment.