Skip to content

Commit

Permalink
feat: use env var to limit eta task
Browse files Browse the repository at this point in the history
  • Loading branch information
Iasmini Gomes committed Jun 6, 2024
1 parent 29281d8 commit 51f9b82
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 10 deletions.
1 change: 1 addition & 0 deletions django_cloud_tasks/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(self, *args, **kwargs):
self.delimiter = self._fetch_config(name="DELIMITER", default="--")
self.eager = self._fetch_config(name="EAGER", default=False)
self.tasks_url_name = self._fetch_config(name="URL_NAME", default="tasks-endpoint")
self.tasks_max_eta = self._fetch_config(name="MAXIMUM_ETA_TASK", default=60 * 60 * 24) # 1 day
self.subscribers_url_name = self._fetch_config(name="SUBSCRIBERS_URL_NAME", default="subscriptions-endpoint")

self.subscribers_max_retries = self._fetch_config(name="SUBSCRIBER_MAX_RETRIES", default=None)
Expand Down
30 changes: 20 additions & 10 deletions django_cloud_tasks/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,23 +218,33 @@ def asap(cls, **kwargs):

@classmethod
def later(cls, task_kwargs: dict, eta: int | timedelta | datetime, queue: str = None, headers: dict | None = None):
delay_in_seconds = cls._calculate_delay_in_seconds(eta)
cls._validate_delay(delay_in_seconds)
return cls.push(
task_kwargs=task_kwargs,
queue=queue,
headers=headers,
delay_in_seconds=delay_in_seconds,
)

@staticmethod
def _calculate_delay_in_seconds(eta: int | timedelta | datetime) -> int:
if isinstance(eta, int):
delay_in_seconds = eta
return eta
elif isinstance(eta, timedelta):
delay_in_seconds = eta.total_seconds()
return int(eta.total_seconds())
elif isinstance(eta, datetime):
delay_in_seconds = (eta - now()).total_seconds()
return int((eta - now()).total_seconds())
else:
raise ValueError(
f"Unsupported schedule {eta} of type {eta.__class__.__name__}. " "Must be int, timedelta or datetime."
f"Unsupported schedule {eta} of type {eta.__class__.__name__}. Must be int, timedelta or datetime."
)

return cls.push(
task_kwargs=task_kwargs,
queue=queue,
headers=headers,
delay_in_seconds=delay_in_seconds,
)
@staticmethod
def _validate_delay(delay_in_seconds: int):
max_eta_task = get_config("tasks_max_eta")
if max_eta_task is not None and delay_in_seconds > max_eta_task:
raise ValueError(f"Invalid delay time {delay_in_seconds}, maximum is {max_eta_task}")

@classmethod
def until(cls, task_kwargs: dict, max_eta: datetime, queue: str = None, headers: dict | None = None):
Expand Down
11 changes: 11 additions & 0 deletions sample_project/sample_app/tests/tests_tasks/tests_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,17 @@ def test_task_later_error(self):

push.assert_not_called()

def test_task_later_delay_exceeds_maximum_eta(self):
task_kwargs = dict(price=30, quantity=4, discount=0.2)
excessive_delay = int(60 * 60 * 24 * 2) # 2 days

with self.assertRaises(ValueError) as context:
tasks.CalculatePriceTask.later(eta=excessive_delay, task_kwargs=task_kwargs)

self.assertEqual(
f"Invalid delay time {excessive_delay}, maximum is {get_config('tasks_max_eta')}", str(context.exception)
)

def test_singleton_client_on_task(self):
# we have a singleton if it calls the same task twice
with (
Expand Down

0 comments on commit 51f9b82

Please sign in to comment.