From 51f9b82ea469ed0d4df3f57c7e07fc3e28d327ed Mon Sep 17 00:00:00 2001 From: Iasmini Gomes Date: Thu, 23 May 2024 09:04:53 -0300 Subject: [PATCH] feat: use env var to limit eta task --- django_cloud_tasks/apps.py | 1 + django_cloud_tasks/tasks/task.py | 30 ++++++++++++------- .../tests/tests_tasks/tests_tasks.py | 11 +++++++ 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/django_cloud_tasks/apps.py b/django_cloud_tasks/apps.py index 9736240..05331fe 100644 --- a/django_cloud_tasks/apps.py +++ b/django_cloud_tasks/apps.py @@ -30,6 +30,7 @@ def __init__(self, *args, **kwargs): self.delimiter = self._fetch_config(name="DELIMITER", default="--") self.eager = self._fetch_config(name="EAGER", default=False) self.tasks_url_name = self._fetch_config(name="URL_NAME", default="tasks-endpoint") + self.tasks_max_eta = self._fetch_config(name="MAXIMUM_ETA_TASK", default=60 * 60 * 24) # 1 day self.subscribers_url_name = self._fetch_config(name="SUBSCRIBERS_URL_NAME", default="subscriptions-endpoint") self.subscribers_max_retries = self._fetch_config(name="SUBSCRIBER_MAX_RETRIES", default=None) diff --git a/django_cloud_tasks/tasks/task.py b/django_cloud_tasks/tasks/task.py index d188fd6..c6d97e3 100644 --- a/django_cloud_tasks/tasks/task.py +++ b/django_cloud_tasks/tasks/task.py @@ -218,23 +218,33 @@ def asap(cls, **kwargs): @classmethod def later(cls, task_kwargs: dict, eta: int | timedelta | datetime, queue: str = None, headers: dict | None = None): + delay_in_seconds = cls._calculate_delay_in_seconds(eta) + cls._validate_delay(delay_in_seconds) + return cls.push( + task_kwargs=task_kwargs, + queue=queue, + headers=headers, + delay_in_seconds=delay_in_seconds, + ) + + @staticmethod + def _calculate_delay_in_seconds(eta: int | timedelta | datetime) -> int: if isinstance(eta, int): - delay_in_seconds = eta + return eta elif isinstance(eta, timedelta): - delay_in_seconds = eta.total_seconds() + return int(eta.total_seconds()) elif isinstance(eta, datetime): - delay_in_seconds = (eta - now()).total_seconds() + return int((eta - now()).total_seconds()) else: raise ValueError( - f"Unsupported schedule {eta} of type {eta.__class__.__name__}. " "Must be int, timedelta or datetime." + f"Unsupported schedule {eta} of type {eta.__class__.__name__}. Must be int, timedelta or datetime." ) - return cls.push( - task_kwargs=task_kwargs, - queue=queue, - headers=headers, - delay_in_seconds=delay_in_seconds, - ) + @staticmethod + def _validate_delay(delay_in_seconds: int): + max_eta_task = get_config("tasks_max_eta") + if max_eta_task is not None and delay_in_seconds > max_eta_task: + raise ValueError(f"Invalid delay time {delay_in_seconds}, maximum is {max_eta_task}") @classmethod def until(cls, task_kwargs: dict, max_eta: datetime, queue: str = None, headers: dict | None = None): diff --git a/sample_project/sample_app/tests/tests_tasks/tests_tasks.py b/sample_project/sample_app/tests/tests_tasks/tests_tasks.py index 109e414..802b293 100644 --- a/sample_project/sample_app/tests/tests_tasks/tests_tasks.py +++ b/sample_project/sample_app/tests/tests_tasks/tests_tasks.py @@ -224,6 +224,17 @@ def test_task_later_error(self): push.assert_not_called() + def test_task_later_delay_exceeds_maximum_eta(self): + task_kwargs = dict(price=30, quantity=4, discount=0.2) + excessive_delay = int(60 * 60 * 24 * 2) # 2 days + + with self.assertRaises(ValueError) as context: + tasks.CalculatePriceTask.later(eta=excessive_delay, task_kwargs=task_kwargs) + + self.assertEqual( + f"Invalid delay time {excessive_delay}, maximum is {get_config('tasks_max_eta')}", str(context.exception) + ) + def test_singleton_client_on_task(self): # we have a singleton if it calls the same task twice with (