CV2-3435 minor tweaks as per alegre audio replacement work #34

Merged 4 commits into master from cv2-3435-add-presto-to-alegre on Sep 12, 2023

Conversation

@DGaffney (Collaborator) commented Aug 31, 2023

Mostly small changes due to errors encountered while actually testing end to end with Alegre. Check out my inline comments - the high-level summary is:

  1. We needed to rework the tests and the body expectations for queue-pushing requests.
  2. While I was in here, I mocked out a bunch of outputs from models so that we aren't just waiting on model responses for 20 minutes when running local tests.

@DGaffney marked this pull request as draft on August 31, 2023 at 00:44
@@ -56,8 +56,9 @@ def get_or_create_queues(self, queue_name: str) -> List[boto3.resources.base.ServiceResource]:
         """
         try:
             found_queues = [q for q in self.sqs.queues.filter(QueueNamePrefix=queue_name)]
-            if found_queues:
-                return found_queues
+            exact_match_queues = [q for q in found_queues if q.attributes['QueueArn'].split(':')[-1] == queue_name]
DGaffney (Collaborator, Author):

This is happening because, apparently, .filter only matches on prefixes. The output queue existed and was found first, so the worker kept returning the wrong queue over and over instead of forcing the creation of the input queue.
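
For reference, a minimal sketch of the exact-match lookup plus a creation fallback (assuming self.sqs is a boto3 SQS ServiceResource; the create_queue fallback shown here is an assumption about the rest of the method, not part of this diff):

```python
from typing import List

from boto3.resources.base import ServiceResource


def get_or_create_queues(self, queue_name: str) -> List[ServiceResource]:
    # QueueNamePrefix is a prefix match, so asking for "input" can also return
    # queues whose names merely start with "input"; narrow to exact ARN matches.
    found_queues = list(self.sqs.queues.filter(QueueNamePrefix=queue_name))
    exact_match_queues = [
        q for q in found_queues
        if q.attributes["QueueArn"].split(":")[-1] == queue_name
    ]
    if exact_match_queues:
        return exact_match_queues
    # No exact match: create the queue rather than reusing a prefix match.
    return [self.sqs.create_queue(QueueName=queue_name)]
```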

@@ -51,7 +51,10 @@ def safely_respond(self, model: Model) -> List[schemas.Message]:
         responses = []
         if messages_with_queues:
             logger.debug(f"About to respond to: ({messages_with_queues})")
-            responses = model.respond([schemas.Message(**json.loads(message.body)) for message, queue in messages_with_queues])
+            try:
+                responses = model.respond([schemas.Message(**json.loads(message.body)) for message, queue in messages_with_queues])
DGaffney (Collaborator, Author):

If we get an error, log it and don't kill the worker. We should probably move the delete in here too, so that we don't drop the message out of the queue?

Contributor:

I think this should only catch the specific exception that we are trying to handle here? And yes, we shouldn't delete the message unless processing is successful. Note that this can lead to queue blocking, though, unless we have a "dead letter" process to take old items off the queue and stash them somewhere.

DGaffney (Collaborator, Author):

I think we should rescue everything and log the error to Sentry. The alternative is that we let the service die, which would require a full reboot that has to be done manually due to the other upstream constraints I am facing - namely that we run multiple PIDs in the same container, and this process is not the one that determines whether the container is dead or not.

Contributor:

I think we can start by rescuing everything but reporting everything to Sentry... over time we can create more specific exception classes so we can group them more easily in Sentry. Also, I think we should log only after all retries are exhausted, and then move the job to a "dead queue" or something - if all those jobs failed because of a bug, we can retry them after pushing a fix, or discard them. This is similar to how Sidekiq works.

Contributor:

If we handle all exceptions and leave the service running on errors, it seems likely that we could deploy breaking changes (like a malformed payload) and the service would be discarding data while the apps submitting work get 'OK' callbacks.

I agree it is a trade-off: in scenarios where we have only one or two errors, logging them is great and preferable to the whole service going down. But I'd rather have it crash a bunch in the beginning while we iron things out than risk overbroad exception handling that we are likely to forget to remove.

Contributor:

I agree that crashing is a good approach for the service as a whole, but for individual jobs I think it's better to handle failures gracefully (retrying, capturing the error after all attempts are exhausted and reporting it to Sentry, sending the job to a "dead queue", etc.), and the callback in that case should be an error sent to the client. I'm not sure that in the beginning we'll have enough clarity and confidence to differentiate between these two cases.
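
To anchor the discussion, here is a rough sketch of the graceful per-job handling being described (hedged: sentry_sdk, the receive/delete helper names, and leaving failed messages for an SQS redrive/dead-letter policy are assumptions about where this could go, not what this PR merges):

```python
import json
from typing import List

import sentry_sdk


def safely_respond(self, model: Model) -> List[schemas.Message]:
    # Model and schemas come from the worker module; receive_messages and
    # delete_messages are hypothetical helper names for the existing plumbing.
    messages_with_queues = self.receive_messages()
    responses: List[schemas.Message] = []
    if not messages_with_queues:
        return responses
    try:
        responses = model.respond(
            [schemas.Message(**json.loads(message.body)) for message, queue in messages_with_queues]
        )
    except Exception:
        # Report instead of killing the worker; by not deleting, the messages stay
        # on the queue so a redrive policy can move them to a dead-letter queue
        # after enough failed receives.
        sentry_sdk.capture_exception()
        return responses
    # Delete only after a successful respond, so failures aren't silently dropped.
    self.delete_messages(messages_with_queues)
    return responses
```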

     callback_url: str
     url: Optional[str] = None
     text: Optional[str] = None
+    raw: Optional[dict] = {}
DGaffney (Collaborator, Author):

Add a generic input for packing in other metadata as needed.
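
For context, roughly what this schema fragment amounts to (a sketch: the class name Message and the pydantic BaseModel base are assumptions inferred from the schemas.Message usage elsewhere in the diff):

```python
from typing import Optional

from pydantic import BaseModel


class Message(BaseModel):  # name assumed from the schemas.Message references in the worker
    callback_url: str
    url: Optional[str] = None
    text: Optional[str] = None
    # New in this PR: a generic field for packing in other metadata as needed.
    raw: Optional[dict] = {}
```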

Contributor:

Maybe this needs to ensure that we don't load a raw payload bigger than the SQS max payload size? And if we have a raw dict, maybe it needs to be checked against a schema? With queues, it is much better to know that a message is mis-constructed and fail when trying to put it on the queue than when trying to pull it off.

DGaffney (Collaborator, Author):

The maximum payload size is 256 KB - that does indeed feel like it may be too small sometimes. I grabbed some text off the CloudWatch messages from Alegre to get a "typical" vector - that typical vector message is 16 KB and is very likely nearly identical in size to a single output message. That's still a little close for comfort at 1/16th of the max size, particularly if the text is really long. I think a secondary ticket that pre-enforces a max payload size, and perhaps optionally zips the message data, may be useful, but I am not sure whether that is the immediate next step or something that needs to be done in this ticket?

Contributor:

We've had similar problems with Sidekiq before, related to the size of the payload... In general, it's not good practice to serialize a full object or have a big payload for a background job, and it can also lead to performance issues that affect the throughput of the queue. Does Presto have any kind of local storage? If so, could the job contain only the job ID and fetch the data from storage once it runs? If not, then yes, we should at least report an error to Sentry if a job is too big.

DGaffney (Collaborator, Author):

We don't have any storage mechanism in Presto currently. I could see adding an ephemeral key/value store for precisely this sort of thing at some point in the future, but I agree that for right now, logging when a payload is too big, and designing to avoid the issue in the first place, is probably the best play.

Contributor:

Some tipline messages are 4,096 characters. Let's log and see how often we encounter size errors. We "should" be OK, but it's important that we log and track this.
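
A minimal sketch of the pre-enqueue size check discussed above (the 256 KB limit is SQS's documented maximum; the function name and where it would be called from are assumptions):

```python
import json
import logging

logger = logging.getLogger(__name__)

SQS_MAX_PAYLOAD_BYTES = 256 * 1024  # hard SQS per-message limit


def ensure_payload_fits(message: dict) -> str:
    """Serialize a message and fail loudly at enqueue time if it won't fit on SQS."""
    body = json.dumps(message)
    size = len(body.encode("utf-8"))
    if size > SQS_MAX_PAYLOAD_BYTES:
        # Failing when putting the message on the queue is far easier to debug
        # than failing when pulling it off.
        raise ValueError(f"Message is {size} bytes, over the {SQS_MAX_PAYLOAD_BYTES}-byte SQS limit")
    if size > SQS_MAX_PAYLOAD_BYTES // 2:
        # Track how often we get close, per the comment above.
        logger.warning("Message is %s bytes, more than half the SQS payload limit", size)
    return body
```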

@@ -20,7 +20,7 @@ def test_process_item(self, mock_push_message, mock_create):
         response = self.client.post("/process_item/test_process", json=test_data)
         mock_create.assert_called_once_with("test_process")
         self.assertEqual(response.status_code, 200)
-        self.assertEqual(response.json(), {"message": "Message pushed successfully"})
+        self.assertEqual(response.json(), {"message": "Message pushed successfully", "queue": "test_process", "body": test_data})
DGaffney (Collaborator, Author):

Update to reflect actual output

         self.mock_model = MagicMock()

-    def test_respond(self):
+    @patch('lib.model.fasttext.hf_hub_download')
DGaffney (Collaborator, Author):

I think that, generally speaking, these unit tests shouldn't be downloading gigabyte-scale models and running them - instead we should mock them with expected output and test that our code works correctly around them. Otherwise these tests take 20 minutes on my local machine, and the output from these models doesn't change over years of time. Would love to discuss this decision!

Contributor:

For the existing system, I think they are cached 'locally' on an EFS volume or something?

Contributor:

This is likely going to conflict with @amydunphy's changes?

DGaffney (Collaborator, Author):

I don't know why I can't comment on your messages below here, but yes, this will clash and require @amydunphy to fix a few things. Yes, models will be cached locally, but the problem is also that just loading them into memory and running them is expensive, and I don't think that is the goal of a unit test. Agreed that we should have integration tests that load these separately and confirm the output still works over time.

Contributor:

I agree, Devin... unit tests should not load big models. Previously, I used smaller models for unit tests. For example, in one of the very first versions of our similarity service, years ago (still using Word2Vec!), I used a 5 MB version of the GloVe model for unit tests, while the production version was gigabytes. Is there a smaller version of the audio model that we could use for unit tests? If not, then yes, we should mock... at the end of the day, the tests are for the service, not the models.

DGaffney (Collaborator, Author):

There's not really a small version we can load for unit tests. I do think we should open a separate ticket to add some other set of tests that check the models, FWIW, but that should be done separately.
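
A sketch of the mocking pattern being described for the fasttext test (hedged: the patch target lib.model.fasttext.hf_hub_download is taken from the diff, but the FasttextModel class name, the fasttext.load_model patch target, and the canned prediction shape are assumptions):

```python
import unittest
from unittest.mock import MagicMock, patch

from lib.model.fasttext import FasttextModel  # class name assumed, not from the diff


class TestFasttextModel(unittest.TestCase):
    @patch('lib.model.fasttext.fasttext.load_model')  # patch target assumed
    @patch('lib.model.fasttext.hf_hub_download')
    def test_respond(self, mock_hf_hub_download, mock_load_model):
        # The hub download returns a fake local path and the "model" returns a
        # canned prediction, so nothing is downloaded or loaded for real.
        mock_hf_hub_download.return_value = '/fake/path/model.bin'
        mock_model = MagicMock()
        mock_model.predict.return_value = (['__label__en'], [0.99])
        mock_load_model.return_value = mock_model

        model = FasttextModel()
        # ...exercise model.respond(...) here and assert on our own code's
        # behavior around the mocked prediction.
```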

 class TestAudio(unittest.TestCase):
     def setUp(self):
         self.audio_model = Model()

     @patch('urllib.request.urlopen')
     @patch('urllib.request.Request')
-    def test_process_audio_success(self, mock_request, mock_urlopen):
+    @patch('acoustid.fingerprint_file')
DGaffney (Collaborator, Author):

See my comment on fasttext - I'm mocking out model responses because these models take way too long to run locally.
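
And a similar sketch for the audio test (hedged: acoustid.fingerprint_file and its (duration, fingerprint) return shape come from pyacoustid, but the module path, the canned values, and what the test asserts are assumptions):

```python
import unittest
from unittest.mock import patch

from lib.model.audio import Model  # module path assumed from the setUp above


class TestAudio(unittest.TestCase):
    def setUp(self):
        self.audio_model = Model()

    @patch('acoustid.fingerprint_file')
    @patch('urllib.request.urlopen')
    @patch('urllib.request.Request')
    def test_process_audio_success(self, mock_request, mock_urlopen, mock_fingerprint_file):
        # No network call: the "downloaded" audio is canned bytes.
        mock_urlopen.return_value.read.return_value = b'fake audio bytes'
        # No chromaprint run: acoustid.fingerprint_file returns (duration, fingerprint).
        mock_fingerprint_file.return_value = (180, b'AQADtEmUaEkSRZEG')
        # ...call the audio model here and assert on how our code packages the
        # mocked fingerprint, without decoding real audio.
```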

@skyemeedan (Contributor):

> mocked out a bunch of outputs from models so that we aren't just waiting on model responses for 20 minutes when running local tests.

Can you create a ticket for the integration tests (run only occasionally) that do exercise the models?

@DGaffney marked this pull request as ready for review on September 1, 2023 at 14:23
@DGaffney merged commit 6809ca0 into master on September 12, 2023
2 checks passed
@DGaffney deleted the cv2-3435-add-presto-to-alegre branch on September 12, 2023 at 13:09