From 7cb0a14367da998dcb138f054bacb3e92fe23fbb Mon Sep 17 00:00:00 2001 From: Yuri Chernyavsky Date: Tue, 27 Jun 2023 04:07:41 -0400 Subject: [PATCH] Issue #153: Aio write_callback to handle EAGAIN --- dbus_next/aio/message_bus.py | 55 ++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/dbus_next/aio/message_bus.py b/dbus_next/aio/message_bus.py index 22a11c0..a6fd95b 100644 --- a/dbus_next/aio/message_bus.py +++ b/dbus_next/aio/message_bus.py @@ -15,6 +15,7 @@ import socket from copy import copy from typing import Optional +import errno def _future_set_exception(fut, exc): @@ -43,32 +44,38 @@ def __init__(self, bus): def write_callback(self): try: while True: - if self.buf is None: - if self.messages.qsize() == 0: - # nothing more to write - self.loop.remove_writer(self.fd) + try: + if self.buf is None: + if self.messages.qsize() == 0: + # nothing more to write + self.loop.remove_writer(self.fd) + return + buf, unix_fds, fut = self.messages.get_nowait() + self.unix_fds = unix_fds + self.buf = memoryview(buf) + self.offset = 0 + self.fut = fut + + if self.unix_fds and self.negotiate_unix_fd: + ancdata = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, + array.array("i", self.unix_fds))] + self.offset += self.sock.sendmsg([self.buf[self.offset:]], ancdata) + self.unix_fds = None + else: + self.offset += self.sock.send(self.buf[self.offset:]) + + if self.offset >= len(self.buf): + # finished writing + self.buf = None + _future_set_result(self.fut, None) + else: + # wait for writable return - buf, unix_fds, fut = self.messages.get_nowait() - self.unix_fds = unix_fds - self.buf = memoryview(buf) - self.offset = 0 - self.fut = fut - - if self.unix_fds and self.negotiate_unix_fd: - ancdata = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, - array.array("i", self.unix_fds))] - self.offset += self.sock.sendmsg([self.buf[self.offset:]], ancdata) - self.unix_fds = None - else: - self.offset += self.sock.send(self.buf[self.offset:]) + except OSError as e: + if e.errno == errno.EAGAIN: + return + raise - if self.offset >= len(self.buf): - # finished writing - self.buf = None - _future_set_result(self.fut, None) - else: - # wait for writable - return except Exception as e: _future_set_exception(self.fut, e) self.bus._finalize(e)