Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add native delayed delivery API to kombu #2128

Merged
merged 8 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/reference/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Kombu Core
kombu.abstract
kombu.resource
kombu.serialization
kombu.native_delayed_delivery
Nusnus marked this conversation as resolved.
Show resolved Hide resolved

Kombu Transports
================
Expand Down
11 changes: 11 additions & 0 deletions docs/reference/kombu.native_delayed_delivery.rst
thedrow marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
==========================================================
Native Delayed Delivery - ``native_delayed_delivery``
==========================================================

.. contents::
:local:
.. currentmodule:: kombu.native_delayed_delivery

.. automodule:: kombu.native_delayed_delivery
:members:
:undoc-members:
13 changes: 13 additions & 0 deletions docs/userguide/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,16 @@ for priorities using different queues.
:file:`client.py`:

.. literalinclude:: ../../examples/simple_task_queue/client.py

.. _native-delayed-delivery-example:

Native Delayed Delivery
=======================

This example demonstrates how to declare native delayed delivery queues and exchanges and publish a message using
the native delayed delivery mechanism.

:file:`delayed_infra.py`:

.. literalinclude:: ../../examples/delayed_infra.py
:language: python
24 changes: 24 additions & 0 deletions examples/delayed_infra.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

from examples.experimental.async_consume import queue
from kombu import Connection, Exchange, Queue
from kombu.native_delayed_delivery import (
bind_queue_to_native_delayed_delivery_exchange, calculate_routing_key,
declare_native_delayed_delivery_exchanges_and_queues, level_name)

with Connection('amqp://guest:guest@localhost:5672//') as connection:
declare_native_delayed_delivery_exchanges_and_queues(connection, 'quorum')

destination_exchange = Exchange(
'destination', type='topic')
destination_queue = Queue("destination", exchange=destination_exchange)
bind_queue_to_native_delayed_delivery_exchange(connection, queue)

channel = connection.channel()
with connection.Producer(channel=channel) as producer:
routing_key = calculate_routing_key(30, 'destination')
producer.publish(
"delayed msg",
routing_key=routing_key,
exchange=level_name(27)
)
8 changes: 4 additions & 4 deletions kombu/asynchronous/aws/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def __init__(self, sqs_connection, http_client=None,
super().__init__(sqs_connection, http_client,
**http_client_params)

def make_request(self, operation, params_, path, verb, callback=None): # noqa
def make_request(self, operation, params_, path, verb, callback=None):
params = params_.copy()
if operation:
params['Action'] = operation
Expand All @@ -202,7 +202,7 @@ def make_request(self, operation, params_, path, verb, callback=None): # noqa

return self._mexe(prepared_request, callback=callback)

def get_list(self, operation, params, markers, path='/', parent=None, verb='POST', callback=None): # noqa
def get_list(self, operation, params, markers, path='/', parent=None, verb='POST', callback=None):
return self.make_request(
operation, params, path, verb,
callback=transform(
Expand All @@ -211,15 +211,15 @@ def get_list(self, operation, params, markers, path='/', parent=None, verb='POST
),
)

def get_object(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa
def get_object(self, operation, params, path='/', parent=None, verb='GET', callback=None):
return self.make_request(
operation, params, path, verb,
callback=transform(
self._on_obj_ready, callback, parent or self, operation
),
)

def get_status(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa
def get_status(self, operation, params, path='/', parent=None, verb='GET', callback=None):
return self.make_request(
operation, params, path, verb,
callback=transform(
Expand Down
5 changes: 2 additions & 3 deletions kombu/asynchronous/aws/sqs/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ def _create_json_request(self, operation, params, queue_url):
**param_payload
)

def make_request(self, operation_name, params, queue_url, verb, callback=None): # noqa
"""
Override make_request to support different protocols.
def make_request(self, operation_name, params, queue_url, verb, callback=None):
"""Override make_request to support different protocols.

botocore is soon going to change the default protocol of communicating
with SQS backend from 'query' to 'json', so we need a special
Expand Down
2 changes: 1 addition & 1 deletion kombu/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def switch(self, conn_str):
self.declared_entities.clear()
self._closed = False
conn_params = (
parse_url(conn_str) if "://" in conn_str else {"hostname": conn_str} # noqa
parse_url(conn_str) if "://" in conn_str else {"hostname": conn_str}
)
self._init_params(**dict(self._initial_params, **conn_params))

Expand Down
105 changes: 105 additions & 0 deletions kombu/native_delayed_delivery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""Native Delayed Delivery API.

Only relevant for RabbitMQ.
"""
from __future__ import annotations

from kombu import Connection, Exchange, Queue
from kombu.log import get_logger

logger = get_logger(__name__)

MAX_NUMBER_OF_BITS_TO_USE = 28
MAX_LEVEL = MAX_NUMBER_OF_BITS_TO_USE - 1
CELERY_DELAYED_DELIVERY_EXCHANGE = "celery_delayed_delivery"


def level_name(level: int) -> str:
"""Generates the delayed queue/exchange name based on the level."""
if level < 0:
raise ValueError("level must be a non-negative number")

return f"celery_delayed_{level}"


def declare_native_delayed_delivery_exchanges_and_queues(connection: Connection, queue_type: str) -> None:
"""Declares all native delayed delivery exchanges and queues."""
if queue_type != "classic" and queue_type != "quorum":
raise ValueError("queue_type must be either classic or quorum")

channel = connection.channel()

routing_key: str = "1.#"

for level in range(27, -1, - 1):
current_level = level_name(level)
next_level = level_name(level - 1) if level > 0 else None

delayed_exchange: Exchange = Exchange(
current_level, type="topic").bind(channel)
delayed_exchange.declare()

queue_arguments = {
"x-queue-type": queue_type,
"x-overflow": "reject-publish",
"x-message-ttl": pow(2, level) * 1000,
"x-dead-letter-exchange": next_level if level > 0 else CELERY_DELAYED_DELIVERY_EXCHANGE,
}

if queue_type == 'quorum':
queue_arguments["x-dead-letter-strategy"] = "at-least-once"

delayed_queue: Queue = Queue(
current_level,
queue_arguments=queue_arguments
).bind(channel)
delayed_queue.declare()
delayed_queue.bind_to(current_level, routing_key)

routing_key = "*." + routing_key

routing_key = "0.#"
for level in range(27, 0, - 1):
current_level = level_name(level)
next_level = level_name(level - 1) if level > 0 else None

next_level_exchange: Exchange = Exchange(
next_level, type="topic").bind(channel)

next_level_exchange.bind_to(current_level, routing_key)

routing_key = "*." + routing_key

delivery_exchange: Exchange = Exchange(
CELERY_DELAYED_DELIVERY_EXCHANGE, type="topic").bind(channel)
delivery_exchange.declare()
delivery_exchange.bind_to(level_name(0), routing_key)


def bind_queue_to_native_delayed_delivery_exchange(connection: Connection, queue: Queue) -> None:
"""Binds a queue to the native delayed delivery exchange."""
channel = connection.channel()
queue = queue.bind(channel)
exchange: Exchange = queue.exchange.bind(channel)

if exchange.type == 'direct':
logger.warn(f"Exchange {exchange.name} is a direct exchange "
f"and native delayed delivery do not support direct exchanges.\n"
f"ETA tasks published to this exchange will block the worker until the ETA arrives.")
return

routing_key = queue.routing_key if queue.routing_key.startswith(
'#') else f"#.{queue.routing_key}"
exchange.bind_to(CELERY_DELAYED_DELIVERY_EXCHANGE, routing_key=routing_key)
queue.bind_to(exchange.name, routing_key=routing_key)


def calculate_routing_key(countdown: int, routing_key: str) -> str:
"""Calculate the routing key for publishing a delayed message based on the countdown."""
if countdown < 1:
raise ValueError("countdown must be a positive number")

if not routing_key:
raise ValueError("routing_key must be non-empty")

return '.'.join(list(f'{countdown:028b}')) + f'.{routing_key}'
2 changes: 1 addition & 1 deletion kombu/transport/SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
* Supports Fanout: Yes
* Supports Priority: No
* Supports TTL: No
""" # noqa: E501
"""


from __future__ import annotations
Expand Down
28 changes: 27 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,33 @@ all_files = 1
[flake8]
# classes can be lowercase, arguments and variables can be uppercase
# whenever it makes the code more readable.
extend-ignore = W504, N806, N802, N801, N803
max-line-length = 117
extend-ignore =
# classes can be lowercase, arguments and variables can be uppercase
# whenever it makes the code more readable.
W504, N806, N802, N801, N803
# incompatible with black https://github.com/psf/black/issues/315#issuecomment-395457972
E203,
# Missing docstring in public method
D102,
# Missing docstring in public package
D104,
# Missing docstring in magic method
D105,
# Missing docstring in __init__
D107,
# First line should be in imperative mood; try rephrasing
D401,
# No blank lines allowed between a section header and its content
D412,
# ambiguous variable name '...'
E741,
# ambiguous class definition '...'
E742,
per-file-ignores =
t/*,setup.py,examples/*,docs/*,extra/*:
# docstrings
D,

[isort]
add_imports =
Expand Down
107 changes: 107 additions & 0 deletions t/unit/test_native_delayed_delivery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from __future__ import annotations

import logging
from unittest.mock import Mock

import pytest

from kombu.native_delayed_delivery import (
CELERY_DELAYED_DELIVERY_EXCHANGE,
bind_queue_to_native_delayed_delivery_exchange, calculate_routing_key,
declare_native_delayed_delivery_exchanges_and_queues, level_name)


class test_native_delayed_delivery_level_name:
def test_level_name_with_negative_level(self):
with pytest.raises(ValueError, match="level must be a non-negative number"):
level_name(-1)

def test_level_name_with_level_0(self):
assert level_name(0) == 'celery_delayed_0'

def test_level_name_with_level_1(self):
assert level_name(1) == 'celery_delayed_1'


class test_declare_native_delayed_delivery_exchanges_and_queues:
def test_invalid_queue_type(self):
with pytest.raises(ValueError, match="queue_type must be either classic or quorum"):
declare_native_delayed_delivery_exchanges_and_queues(Mock(), 'foo')

def test_classic_queue_type(self):
declare_native_delayed_delivery_exchanges_and_queues(Mock(), 'classic')

def test_quorum_queue_type(self):
declare_native_delayed_delivery_exchanges_and_queues(Mock(), 'quorum')


class test_bind_queue_to_native_delayed_delivery_exchange:
def test_bind_to_direct_exchange(self, caplog):
with caplog.at_level(logging.WARNING):
queue_mock = Mock()
queue_mock.bind().exchange.bind().type = 'direct'
queue_mock.bind().exchange.bind().name = 'foo'

bind_queue_to_native_delayed_delivery_exchange(Mock(), queue_mock)

assert len(caplog.records) == 1

record = caplog.records[0]
assert (record.message == "Exchange foo is a direct exchange "
"and native delayed delivery do not support direct exchanges.\n"
"ETA tasks published to this exchange will "
"block the worker until the ETA arrives.")

def test_bind_to_topic_exchange(self):
queue_mock = Mock()
queue_mock.bind().exchange.bind().type = 'topic'
queue_mock.bind().exchange.bind().name = 'foo'
queue_mock.bind().routing_key = 'foo'

bind_queue_to_native_delayed_delivery_exchange(Mock(), queue_mock)
queue_mock.bind().exchange.bind().bind_to.assert_called_once_with(
CELERY_DELAYED_DELIVERY_EXCHANGE,
routing_key="#.foo"
)
queue_mock.bind().bind_to.assert_called_once_with(
'foo',
routing_key="#.foo"
)

def test_bind_to_topic_exchange_with_wildcard_routing_key(self):
queue_mock = Mock()
queue_mock.bind().exchange.bind().type = 'topic'
queue_mock.bind().exchange.bind().name = 'foo'
queue_mock.bind().routing_key = '#.foo'

bind_queue_to_native_delayed_delivery_exchange(Mock(), queue_mock)
queue_mock.bind().exchange.bind().bind_to.assert_called_once_with(
CELERY_DELAYED_DELIVERY_EXCHANGE,
routing_key="#.foo"
)
queue_mock.bind().bind_to.assert_called_once_with(
'foo',
routing_key="#.foo"
)


class test_calculate_routing_key:
def test_calculate_routing_key(self):
assert (calculate_routing_key(1, 'destination')
== '0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.1.destination')

def test_negative_countdown(self):
with pytest.raises(ValueError, match="countdown must be a positive number"):
calculate_routing_key(-1, 'foo')

def test_zero_countdown(self):
with pytest.raises(ValueError, match="countdown must be a positive number"):
calculate_routing_key(0, 'foo')

def test_empty_routing_key(self):
with pytest.raises(ValueError, match="routing_key must be non-empty"):
calculate_routing_key(1, '')

def test_none_routing_key(self):
with pytest.raises(ValueError, match="routing_key must be non-empty"):
calculate_routing_key(1, None)
2 changes: 1 addition & 1 deletion t/unit/transport/test_SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ def test_get_bulk_raises_empty(self):

def test_optional_b64_decode(self):
raw = b'{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77","task": "celery.task.PingTask",' \
b'"args": [],"kwargs": {},"retries": 0,"eta": "2009-11-17T12:30:56.527191"}' # noqa
b'"args": [],"kwargs": {},"retries": 0,"eta": "2009-11-17T12:30:56.527191"}'
b64_enc = base64.b64encode(raw)
assert self.channel._optional_b64_decode(b64_enc) == raw
assert self.channel._optional_b64_decode(raw) == raw
Expand Down
2 changes: 1 addition & 1 deletion t/unit/transport/test_azureservicebus.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def get_queue_runtime_properties(self, queue_name):

URL_NOCREDS = 'azureservicebus://'
URL_CREDS_SAS = 'azureservicebus://policyname:ke/y@hostname'
URL_CREDS_SAS_FQ = 'azureservicebus://policyname:ke/[email protected]' # noqa
URL_CREDS_SAS_FQ = 'azureservicebus://policyname:ke/[email protected]'
URL_CREDS_DA = 'azureservicebus://DefaultAzureCredential@hostname'
URL_CREDS_DA_FQ = 'azureservicebus://[email protected]' # noqa
URL_CREDS_MI = 'azureservicebus://ManagedIdentityCredential@hostname'
Expand Down
Loading