Skip to content

Commit

Permalink
Place too large message in rejected queue
Browse files Browse the repository at this point in the history
Signed-off-by: jounaidr <[email protected]>
  • Loading branch information
jounaidr authored and tofu-rocketry committed Aug 2, 2022
1 parent e72aa7d commit 8b3b0a5
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
5 changes: 3 additions & 2 deletions conf/sender.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ ams_project: accounting
# Queue to which SSM will send messages
destination: gLite-APEL

# Outgoing messages will be read and removed from this directory.
path: /var/spool/apel/outgoing
# Accepted messages will be read and removed from <path>/outgoing
# Rejected messages will be moved to <path>/reject
path: /var/spool/apel
# If 'path_type' is set to 'dirq' (or if 'path_type' is omitted), the supplied
# 'path' will be treated as a Python dirq (a directory based queue, which is a
# port of the Perl module Directory::Queue).
Expand Down
20 changes: 17 additions & 3 deletions ssm/ssm2.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,21 @@ def __init__(self, hosts_and_ports, qpath, cert, key, dest=None, listen=None,

# create the filesystem queues for accepted and rejected messages
if dest is not None and listen is None:
outqpath = os.path.join(qpath, 'outgoing')
rejectqpath = os.path.join(qpath, 'reject')

# Determine what sort of outgoing structure to make
if path_type == 'dirq':
if QueueSimple is None:
raise ImportError("dirq path_type requested but the dirq "
"module wasn't found.")

self._outq = QueueSimple(qpath)
self._outq = QueueSimple(outqpath)
self._rejectq = MessageDirectory(rejectqpath)

elif path_type == 'directory':
self._outq = MessageDirectory(qpath)
self._outq = MessageDirectory(outqpath)
self._rejectq = MessageDirectory(rejectqpath)
else:
raise Ssm2Exception('Unsupported path_type variable.')

Expand Down Expand Up @@ -501,8 +506,17 @@ def send_all(self):
if "Message size is too large" not in str(e):
raise
else:
# Exit out of loop iteration so that message is not removed.
log.warn('Message %s could not be sent as its larger than 1MB', msgid)

# Add the message to the rejected queue
name = self._rejectq.add(text)
log.info("Message %s saved to reject queue as %s", msgid, name)

# Remove the message from the outgoing queue
self._last_msg = None
self._outq.remove(msgid)

# Exit out of loop iteration so that message is not removed.
continue

else:
Expand Down

0 comments on commit 8b3b0a5

Please sign in to comment.