From 38b3cd47650f20f3acc9836f605f1cb46fedcaf8 Mon Sep 17 00:00:00 2001 From: Libba Lawrence Date: Thu, 8 Aug 2024 14:20:40 -0700 Subject: [PATCH] [EH] on event tasks bound receive (#36502) * update to limit prefetch * updates * pylint * use lock * refactor * pylint * make value a constant * add a mimic process test * test for buffer length * Update sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_consumer_client_async.py * nit * dont use connstr * typo --- .../azure/eventhub/_constants.py | 1 + .../aio/_transport/_pyamqp_transport_async.py | 15 ++++++++-- .../asynctests/test_consumer_client_async.py | 29 +++++++++++++++++++ 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_constants.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_constants.py index 877175ca7057..ea1859ba28c7 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_constants.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_constants.py @@ -46,6 +46,7 @@ USER_AGENT_PREFIX = "azsdk-python-eventhubs" UAMQP_LIBRARY = "uamqp" PYAMQP_LIBRARY = "pyamqp" +MAX_BUFFER_LENGTH = 300 NO_RETRY_ERRORS = [ b"com.microsoft:argument-out-of-range", diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_pyamqp_transport_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_pyamqp_transport_async.py index f001932e894f..106636640bff 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_pyamqp_transport_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_pyamqp_transport_async.py @@ -22,6 +22,7 @@ EventDataSendError, OperationTimeoutError ) +from ..._constants import MAX_BUFFER_LENGTH if TYPE_CHECKING: @@ -253,7 +254,7 @@ async def _callback_task(consumer, batch, max_batch_size, max_wait_time): await asyncio.sleep(0.05) @staticmethod - async def _receive_task(consumer): + async def _receive_task(consumer, max_batch_size): # pylint:disable=protected-access max_retries = consumer._client._config.max_retries retried_times = 0 @@ -261,8 +262,16 @@ async def _receive_task(consumer): try: while retried_times <= max_retries and running and consumer._callback_task_run: try: + # set a default value of consumer._prefetch for buffer length + buff_length = MAX_BUFFER_LENGTH await consumer._open() # pylint: disable=protected-access - running = await cast(ReceiveClientAsync, consumer._handler).do_work_async(batch=consumer._prefetch) + async with consumer._message_buffer_lock: + buff_length = len(consumer._message_buffer) + if buff_length <= max_batch_size: + running = await cast(ReceiveClientAsync, consumer._handler).do_work_async( + batch=consumer._prefetch + ) + await asyncio.sleep(0.05) except asyncio.CancelledError: # pylint: disable=try-except-raise raise except Exception as exception: # pylint: disable=broad-except @@ -314,7 +323,7 @@ async def receive_messages_async(consumer, batch, max_batch_size, max_wait_time) callback_task = asyncio.create_task( PyamqpTransportAsync._callback_task(consumer, batch, max_batch_size, max_wait_time) ) - receive_task = asyncio.create_task(PyamqpTransportAsync._receive_task(consumer)) + receive_task = asyncio.create_task(PyamqpTransportAsync._receive_task(consumer, max_batch_size)) tasks = [callback_task, receive_task] try: diff --git a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_consumer_client_async.py b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_consumer_client_async.py index fa823afed629..4447633e5435 100644 --- a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_consumer_client_async.py +++ b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_consumer_client_async.py @@ -460,3 +460,32 @@ async def on_event(partition_context, event): await asyncio.sleep(10) assert on_event.received == 1 await task + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_receive_mimic_processing_async(auth_credential_senders_async, uamqp_transport): + fully_qualified_namespace, eventhub_name, credential, senders = auth_credential_senders_async + for i in range(305): + senders[0].send(EventData("A")) + client = EventHubConsumerClient( + fully_qualified_namespace=fully_qualified_namespace, + eventhub_name=eventhub_name, + credential=credential(), + consumer_group="$default", + uamqp_transport=uamqp_transport + ) + async def on_event(partition_context, event): + assert partition_context.partition_id == "0" + assert partition_context.consumer_group == "$default" + assert partition_context.eventhub_name == senders[0]._client.eventhub_name + on_event.received += 1 + # Mimic processing of event + await asyncio.sleep(20) + assert client._event_processors[0]._consumers[0]._message_buffer <=300 + + on_event.received = 0 + async with client: + task = asyncio.ensure_future( + client.receive(on_event, partition_id="0", starting_position="-1", prefetch=2)) + await asyncio.sleep(10) + await task