[bug] Subscriber does not receive message, even after a barrier between publisher and subscribers #4713
Comments
"Are there any methods to check, i.e. if the publisher gets enough subscribers, or the subscriber can check if it is connected to the publisher and is ready to receive the message?" You can use
Thanks for your quick response! Using
Code using XPUB:

```python
from zmq import LAST_ENDPOINT, SUB, SUBSCRIBE, XPUB, XPUB_VERBOSE, Context  # type: ignore
import torch.distributed

torch.distributed.init_process_group(backend="gloo")
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
context = Context()

if rank == 0:
    local_socket = context.socket(XPUB)
    # pass every subscription upstream, including duplicates of the same topic,
    # so each subscriber produces one message that we can count
    local_socket.setsockopt(XPUB_VERBOSE, 1)
    # bind to a random port
    local_socket.bind("tcp://*:*")
    local_socket_port = local_socket.getsockopt(LAST_ENDPOINT).decode("utf-8").split(":")[-1]
    torch.distributed.broadcast_object_list([local_socket_port], src=0)
    # local readers: wait for one subscription message from each of them
    for _ in range(world_size - 1):
        local_socket.recv()
    local_socket.send(b"READY")
    local_socket.send(b"data")
else:
    data = [None]
    torch.distributed.broadcast_object_list(data, src=0)
    local_socket_port, = data
    local_socket = context.socket(SUB)
    local_socket.connect(f"tcp://localhost:{local_socket_port}")
    local_socket.setsockopt_string(SUBSCRIBE, "")
    assert local_socket.recv() == b"READY"
    assert local_socket.recv() == b"data"
```
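Why the XPUB version can count readers: an XPUB socket hands subscription messages to the application, each a single frame whose first byte is 1 (subscribe) or 0 (unsubscribe), followed by the topic. By default only the first subscription to a given topic is passed up, so with every reader subscribing to "" the counting loop would block after the first reader; `XPUB_VERBOSE` makes duplicate subscriptions come through as well. The sketch below is a hypothetical helper (`wait_for_subscribers` is not part of pyzmq) that inspects the frames instead of discarding them. It takes any zero-argument `recv` callable, so it can be exercised without a socket.

```python
from typing import Callable


def wait_for_subscribers(recv: Callable[[], bytes], expected: int, topic: bytes = b"") -> int:
    """Consume XPUB subscription frames until `expected` subscribes to `topic` are seen.

    Hypothetical helper: pass e.g. `xpub_socket.recv`. Returns how many frames were read.
    """
    seen = 0
    consumed = 0
    while seen < expected:
        frame = recv()
        consumed += 1
        # first byte 1 = subscribe, 0 = unsubscribe; the remainder is the topic
        if frame[:1] == b"\x01" and frame[1:] == topic:
            seen += 1
        # anything else (other topics, unsubscribes) is ignored in this sketch
    return consumed


# Simulated frames: two readers subscribe to "", one subscribes to another topic,
# one unsubscribe arrives, then a third "" subscription.
frames = iter([b"\x01", b"\x01other", b"\x00", b"\x01", b"\x01"])
frames_read = wait_for_subscribers(frames.__next__, 3)
```

On rank 0, `wait_for_subscribers(local_socket.recv, world_size - 1)` would stand in for the bare `for` loop. Note that it counts subscriptions rather than peers, so it assumes each reader subscribes to the topic exactly once.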
Just to confirm, @jamesdillonharvey: is this issue a bug? I mean, when I use local sync sockets to make sure all subscribers have joined, some subscribers are still not ready to receive the message.
Issue description
Hi team, thanks for the great project! I'm using zmq to broadcast messages in the vLLM project, and I encountered a bug that I think might be related to zmq.
Environment
Minimal test code / Steps to reproduce the issue
Run the code about 100 times:
About once every 20-50 runs, it hangs. Even though I put a barrier between the publisher and all subscribers, some subscribers still don't get the message, so they wait forever at `assert local_socket.recv() == b"READY"`.
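A diagnostic aid rather than a fix: a blocking `recv()` on a SUB socket waits forever if the message was published before the subscription reached the publisher. Polling with a timeout turns the silent hang into a detectable failure. This is a minimal sketch; `recv_with_timeout` and the `inproc://demo` endpoint are hypothetical names, and it does not recover the lost message.

```python
import zmq


def recv_with_timeout(sock: zmq.Socket, timeout_ms: int):
    """Return the next message on `sock`, or None if nothing arrives within timeout_ms."""
    # Socket.poll returns the ready event mask, which is 0 when the timeout expires
    if sock.poll(timeout_ms, zmq.POLLIN):
        return sock.recv()
    return None


ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.bind("inproc://demo")  # hypothetical endpoint for this demo
sub = ctx.socket(zmq.SUB)
sub.connect("inproc://demo")
sub.setsockopt(zmq.SUBSCRIBE, b"")

# Nothing is ever published, so instead of blocking forever we get None back.
result = recv_with_timeout(sub, 200)

sub.close(linger=0)
pub.close(linger=0)
ctx.term()
```

In the reproduction, the subscriber could raise an error (or log and retry the handshake) when `recv_with_timeout(local_socket, ...)` returns None, which makes the rare failure visible instead of leaving the process stuck.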
I'm following https://zguide.zeromq.org/docs/chapter2/#Handling-Multiple-Sockets to add a synchronization point before publishing anything. It works most of the time, but sometimes it fails: the publisher sends the message before all subscribers are ready to receive it.
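The zguide's "Node Coordination" section (also in chapter 2) points out that a REQ/REP synchronization step does not guarantee that the SUB connection and subscription have completed, and outlines a more robust model: the publisher keeps sending "hello" probes, each subscriber confirms over REQ/REP only after it has actually received one, and real data is sent once every subscriber has confirmed. The sketch below follows that outline under simplifying assumptions: all peers run as threads in one process over hypothetical `inproc://` endpoints (the issue's setup would use TCP across processes), the 10 ms probe interval is arbitrary, and all function names are illustrative.

```python
import threading

import zmq

HELLO = b"HELLO"  # probe message, never real data


def subscriber(ctx: zmq.Context, results: list, index: int) -> None:
    sub = ctx.socket(zmq.SUB)
    sub.connect("inproc://data")
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    req = ctx.socket(zmq.REQ)
    req.connect("inproc://sync")
    msg = sub.recv()   # blocks until the subscription is live; this is a HELLO probe
    req.send(b"READY")  # tell the publisher we are really receiving
    req.recv()          # wait for its ACK
    while msg == HELLO:  # skip remaining probes until the real payload arrives
        msg = sub.recv()
    results[index] = msg
    sub.close(linger=0)
    req.close(linger=0)


def broadcast_with_handshake(n_subs: int, payload: bytes) -> list:
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.bind("inproc://data")
    rep = ctx.socket(zmq.REP)
    rep.bind("inproc://sync")
    results = [None] * n_subs
    threads = [threading.Thread(target=subscriber, args=(ctx, results, i)) for i in range(n_subs)]
    for t in threads:
        t.start()
    poller = zmq.Poller()
    poller.register(rep, zmq.POLLIN)
    confirmed = 0
    while confirmed < n_subs:
        pub.send(HELLO)
        if poller.poll(timeout=10):  # milliseconds; a subscriber has seen a probe
            rep.recv()
            rep.send(b"ACK")
            confirmed += 1
    pub.send(payload)  # every subscriber is now known to be receiving
    for t in threads:
        t.join()
    pub.close(linger=0)
    rep.close(linger=0)
    ctx.term()
    return results


received = broadcast_with_handshake(4, b"data")
```

Unlike a one-shot barrier, the probe loop does not depend on when the subscription reaches the publisher; the cost is a few extra HELLO messages that subscribers must discard.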
I found that adding a `time.sleep(1)` before `local_socket.send(b"READY")` helps, but that's not an elegant solution. Are there any methods to check, e.g., whether the publisher has enough subscribers, or whether a subscriber is connected to the publisher and ready to receive messages?
Thanks again for the great project, and I look forward to a solution!