diff --git a/lib/queue/queue.py b/lib/queue/queue.py index 8662d3d..0b1fe36 100644 --- a/lib/queue/queue.py +++ b/lib/queue/queue.py @@ -64,15 +64,23 @@ def get_output_queue_name(self, input_queue_name: str, output_queue_name: str = return output_queue_name def delete_messages(self, queue, messages): - for message in messages: - logger.debug(f"Deleting message of {message}") - queue.delete_messages(Entries=[ - { - 'Id': message.receipt_handle, + """ + Delete messages as we process them so other processes don't pick them up. + SQS deals in max batches of 10, so break up messages into groups of 10 + when deleting them. + """ + for i in range(0, len(messages), 10): + batch = messages[i:i + 10] + entries = [] + for idx, message in enumerate(batch): + logger.debug(f"Deleting message of {message}") + entry = { + 'Id': str(idx), 'ReceiptHandle': message.receipt_handle } - ]) - + entries.append(entry) + queue.delete_messages(Entries=entries) + def safely_respond(self, model: Model) -> Tuple[List[Dict[str, str]], List[Dict[str, Any]]]: """ Rescue against failures when attempting to respond (i.e. fingerprint) from models.