From d41263bf125257d95b2d4748657406aa360a6b20 Mon Sep 17 00:00:00 2001 From: Omer Katz Date: Mon, 30 Sep 2024 09:25:59 +0300 Subject: [PATCH 1/7] Adjust flake8 configuration to match Celery's configuration. --- setup.cfg | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index ced537586..488d782fa 100644 --- a/setup.cfg +++ b/setup.cfg @@ -10,7 +10,33 @@ all_files = 1 [flake8] # classes can be lowercase, arguments and variables can be uppercase # whenever it makes the code more readable. -extend-ignore = W504, N806, N802, N801, N803 +max-line-length = 117 +extend-ignore = + # classes can be lowercase, arguments and variables can be uppercase + # whenever it makes the code more readable. + W504, N806, N802, N801, N803 + # incompatible with black https://github.com/psf/black/issues/315#issuecomment-395457972 + E203, + # Missing docstring in public method + D102, + # Missing docstring in public package + D104, + # Missing docstring in magic method + D105, + # Missing docstring in __init__ + D107, + # First line should be in imperative mood; try rephrasing + D401, + # No blank lines allowed between a section header and its content + D412, + # ambiguous variable name '...' + E741, + # ambiguous class definition '...' + E742, +per-file-ignores = + t/*,setup.py,examples/*,docs/*,extra/*: + # docstrings + D, [isort] add_imports = From af4f351308478ec530e95077665be3dfd20ad96e Mon Sep 17 00:00:00 2001 From: Omer Katz Date: Mon, 30 Sep 2024 09:26:16 +0300 Subject: [PATCH 2/7] Add native delayed delivery API. --- examples/delayed_infra.py | 24 ++++++ kombu/native_delayed_delivery.py | 104 ++++++++++++++++++++++++ t/unit/test_native_delayed_delivery.py | 107 +++++++++++++++++++++++++ 3 files changed, 235 insertions(+) create mode 100644 examples/delayed_infra.py create mode 100644 kombu/native_delayed_delivery.py create mode 100644 t/unit/test_native_delayed_delivery.py diff --git a/examples/delayed_infra.py b/examples/delayed_infra.py new file mode 100644 index 000000000..af261a074 --- /dev/null +++ b/examples/delayed_infra.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +from examples.experimental.async_consume import queue +from kombu import Connection, Exchange, Queue +from kombu.native_delayed_delivery import ( + bind_queue_to_native_delayed_delivery_exchange, calculate_routing_key, + declare_native_delayed_delivery_exchanges_and_queues, level_name) + +with Connection('amqp://guest:guest@localhost:5672//') as connection: + declare_native_delayed_delivery_exchanges_and_queues(connection, 'quorum') + + destination_exchange = Exchange( + 'destination', type='topic') + destination_queue = Queue("destination", exchange=destination_exchange) + bind_queue_to_native_delayed_delivery_exchange(connection, queue) + + channel = connection.channel() + with connection.Producer(channel=channel) as producer: + routing_key = calculate_routing_key(30, 'destination') + producer.publish( + "delayed msg", + routing_key=routing_key, + exchange=level_name(27) + ) diff --git a/kombu/native_delayed_delivery.py b/kombu/native_delayed_delivery.py new file mode 100644 index 000000000..de99e5ac9 --- /dev/null +++ b/kombu/native_delayed_delivery.py @@ -0,0 +1,104 @@ +"""Native Delayed Delivery API. +Only relevant for RabbitMQ. +""" +from __future__ import annotations + +from kombu import Connection, Exchange, Queue +from kombu.log import get_logger + +logger = get_logger(__name__) + +MAX_NUMBER_OF_BITS_TO_USE = 28 +MAX_LEVEL = MAX_NUMBER_OF_BITS_TO_USE - 1 +CELERY_DELAYED_DELIVERY_EXCHANGE = "celery_delayed_delivery" + + +def level_name(level: int) -> str: + """Generates the delayed queue/exchange name based on the level.""" + if level < 0: + raise ValueError("level must be a non-negative number") + + return f"celery_delayed_{level}" + + +def declare_native_delayed_delivery_exchanges_and_queues(connection: Connection, queue_type: str) -> None: + """Declares all native delayed delivery exchanges and queues.""" + if queue_type != "classic" and queue_type != "quorum": + raise ValueError("queue_type must be either classic or quorum") + + channel = connection.channel() + + routing_key: str = "1.#" + + for level in range(27, -1, - 1): + current_level = level_name(level) + next_level = level_name(level - 1) if level > 0 else None + + delayed_exchange: Exchange = Exchange( + current_level, type="topic").bind(channel) + delayed_exchange.declare() + + queue_arguments = { + "x-queue-type": queue_type, + "x-overflow": "reject-publish", + "x-message-ttl": pow(2, level) * 1000, + "x-dead-letter-exchange": next_level if level > 0 else CELERY_DELAYED_DELIVERY_EXCHANGE, + } + + if queue_type == 'quorum': + queue_arguments["x-dead-letter-strategy"] = "at-least-once" + + delayed_queue: Queue = Queue( + current_level, + queue_arguments=queue_arguments + ).bind(channel) + delayed_queue.declare() + delayed_queue.bind_to(current_level, routing_key) + + routing_key = "*." + routing_key + + routing_key = "0.#" + for level in range(27, 0, - 1): + current_level = level_name(level) + next_level = level_name(level - 1) if level > 0 else None + + next_level_exchange: Exchange = Exchange( + next_level, type="topic").bind(channel) + + next_level_exchange.bind_to(current_level, routing_key) + + routing_key = "*." + routing_key + + delivery_exchange: Exchange = Exchange( + CELERY_DELAYED_DELIVERY_EXCHANGE, type="topic").bind(channel) + delivery_exchange.declare() + delivery_exchange.bind_to(level_name(0), routing_key) + + +def bind_queue_to_native_delayed_delivery_exchange(connection: Connection, queue: Queue) -> None: + """Binds a queue to the native delayed delivery exchange.""" + channel = connection.channel() + queue = queue.bind(channel) + exchange: Exchange = queue.exchange.bind(channel) + + if exchange.type == 'direct': + logger.warn(f"Exchange {exchange.name} is a direct exchange " + f"and native delayed delivery do not support direct exchanges.\n" + f"ETA tasks published to this exchange will block the worker until the ETA arrives.") + return + + routing_key = queue.routing_key if queue.routing_key.startswith( + '#') else f"#.{queue.routing_key}" + exchange.bind_to(CELERY_DELAYED_DELIVERY_EXCHANGE, routing_key=routing_key) + queue.bind_to(exchange.name, routing_key=routing_key) + + +def calculate_routing_key(countdown: int, routing_key: str) -> str: + """Calculate the routing key for publishing a delayed message based on the countdown.""" + if countdown < 1: + raise ValueError("countdown must be a positive number") + + if not routing_key: + raise ValueError("routing_key must be non-empty") + + return '.'.join(list(f'{countdown:028b}')) + f'.{routing_key}' diff --git a/t/unit/test_native_delayed_delivery.py b/t/unit/test_native_delayed_delivery.py new file mode 100644 index 000000000..6cee476b3 --- /dev/null +++ b/t/unit/test_native_delayed_delivery.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +import logging +from unittest.mock import Mock + +import pytest + +from kombu.native_delayed_delivery import ( + CELERY_DELAYED_DELIVERY_EXCHANGE, + bind_queue_to_native_delayed_delivery_exchange, calculate_routing_key, + declare_native_delayed_delivery_exchanges_and_queues, level_name) + + +class test_native_delayed_delivery_level_name: + def test_level_name_with_negative_level(self): + with pytest.raises(ValueError, match="level must be a non-negative number"): + level_name(-1) + + def test_level_name_with_level_0(self): + assert level_name(0) == 'celery_delayed_0' + + def test_level_name_with_level_1(self): + assert level_name(1) == 'celery_delayed_1' + + +class test_declare_native_delayed_delivery_exchanges_and_queues: + def test_invalid_queue_type(self): + with pytest.raises(ValueError, match="queue_type must be either classic or quorum"): + declare_native_delayed_delivery_exchanges_and_queues(Mock(), 'foo') + + def test_classic_queue_type(self): + declare_native_delayed_delivery_exchanges_and_queues(Mock(), 'classic') + + def test_quorum_queue_type(self): + declare_native_delayed_delivery_exchanges_and_queues(Mock(), 'quorum') + + +class test_bind_queue_to_native_delayed_delivery_exchange: + def test_bind_to_direct_exchange(self, caplog): + with caplog.at_level(logging.WARNING): + queue_mock = Mock() + queue_mock.bind().exchange.bind().type = 'direct' + queue_mock.bind().exchange.bind().name = 'foo' + + bind_queue_to_native_delayed_delivery_exchange(Mock(), queue_mock) + + assert len(caplog.records) == 1 + + record = caplog.records[0] + assert (record.message == "Exchange foo is a direct exchange " + "and native delayed delivery do not support direct exchanges.\n" + "ETA tasks published to this exchange will " + "block the worker until the ETA arrives.") + + def test_bind_to_topic_exchange(self): + queue_mock = Mock() + queue_mock.bind().exchange.bind().type = 'topic' + queue_mock.bind().exchange.bind().name = 'foo' + queue_mock.bind().routing_key = 'foo' + + bind_queue_to_native_delayed_delivery_exchange(Mock(), queue_mock) + queue_mock.bind().exchange.bind().bind_to.assert_called_once_with( + CELERY_DELAYED_DELIVERY_EXCHANGE, + routing_key="#.foo" + ) + queue_mock.bind().bind_to.assert_called_once_with( + 'foo', + routing_key="#.foo" + ) + + def test_bind_to_topic_exchange_with_wildcard_routing_key(self): + queue_mock = Mock() + queue_mock.bind().exchange.bind().type = 'topic' + queue_mock.bind().exchange.bind().name = 'foo' + queue_mock.bind().routing_key = '#.foo' + + bind_queue_to_native_delayed_delivery_exchange(Mock(), queue_mock) + queue_mock.bind().exchange.bind().bind_to.assert_called_once_with( + CELERY_DELAYED_DELIVERY_EXCHANGE, + routing_key="#.foo" + ) + queue_mock.bind().bind_to.assert_called_once_with( + 'foo', + routing_key="#.foo" + ) + + +class test_calculate_routing_key: + def test_calculate_routing_key(self): + assert (calculate_routing_key(1, 'destination') + == '0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.1.destination') + + def test_negative_countdown(self): + with pytest.raises(ValueError, match="countdown must be a positive number"): + calculate_routing_key(-1, 'foo') + + def test_zero_countdown(self): + with pytest.raises(ValueError, match="countdown must be a positive number"): + calculate_routing_key(0, 'foo') + + def test_empty_routing_key(self): + with pytest.raises(ValueError, match="routing_key must be non-empty"): + calculate_routing_key(1, '') + + def test_none_routing_key(self): + with pytest.raises(ValueError, match="routing_key must be non-empty"): + calculate_routing_key(1, None) From 6bfab88160bc817383749b96929c44afdc5773d2 Mon Sep 17 00:00:00 2001 From: Omer Katz Date: Sun, 6 Oct 2024 14:24:06 +0300 Subject: [PATCH 3/7] Add documentation. --- docs/userguide/examples.rst | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/docs/userguide/examples.rst b/docs/userguide/examples.rst index e4d20c18a..d326b3ca6 100644 --- a/docs/userguide/examples.rst +++ b/docs/userguide/examples.rst @@ -53,3 +53,16 @@ for priorities using different queues. :file:`client.py`: .. literalinclude:: ../../examples/simple_task_queue/client.py + +.. _native-delayed-delivery-example: + +Native Delayed Delivery +======================= + +This example demonstrates how to declare native delayed delivery queues and exchanges and publish a message using +the native delayed delivery mechanism. + +:file:`delayed_infra.py`: + +.. literalinclude:: ../../examples/delayed_infra.py + :language: python From e4ceef7a947e45009981b27d93e692c7fe8f4423 Mon Sep 17 00:00:00 2001 From: Omer Katz Date: Sun, 6 Oct 2024 15:12:30 +0300 Subject: [PATCH 4/7] Fix linter errors. --- kombu/asynchronous/aws/connection.py | 8 ++++---- kombu/asynchronous/aws/sqs/connection.py | 2 +- kombu/connection.py | 2 +- kombu/transport/SQS.py | 2 +- t/unit/transport/test_SQS.py | 2 +- t/unit/transport/test_azureservicebus.py | 2 +- t/unit/transport/test_mongodb.py | 2 +- 7 files changed, 10 insertions(+), 10 deletions(-) diff --git a/kombu/asynchronous/aws/connection.py b/kombu/asynchronous/aws/connection.py index 3d408365d..df7d3b24c 100644 --- a/kombu/asynchronous/aws/connection.py +++ b/kombu/asynchronous/aws/connection.py @@ -182,7 +182,7 @@ def __init__(self, sqs_connection, http_client=None, super().__init__(sqs_connection, http_client, **http_client_params) - def make_request(self, operation, params_, path, verb, callback=None): # noqa + def make_request(self, operation, params_, path, verb, callback=None): params = params_.copy() if operation: params['Action'] = operation @@ -202,7 +202,7 @@ def make_request(self, operation, params_, path, verb, callback=None): # noqa return self._mexe(prepared_request, callback=callback) - def get_list(self, operation, params, markers, path='/', parent=None, verb='POST', callback=None): # noqa + def get_list(self, operation, params, markers, path='/', parent=None, verb='POST', callback=None): return self.make_request( operation, params, path, verb, callback=transform( @@ -211,7 +211,7 @@ def get_list(self, operation, params, markers, path='/', parent=None, verb='POST ), ) - def get_object(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa + def get_object(self, operation, params, path='/', parent=None, verb='GET', callback=None): return self.make_request( operation, params, path, verb, callback=transform( @@ -219,7 +219,7 @@ def get_object(self, operation, params, path='/', parent=None, verb='GET', callb ), ) - def get_status(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa + def get_status(self, operation, params, path='/', parent=None, verb='GET', callback=None): return self.make_request( operation, params, path, verb, callback=transform( diff --git a/kombu/asynchronous/aws/sqs/connection.py b/kombu/asynchronous/aws/sqs/connection.py index 030616229..f1d0448b4 100644 --- a/kombu/asynchronous/aws/sqs/connection.py +++ b/kombu/asynchronous/aws/sqs/connection.py @@ -76,7 +76,7 @@ def _create_json_request(self, operation, params, queue_url): **param_payload ) - def make_request(self, operation_name, params, queue_url, verb, callback=None): # noqa + def make_request(self, operation_name, params, queue_url, verb, callback=None): """ Override make_request to support different protocols. diff --git a/kombu/connection.py b/kombu/connection.py index 7fa9fdec2..32c416b00 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -249,7 +249,7 @@ def switch(self, conn_str): self.declared_entities.clear() self._closed = False conn_params = ( - parse_url(conn_str) if "://" in conn_str else {"hostname": conn_str} # noqa + parse_url(conn_str) if "://" in conn_str else {"hostname": conn_str} ) self._init_params(**dict(self._initial_params, **conn_params)) diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 26a69637e..0c8d1ee4e 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -127,7 +127,7 @@ * Supports Fanout: Yes * Supports Priority: No * Supports TTL: No -""" # noqa: E501 +""" from __future__ import annotations diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py index 2a70d4644..b82be5aa1 100644 --- a/t/unit/transport/test_SQS.py +++ b/t/unit/transport/test_SQS.py @@ -358,7 +358,7 @@ def test_get_bulk_raises_empty(self): def test_optional_b64_decode(self): raw = b'{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77","task": "celery.task.PingTask",' \ - b'"args": [],"kwargs": {},"retries": 0,"eta": "2009-11-17T12:30:56.527191"}' # noqa + b'"args": [],"kwargs": {},"retries": 0,"eta": "2009-11-17T12:30:56.527191"}' b64_enc = base64.b64encode(raw) assert self.channel._optional_b64_decode(b64_enc) == raw assert self.channel._optional_b64_decode(raw) == raw diff --git a/t/unit/transport/test_azureservicebus.py b/t/unit/transport/test_azureservicebus.py index 7c1812836..111eab68e 100644 --- a/t/unit/transport/test_azureservicebus.py +++ b/t/unit/transport/test_azureservicebus.py @@ -103,7 +103,7 @@ def get_queue_runtime_properties(self, queue_name): URL_NOCREDS = 'azureservicebus://' URL_CREDS_SAS = 'azureservicebus://policyname:ke/y@hostname' -URL_CREDS_SAS_FQ = 'azureservicebus://policyname:ke/y@hostname.servicebus.windows.net' # noqa +URL_CREDS_SAS_FQ = 'azureservicebus://policyname:ke/y@hostname.servicebus.windows.net' URL_CREDS_DA = 'azureservicebus://DefaultAzureCredential@hostname' URL_CREDS_DA_FQ = 'azureservicebus://DefaultAzureCredential@hostname.servicebus.windows.net' # noqa URL_CREDS_MI = 'azureservicebus://ManagedIdentityCredential@hostname' diff --git a/t/unit/transport/test_mongodb.py b/t/unit/transport/test_mongodb.py index 57110ec59..744bfb22d 100644 --- a/t/unit/transport/test_mongodb.py +++ b/t/unit/transport/test_mongodb.py @@ -87,7 +87,7 @@ def test_custom_port(self): assert hostname == 'mongodb://localhost:27018' def test_replicaset_hosts(self): - url = 'mongodb://mongodb1.example.com:27317,mongodb2.example.com:27017/?replicaSet=test_rs' # noqa + url = 'mongodb://mongodb1.example.com:27317,mongodb2.example.com:27017/?replicaSet=test_rs' channel = _create_mock_connection(url).default_channel hostname, dbname, options = channel._parse_uri() From 3ac48e4ae04fa21b258ef31afaeb634259fe881c Mon Sep 17 00:00:00 2001 From: Omer Katz Date: Sun, 6 Oct 2024 15:24:03 +0300 Subject: [PATCH 5/7] Fix pydocstyle errors. --- kombu/asynchronous/aws/sqs/connection.py | 3 +-- kombu/native_delayed_delivery.py | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kombu/asynchronous/aws/sqs/connection.py b/kombu/asynchronous/aws/sqs/connection.py index f1d0448b4..6c79fb41c 100644 --- a/kombu/asynchronous/aws/sqs/connection.py +++ b/kombu/asynchronous/aws/sqs/connection.py @@ -77,8 +77,7 @@ def _create_json_request(self, operation, params, queue_url): ) def make_request(self, operation_name, params, queue_url, verb, callback=None): - """ - Override make_request to support different protocols. + """Override make_request to support different protocols. botocore is soon going to change the default protocol of communicating with SQS backend from 'query' to 'json', so we need a special diff --git a/kombu/native_delayed_delivery.py b/kombu/native_delayed_delivery.py index de99e5ac9..8e54fab9a 100644 --- a/kombu/native_delayed_delivery.py +++ b/kombu/native_delayed_delivery.py @@ -1,4 +1,5 @@ """Native Delayed Delivery API. + Only relevant for RabbitMQ. """ from __future__ import annotations From a5b00b0a03ea98a31a2d91566323a290c64641c0 Mon Sep 17 00:00:00 2001 From: Omer Katz Date: Sun, 6 Oct 2024 15:32:33 +0300 Subject: [PATCH 6/7] Fix apicheck linter. --- docs/reference/index.rst | 1 + docs/reference/kombu.native_delayed_delivery.rst | 11 +++++++++++ 2 files changed, 12 insertions(+) create mode 100644 docs/reference/kombu.native_delayed_delivery.rst diff --git a/docs/reference/index.rst b/docs/reference/index.rst index bddbf2f5e..8a79c2ddc 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -28,6 +28,7 @@ Kombu Core kombu.abstract kombu.resource kombu.serialization + kombu.native_delayed_delivery Kombu Transports ================ diff --git a/docs/reference/kombu.native_delayed_delivery.rst b/docs/reference/kombu.native_delayed_delivery.rst new file mode 100644 index 000000000..040180f6e --- /dev/null +++ b/docs/reference/kombu.native_delayed_delivery.rst @@ -0,0 +1,11 @@ +========================================================== + Native Delayed Delivery - ``native_delayed_delivery`` +========================================================== + +.. contents:: + :local: +.. currentmodule:: kombu.native_delayed_delivery + +.. automodule:: kombu.native_delayed_delivery + :members: + :undoc-members: From 7674a8e141aa2c54cd668bc1cbaf0f9624d9b6c0 Mon Sep 17 00:00:00 2001 From: Omer Katz Date: Sat, 12 Oct 2024 21:07:08 +0300 Subject: [PATCH 7/7] Address code review. --- ...very.rst => kombu.transport.native_delayed_delivery.rst} | 6 ++++-- examples/delayed_infra.py | 2 +- kombu/{ => transport}/native_delayed_delivery.py | 0 t/unit/{ => transport}/test_native_delayed_delivery.py | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) rename docs/reference/{kombu.native_delayed_delivery.rst => kombu.transport.native_delayed_delivery.rst} (63%) rename kombu/{ => transport}/native_delayed_delivery.py (100%) rename t/unit/{ => transport}/test_native_delayed_delivery.py (98%) diff --git a/docs/reference/kombu.native_delayed_delivery.rst b/docs/reference/kombu.transport.native_delayed_delivery.rst similarity index 63% rename from docs/reference/kombu.native_delayed_delivery.rst rename to docs/reference/kombu.transport.native_delayed_delivery.rst index 040180f6e..069a81c90 100644 --- a/docs/reference/kombu.native_delayed_delivery.rst +++ b/docs/reference/kombu.transport.native_delayed_delivery.rst @@ -2,10 +2,12 @@ Native Delayed Delivery - ``native_delayed_delivery`` ========================================================== +.. versionadded:: 5.5 + .. contents:: :local: -.. currentmodule:: kombu.native_delayed_delivery +.. currentmodule:: kombu.transport.native_delayed_delivery -.. automodule:: kombu.native_delayed_delivery +.. automodule:: kombu.transport.native_delayed_delivery :members: :undoc-members: diff --git a/examples/delayed_infra.py b/examples/delayed_infra.py index af261a074..96af8920a 100644 --- a/examples/delayed_infra.py +++ b/examples/delayed_infra.py @@ -2,7 +2,7 @@ from examples.experimental.async_consume import queue from kombu import Connection, Exchange, Queue -from kombu.native_delayed_delivery import ( +from kombu.transport.native_delayed_delivery import ( bind_queue_to_native_delayed_delivery_exchange, calculate_routing_key, declare_native_delayed_delivery_exchanges_and_queues, level_name) diff --git a/kombu/native_delayed_delivery.py b/kombu/transport/native_delayed_delivery.py similarity index 100% rename from kombu/native_delayed_delivery.py rename to kombu/transport/native_delayed_delivery.py diff --git a/t/unit/test_native_delayed_delivery.py b/t/unit/transport/test_native_delayed_delivery.py similarity index 98% rename from t/unit/test_native_delayed_delivery.py rename to t/unit/transport/test_native_delayed_delivery.py index 6cee476b3..0a737017a 100644 --- a/t/unit/test_native_delayed_delivery.py +++ b/t/unit/transport/test_native_delayed_delivery.py @@ -5,7 +5,7 @@ import pytest -from kombu.native_delayed_delivery import ( +from kombu.transport.native_delayed_delivery import ( CELERY_DELAYED_DELIVERY_EXCHANGE, bind_queue_to_native_delayed_delivery_exchange, calculate_routing_key, declare_native_delayed_delivery_exchanges_and_queues, level_name)