From 3052e3d31112095fa261c1e7eda34080c36a397a Mon Sep 17 00:00:00 2001 From: jounaidr Date: Wed, 8 Sep 2021 16:57:59 +0100 Subject: [PATCH 1/3] Messages larger than 1 MB caught Implement code to stop send_all from clearing the message if it's too large. --- ssm/ssm2.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/ssm/ssm2.py b/ssm/ssm2.py index 72099ad8..039197d4 100644 --- a/ssm/ssm2.py +++ b/ssm/ssm2.py @@ -37,7 +37,7 @@ from logging import getLogger, INFO, WARNING, DEBUG try: - from argo_ams_library import ArgoMessagingService, AmsMessage + from argo_ams_library import ArgoMessagingService, AmsMessage, AmsServiceException except ImportError: # ImportError is raised later on if AMS is requested but lib not installed. ArgoMessagingService = None @@ -489,10 +489,19 @@ def send_all(self): log_string = "Sent %s" % msgid elif self._protocol == Ssm2.AMS_MESSAGING: - # Then we are sending to an Argo Messaging Service instance. - argo_id = self._send_msg_ams(text, msgid) - - log_string = "Sent %s, Argo ID: %s" % (msgid, argo_id) + try: + # Then we are sending to an Argo Messaging Service instance. + argo_id = self._send_msg_ams(text, msgid) + log_string = "Sent %s, Argo ID: %s" % (msgid, argo_id) + + except AmsServiceException as e: + # Catch specific 'message too large' exception, raise otherwise. + if "Message size is too large" not in str(e): + raise + else: + # Exit out of loop iteration so that message is not removed. + log.warn('Message %s could not be sent as its larger than 1MB', msgid) + continue else: # The SSM has been improperly configured From bda9e02c0a510d98973c480d319ffe558fcf707c Mon Sep 17 00:00:00 2001 From: jounaidr Date: Fri, 10 Sep 2021 14:51:49 +0100 Subject: [PATCH 2/3] Place too large message in rejected queue Signed-off-by: jounaidr --- conf/sender.cfg | 5 +++-- ssm/ssm2.py | 20 +++++++++++++++++--- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/conf/sender.cfg b/conf/sender.cfg index 70ff164a..61c09790 100644 --- a/conf/sender.cfg +++ b/conf/sender.cfg @@ -34,8 +34,9 @@ ams_project: accounting # Queue to which SSM will send messages destination: gLite-APEL -# Outgoing messages will be read and removed from this directory. -path: /var/spool/apel/outgoing +# Accepted messages will be read and removed from /outgoing +# Rejected messages will be moved to /reject +path: /var/spool/apel # If 'path_type' is set to 'dirq' (or if 'path_type' is omitted), the supplied # 'path' will be treated as a Python dirq (a directory based queue, which is a # port of the Perl module Directory::Queue). diff --git a/ssm/ssm2.py b/ssm/ssm2.py index 039197d4..7c7d2df7 100644 --- a/ssm/ssm2.py +++ b/ssm/ssm2.py @@ -113,16 +113,21 @@ def __init__(self, hosts_and_ports, qpath, cert, key, dest=None, listen=None, # create the filesystem queues for accepted and rejected messages if dest is not None and listen is None: + outqpath = os.path.join(qpath, 'outgoing') + rejectqpath = os.path.join(qpath, 'reject') + # Determine what sort of outgoing structure to make if path_type == 'dirq': if QueueSimple is None: raise ImportError("dirq path_type requested but the dirq " "module wasn't found.") - self._outq = QueueSimple(qpath) + self._outq = QueueSimple(outqpath) + self._rejectq = QueueSimple(rejectqpath) elif path_type == 'directory': - self._outq = MessageDirectory(qpath) + self._outq = MessageDirectory(outqpath) + self._rejectq = MessageDirectory(rejectqpath) else: raise Ssm2Exception('Unsupported path_type variable.') @@ -499,8 +504,17 @@ def send_all(self): if "Message size is too large" not in str(e): raise else: - # Exit out of loop iteration so that message is not removed. log.warn('Message %s could not be sent as its larger than 1MB', msgid) + + # Add the message to the rejected queue + name = self._rejectq.add(text) + log.info("Message %s saved to reject queue as %s", msgid, name) + + # Remove the message from the outgoing queue + self._last_msg = None + self._outq.remove(msgid) + + # Exit out of loop iteration so that message is not removed. continue else: From e266757389fecfeb02aff2b327dad58dfd334930 Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Mon, 12 Jun 2023 13:31:39 +0100 Subject: [PATCH 3/3] Reorder imports, move comment, use logging.warning - Reorder argo_ams_library imports alphabetically. - Move comment to more logical position. - Change logging.warn to .warning as former is deprecating. --- ssm/ssm2.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ssm/ssm2.py b/ssm/ssm2.py index 7c7d2df7..d4b96bb4 100644 --- a/ssm/ssm2.py +++ b/ssm/ssm2.py @@ -37,7 +37,7 @@ from logging import getLogger, INFO, WARNING, DEBUG try: - from argo_ams_library import ArgoMessagingService, AmsMessage, AmsServiceException + from argo_ams_library import AmsMessage, ArgoMessagingService, AmsServiceException except ImportError: # ImportError is raised later on if AMS is requested but lib not installed. ArgoMessagingService = None @@ -494,8 +494,8 @@ def send_all(self): log_string = "Sent %s" % msgid elif self._protocol == Ssm2.AMS_MESSAGING: + # Then we are sending to an Argo Messaging Service instance. try: - # Then we are sending to an Argo Messaging Service instance. argo_id = self._send_msg_ams(text, msgid) log_string = "Sent %s, Argo ID: %s" % (msgid, argo_id) @@ -504,7 +504,7 @@ def send_all(self): if "Message size is too large" not in str(e): raise else: - log.warn('Message %s could not be sent as its larger than 1MB', msgid) + log.warning('Message %s could not be sent as its larger than 1MB', msgid) # Add the message to the rejected queue name = self._rejectq.add(text)