Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

156 1mb error handling #179

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions conf/sender.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ ams_project: accounting
# Queue to which SSM will send messages
destination: gLite-APEL

# Outgoing messages will be read and removed from this directory.
path: /var/spool/apel/outgoing
# Accepted messages will be read and removed from <path>/outgoing
# Rejected messages will be moved to <path>/reject
path: /var/spool/apel
# If 'path_type' is set to 'dirq' (or if 'path_type' is omitted), the supplied
# 'path' will be treated as a Python dirq (a directory based queue, which is a
# port of the Perl module Directory::Queue).
Expand Down
35 changes: 29 additions & 6 deletions ssm/ssm2.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from logging import getLogger, INFO, WARNING, DEBUG

try:
from argo_ams_library import ArgoMessagingService, AmsMessage
from argo_ams_library import AmsMessage, ArgoMessagingService, AmsServiceException
except ImportError:
# ImportError is raised later on if AMS is requested but lib not installed.
ArgoMessagingService = None
Expand Down Expand Up @@ -113,16 +113,21 @@ def __init__(self, hosts_and_ports, qpath, cert, key, dest=None, listen=None,

# create the filesystem queues for accepted and rejected messages
if dest is not None and listen is None:
outqpath = os.path.join(qpath, 'outgoing')
rejectqpath = os.path.join(qpath, 'reject')

# Determine what sort of outgoing structure to make
if path_type == 'dirq':
if QueueSimple is None:
raise ImportError("dirq path_type requested but the dirq "
"module wasn't found.")

self._outq = QueueSimple(qpath)
self._outq = QueueSimple(outqpath)
self._rejectq = QueueSimple(rejectqpath)

elif path_type == 'directory':
self._outq = MessageDirectory(qpath)
self._outq = MessageDirectory(outqpath)
self._rejectq = MessageDirectory(rejectqpath)
else:
raise Ssm2Exception('Unsupported path_type variable.')

Expand Down Expand Up @@ -490,9 +495,27 @@ def send_all(self):

elif self._protocol == Ssm2.AMS_MESSAGING:
# Then we are sending to an Argo Messaging Service instance.
argo_id = self._send_msg_ams(text, msgid)

log_string = "Sent %s, Argo ID: %s" % (msgid, argo_id)
try:
argo_id = self._send_msg_ams(text, msgid)
log_string = "Sent %s, Argo ID: %s" % (msgid, argo_id)

except AmsServiceException as e:
# Catch specific 'message too large' exception, raise otherwise.
if "Message size is too large" not in str(e):
raise
else:
log.warning('Message %s could not be sent as its larger than 1MB', msgid)

# Add the message to the rejected queue
name = self._rejectq.add(text)
log.info("Message %s saved to reject queue as %s", msgid, name)

# Remove the message from the outgoing queue
self._last_msg = None
self._outq.remove(msgid)

# Exit out of loop iteration so that message is not removed.
continue

else:
# The SSM has been improperly configured
Expand Down