Skip to content

Commit

Permalink
[EH] on event tasks bound receive (Azure#36502)
Browse files Browse the repository at this point in the history
* update to limit prefetch

* updates

* pylint

* use lock

* refactor

* pylint

* make value a constant

* add a mimic process test

* test for buffer length

* Update sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_consumer_client_async.py

* nit

* dont use connstr

* typo
  • Loading branch information
l0lawrence authored Aug 8, 2024
1 parent f729bea commit 38b3cd4
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 3 deletions.
1 change: 1 addition & 0 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
USER_AGENT_PREFIX = "azsdk-python-eventhubs"
UAMQP_LIBRARY = "uamqp"
PYAMQP_LIBRARY = "pyamqp"
MAX_BUFFER_LENGTH = 300

NO_RETRY_ERRORS = [
b"com.microsoft:argument-out-of-range",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
EventDataSendError,
OperationTimeoutError
)
from ..._constants import MAX_BUFFER_LENGTH


if TYPE_CHECKING:
Expand Down Expand Up @@ -253,16 +254,24 @@ async def _callback_task(consumer, batch, max_batch_size, max_wait_time):
await asyncio.sleep(0.05)

@staticmethod
async def _receive_task(consumer):
async def _receive_task(consumer, max_batch_size):
# pylint:disable=protected-access
max_retries = consumer._client._config.max_retries
retried_times = 0
running = True
try:
while retried_times <= max_retries and running and consumer._callback_task_run:
try:
# set a default value of consumer._prefetch for buffer length
buff_length = MAX_BUFFER_LENGTH
await consumer._open() # pylint: disable=protected-access
running = await cast(ReceiveClientAsync, consumer._handler).do_work_async(batch=consumer._prefetch)
async with consumer._message_buffer_lock:
buff_length = len(consumer._message_buffer)
if buff_length <= max_batch_size:
running = await cast(ReceiveClientAsync, consumer._handler).do_work_async(
batch=consumer._prefetch
)
await asyncio.sleep(0.05)
except asyncio.CancelledError: # pylint: disable=try-except-raise
raise
except Exception as exception: # pylint: disable=broad-except
Expand Down Expand Up @@ -314,7 +323,7 @@ async def receive_messages_async(consumer, batch, max_batch_size, max_wait_time)
callback_task = asyncio.create_task(
PyamqpTransportAsync._callback_task(consumer, batch, max_batch_size, max_wait_time)
)
receive_task = asyncio.create_task(PyamqpTransportAsync._receive_task(consumer))
receive_task = asyncio.create_task(PyamqpTransportAsync._receive_task(consumer, max_batch_size))

tasks = [callback_task, receive_task]
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,32 @@ async def on_event(partition_context, event):
await asyncio.sleep(10)
assert on_event.received == 1
await task

@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_receive_mimic_processing_async(auth_credential_senders_async, uamqp_transport):
fully_qualified_namespace, eventhub_name, credential, senders = auth_credential_senders_async
for i in range(305):
senders[0].send(EventData("A"))
client = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=credential(),
consumer_group="$default",
uamqp_transport=uamqp_transport
)
async def on_event(partition_context, event):
assert partition_context.partition_id == "0"
assert partition_context.consumer_group == "$default"
assert partition_context.eventhub_name == senders[0]._client.eventhub_name
on_event.received += 1
# Mimic processing of event
await asyncio.sleep(20)
assert client._event_processors[0]._consumers[0]._message_buffer <=300

on_event.received = 0
async with client:
task = asyncio.ensure_future(
client.receive(on_event, partition_id="0", starting_position="-1", prefetch=2))
await asyncio.sleep(10)
await task

0 comments on commit 38b3cd4

Please sign in to comment.