django-eb-sqs is a simple task manager for AWS SQS. It uses SQS and the boto3 library.
Install the module with pip install git+git://github.com/cuda-networks/django-eb-sqs.git
or add it to your requirements.txt
.
Don't forget to add django-eb-sqs app to your Django INSTALLED_APPS
settings:
INSTALLED_APPS = (
...,
'eb_sqs',
)
Adding a task to a queue is simple.
from eb_sqs.decorators import task
@task(queue_name='test')
def echo(message):
print(message)
echo.delay(message='Hello World!')
NOTE: This assumes that you have your AWS keys in the appropriate environment variables, or are using IAM roles. Consult the boto3
documentation for further info.
If you don't pass a queue name, the EB_SQS_DEFAULT_QUEUE
setting is used. If not set, the queue name is eb-sqs-default
.
Additionally the task decorator supports max_retries
(default 0
) and use_pickle
(default False
) attributes for advanced control task execution.
You can also delay the execution of a task by specifying the delay time in seconds.
echo.delay(message='Hello World!', delay=60)
During development it is sometimes useful to execute a task immediately without using SQS. This is possible with the execute_inline
argument.
echo.delay(message='Hello World!', execute_inline=True)
NOTE: delay
is not applied when execute_inline
is set to True
.
Failed tasks can be retried by using the retry
method. See the following example:
from eb_sqs.decorators import task
@task(queue_name='test', max_retries=5)
def upload_file(message):
print('# of retries: {}'.format(upload_file.retry_num))
try:
# upload ...
except ConnectionException:
upload_file.retry()
The retry call supports the delay
and execute_inline
arguments in order to delay the retry or execute it inline. If the retry shall not be counted for the max retry limit set count_retries
to false. Use 'retry_num' to get the number of retries for the current task.
NOTE: retry()
throws a MaxRetriesReachedException
exception if the maximum number of retries is reached.
In order to execute tasks, use the Django command process_queue
.
This command can work with one or more queues, reading from the queues infinitely and executing tasks as they come-in.
python manage.py process_queue --queues <comma-delimited list of queue names>
You can either use full queue names, or queue prefix using prefix:*my_example_prefix*
notation.
Examples:
python manage.py process_queue --queues queue1,queue2 # process queue1 and queue2
python manage.py process_queue --queues queue1,prefix:pr1-,queue2 # process queue1, queue2 and any queue whose name starts with 'pr1-'
Use the signals MESSAGES_RECEIVED
, MESSAGES_PROCESSED
, MESSAGES_DELETED
of the WorkerService
to get informed about the current SQS batch being processed by the management command.
This is a helper tool for the case you wish to define one of your class method as a task, and make it seamless to all callers. This makes the code much simpler, and allows using classes to invoke your method directly without considering whether it's invoked async or not.
This is how you would define your class:
from eb_sqs.auto_tasks.service import AutoTaskService
class MyService:
def __init__(self, p1=default1, ..., pN=defaultN, auto_task_service=None):
self._auto_task_service = auto_task_service or AutoTaskService()
self._auto_task_service.register_task(self.my_task_method)
def my_task_method(self, *args, **kwargs):
...
Notice the following:
- Your class needs to have defaults for all parameters in the c'tor
- The c'tor must have a parameter named
auto_task_service
- The method shouldn't have any return value (as it's invoked async)
In case you want your method to retry certain cases, you need to raise RetryableTaskException
.
You can provide on optional delay
time for the retry, set count_retries=False
in case you don't want to limit retries, or use max_retries_func
to specify a function which will be invoked when the defined maximum number of retries is exhausted.
The following settings can be used to fine tune django-eb-sqs. Copy them into your Django settings.py
file.
- EB_AWS_REGION (
us-east-1
): The AWS region to use when working with SQS. - EB_SQS_MAX_NUMBER_OF_MESSAGES (
10
): The maximum number of messages to read in a single call from SQS (<= 10). - EB_SQS_WAIT_TIME_S (
2
): The time to wait (seconds) when receiving messages from SQS. - NO_QUEUES_WAIT_TIME_S (
5
): The time a workers waits if there are no SQS queues available to process. - EB_SQS_AUTO_ADD_QUEUE (
False
): If queues should be added automatically to AWS if they don't exist. - EB_SQS_QUEUE_MESSAGE_RETENTION (
1209600
): The value (in seconds) to be passed to MessageRetentionPeriod parameter, when creating a queue (only relevant in case EB_SQS_AUTO_ADD_QUEUE is set to True). - EB_SQS_QUEUE_VISIBILITY_TIMEOUT (
300
): The value (in seconds) to be passed to VisibilityTimeout parameter, when creating a queue (only relevant in case EB_SQS_AUTO_ADD_QUEUE is set to True). - EB_SQS_DEAD_LETTER_MODE (
False
): Enable if this worker is handling the SQS dead letter queue. Tasks won't be executed but group callback is. - EB_SQS_DEFAULT_DELAY (
0
): Default task delay time in seconds. - EB_SQS_DEFAULT_MAX_RETRIES (
0
): Default retry limit for all tasks. - EB_SQS_DEFAULT_COUNT_RETRIES (
True
): Count retry calls. Needed if max retries check shall be executed. - EB_SQS_DEFAULT_QUEUE (
eb-sqs-default
): Default queue name if none is specified when creating a task. - EB_SQS_EXECUTE_INLINE (
False
): Execute tasks immediately without using SQS. Useful during development. Global settingTrue
will override setting it on a task level. - EB_SQS_FORCE_SERIALIZATION (
False
): Forces serialization of tasks when executedinline
. This setting is helpful during development to see if all arguments are serialized and deserialized properly. - EB_SQS_QUEUE_PREFIX (``): Prefix to use for the queues. The prefix is added to the queue name.
- EB_SQS_USE_PICKLE (
False
): Enable to usepickle
to serialize task parameters. Usesjson
as default. - EB_SQS_AWS_MAX_RETRIES (
30
): Default retry limit on a boto3 call to AWS SQS. - EB_SQS_REFRESH_PREFIX_QUEUES_S (
10
): Minimal number of seconds to wait between refreshing queue list, in case prefix is used
Make sure to install the development dependencies from development.txt
.
The build in tests can be executed with the Django test runner.
python -m django test --settings=eb_sqs.test_settings