CV2-3435 minor tweaks as per alegre audio replacement work #34

Merged: 4 commits, Sep 12, 2023
8 changes: 5 additions & 3 deletions lib/http.py
@@ -1,7 +1,7 @@
# from lib import schemas
# from lib.queue.worker import QueueWorker
# queue = QueueWorker.create("mean_tokens__Model")
# queue.push_message("mean_tokens__Model", schemas.Message(body={"callback_url": "http://0.0.0.0:8000/echo", "id": 123, "text": "Some text to vectorize"}))
# queue = QueueWorker.create("audio__Model")
# queue.push_message("audio__Model", schemas.Message(body={'callback_url': 'http://alegre:3100/presto/receive/add_item/audio', 'id': 123, 'url':'http://devingaffney.com/files/blah.mp3', 'text': None, 'raw': {'doc_id': 123, 'url': 'http://devingaffney.com/files/blah.mp3'}}))
import json
import datetime
from typing import Any, Dict
@@ -12,6 +12,7 @@
from lib.queue.worker import QueueWorker
from lib.logger import logger
from lib import schemas
from lib.sentry import sentry_sdk

app = FastAPI()

@@ -33,9 +34,10 @@ async def post_url(url: str, params: dict) -> Dict[str, Any]:

@app.post("/process_item/{process_name}")
def process_item(process_name: str, message: Dict[str, Any]):
logger.info(message)
queue = QueueWorker.create(process_name)
queue.push_message(process_name, schemas.Message(body=message))
return {"message": "Message pushed successfully"}
return {"message": "Message pushed successfully", "queue": process_name, "body": message}

@app.post("/trigger_callback")
async def process_item(message: Dict[str, Any]):
5 changes: 3 additions & 2 deletions lib/queue/queue.py
@@ -56,8 +56,9 @@ def get_or_create_queues(self, queue_name: str) -> List[boto3.resources.base.Ser
"""
try:
found_queues = [q for q in self.sqs.queues.filter(QueueNamePrefix=queue_name)]
if found_queues:
return found_queues
exact_match_queues = [queue for queue in found_queues if queue.attributes['QueueArn'].split(':')[-1] == queue_name]
if exact_match_queues:
return exact_match_queues
else:
return [self.create_queue(queue_name)]
except botocore.exceptions.ClientError as e:
5 changes: 4 additions & 1 deletion lib/queue/worker.py
@@ -51,7 +51,10 @@ def safely_respond(self, model: Model) -> List[schemas.Message]:
responses = []
if messages_with_queues:
logger.debug(f"About to respond to: ({messages_with_queues})")
responses = model.respond([schemas.Message(**json.loads(message.body)) for message, queue in messages_with_queues])
try:
responses = model.respond([schemas.Message(**json.loads(message.body)) for message, queue in messages_with_queues])
Collaborator Author:

If we get an error, log it and don't kill the worker. We should probably move the delete in here too, so that we don't drop the message from the queue?
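A rough sketch of what moving the delete inside the try might look like, mirroring the surrounding hunk (delete_messages already exists in this worker and is assumed to accept the same messages_with_queues list):

# Sketch only: delete messages only after respond() succeeds, so a failed
# batch stays on the queue and becomes visible again after the SQS
# visibility timeout instead of being dropped.
if messages_with_queues:
    logger.debug(f"About to respond to: ({messages_with_queues})")
    try:
        responses = model.respond([schemas.Message(**json.loads(message.body)) for message, queue in messages_with_queues])
        self.delete_messages(messages_with_queues)
    except Exception as e:
        logger.error(e)
return responses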

Contributor:

I think this should only catch the specific exception that we are trying to handle here? And yes, we shouldn't delete the message unless it is successful. That said, this can lead to queue blocking, unless we have a "dead letter" process to take old items off the queue and stash them somewhere.
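For context, SQS supports dead-letter queues natively through a redrive policy; a minimal boto3 sketch (the queue names here are placeholders, not part of this PR):

import json
import boto3

sqs = boto3.resource("sqs")

# Hypothetical dead-letter queue: after maxReceiveCount failed receives,
# SQS moves the message here instead of redelivering it forever.
dead_letter_queue = sqs.create_queue(QueueName="audio__Model_dead_letter")

sqs.create_queue(
    QueueName="audio__Model",
    Attributes={
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dead_letter_queue.attributes["QueueArn"],
            "maxReceiveCount": "5",
        })
    },
)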

Collaborator Author:

I think we should rescue everything and log the error to Sentry. The alternative is that we let the service die, which would require a manual reboot because of the other upstream constraints I am facing: we run multiple PIDs in the same container, and this process is not the one that determines whether the container is dead.
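If we do rescue everything, one option (a sketch; batch stands in for the list built from messages_with_queues) is to report the exception to Sentry explicitly in addition to logging it:

import sentry_sdk

try:
    responses = model.respond(batch)
except Exception as e:
    # Keep the worker loop alive, but make the failure visible in Sentry
    # with the full traceback rather than only a log line.
    sentry_sdk.capture_exception(e)
    logger.error(e)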

Contributor:

I think we can start by rescuing everything but reporting everything to Sentry... over time we can create more specific exception classes so we are able to group them more easily on Sentry. Also, I think that we should log only after all retries were exhausted and then move the job to a "dead queue" or something - if all those jobs failed because of a bug, we can retry them after pushing a fix, or discard. This is similar to how Sidekiq works.

Contributor:

If we handle all exceptions and leave the service running on errors, it seems likely to me that we could deploy breaking changes (like a malformed payload) and the service would silently discard data while the submitting apps keep getting 'OK' callbacks.

I agree it is a trade-off: in scenarios where we have only one or two errors, logging is great and preferable to the whole service going down. But I'd rather have it crash a bunch in the beginning while we iron things out than risk overbroad exception handling that we are likely to forget to remove.

Contributor:

I agree that crashing is a good approach for the service as a whole, but for individual jobs I think the better approach is to handle them gracefully (retrying, capturing the error after all attempts are exhausted and reporting it to Sentry, sending the job to a "dead queue", etc.), and the callback in that case should be an error sent to the client. Not sure if in the beginning we're going to have enough clarity and confidence to differentiate between these two cases.
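One way to read "the callback in that case should be an error sent to the client" (a sketch; the error payload shape is not defined anywhere in this PR):

import requests

def send_error_callback(message, error):
    # Hypothetical failure path: tell the submitting app the job failed
    # instead of silently dropping it after retries are exhausted.
    requests.post(
        message.body.callback_url,
        json={"id": message.body.id, "error": str(error)},
        timeout=10,
    )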

except Exception as e:
logger.error(e)
self.delete_messages(messages_with_queues)
return responses

41 changes: 24 additions & 17 deletions lib/schemas.py
@@ -1,53 +1,60 @@
from typing import Any, List, Union
from pydantic import BaseModel, HttpUrl
from typing import Any, List, Optional, Union
from pydantic import BaseModel

# Output hash values can be of different types.
HashValue = Union[List[float], str, int]
class TextInput(BaseModel):
id: str
callback_url: HttpUrl
callback_url: str
text: str

class TextOutput(BaseModel):
id: str
callback_url: HttpUrl
callback_url: str
text: str

class VideoInput(BaseModel):
id: str
callback_url: HttpUrl
url: HttpUrl
callback_url: str
url: str

class VideoOutput(BaseModel):
id: str
callback_url: HttpUrl
url: HttpUrl
callback_url: str
url: str
bucket: str
outfile: str
hash_value: HashValue

class AudioInput(BaseModel):
id: str
callback_url: HttpUrl
url: HttpUrl
callback_url: str
url: str

class AudioOutput(BaseModel):
id: str
callback_url: HttpUrl
url: HttpUrl
callback_url: str
url: str
hash_value: HashValue

class ImageInput(BaseModel):
id: str
callback_url: HttpUrl
url: HttpUrl
callback_url: str
url: str

class ImageOutput(BaseModel):
id: str
callback_url: HttpUrl
url: HttpUrl
callback_url: str
url: str
hash_value: HashValue

class GenericInput(BaseModel):
id: str
callback_url: str
url: Optional[str] = None
text: Optional[str] = None
raw: Optional[dict] = {}

Collaborator Author:

Add a generic input for packing in other metadata as needed.

Contributor:

Maybe it needs to ensure that we don't load a raw payload bigger than the SQS max payload size? And maybe if we have a raw dict, it needs to be checked against a schema? With queues, it is much better to know that a message is mis-constructed and fail when trying to put it on the queue than when trying to pull it off.
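A minimal pre-send guard could live next to push_message; the 256 KB figure is SQS's documented per-message maximum, and the helper name here is hypothetical:

import json

SQS_MAX_MESSAGE_BYTES = 256 * 1024  # hard SQS limit per message

def check_message_size(message):
    # Fail loudly at enqueue time rather than when pulling the message off.
    encoded = json.dumps(message.dict()).encode("utf-8")
    if len(encoded) > SQS_MAX_MESSAGE_BYTES:
        raise ValueError(f"Message is {len(encoded)} bytes, over the {SQS_MAX_MESSAGE_BYTES}-byte SQS limit")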

Collaborator Author:

The maximum payload size is 256 KB - that does indeed feel like it may be too small sometimes. I grabbed some text off the CloudWatch messages from Alegre to get a "typical" vector - that typical vector message is 16 KB and is very likely nearly identical in size to a single output message. Still a little close for comfort at 1/16th of the max size, particularly if the text is really long. I think a secondary ticket that pre-enforces a max payload size and perhaps optionally zips the message data may be useful, but I am not sure if this is the immediate next step or something that needs to be done in this ticket?

Contributor:

We've had similar problems with Sidekiq before, related to the size of the payload... in general, it's not a good practice to serialize a full object or have a big payload for a background job, and it can also lead to performance issues that can affect the throughput of the queue. Does Presto have any kind of local storage? If so, is it possible for the job to contain only the job ID and get the data from the storage once it runs? If not, then yes, we should at least report an error to Sentry if the job is too big.
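What's being described is essentially the claim-check pattern; a hedged sketch using S3 as the store (the bucket name and keys are placeholders, not part of this PR):

import json
import uuid
import boto3

s3 = boto3.client("s3")
BUCKET = "presto-job-payloads"  # placeholder bucket

def stash_payload(body):
    # Store the full payload externally and enqueue only a small reference.
    key = f"payloads/{uuid.uuid4()}.json"
    s3.put_object(Bucket=BUCKET, Key=key, Body=json.dumps(body).encode("utf-8"))
    return key

def load_payload(key):
    # Worker side: resolve the reference back into the original payload.
    obj = s3.get_object(Bucket=BUCKET, Key=key)
    return json.loads(obj["Body"].read())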

Collaborator Author:

We don't have any storage mechanism in Presto currently. I could see adding some ephemeral key/value store for precisely this sort of thing at some point in the future, but I agree that logging when it's too big, and designing to avoid the issue in the first place, is probably the best play for right now.


Some tipline messages are 4096 characters. Let's log and see how often we encounter size errors. We "should" be OK, but it's important that we log and track this.

class Message(BaseModel):
body: Union[TextInput, VideoInput, AudioInput, ImageInput]
body: GenericInput
response: Any
9 changes: 9 additions & 0 deletions lib/sentry.py
@@ -0,0 +1,9 @@
import os
import sentry_sdk
from lib.helpers import get_environment_setting

sentry_sdk.init(
dsn=get_environment_setting('sentry_sdk_dsn'),
environment=get_environment_setting("DEPLOY_ENV"),
traces_sample_rate=1.0,
)
3 changes: 2 additions & 1 deletion requirements.txt
@@ -10,4 +10,5 @@ httpx==0.23.1
huggingface-hub==0.11.0
fasttext==0.9.2
requests==2.31.0
pytest==7.4.0
pytest==7.4.0
sentry-sdk==1.30.0
1 change: 1 addition & 0 deletions run_processor.py
@@ -4,6 +4,7 @@
from lib.queue.processor import QueueProcessor
from lib.model.model import Model
from lib.logger import logger
from lib.sentry import sentry_sdk
queue = QueueProcessor.create()

logger.info("Beginning callback loop...")
3 changes: 2 additions & 1 deletion run_worker.py
@@ -4,10 +4,11 @@
from lib.queue.worker import QueueWorker
from lib.model.model import Model
from lib.logger import logger
from lib.sentry import sentry_sdk
queue = QueueWorker.create()

model = Model.create()

logger.info("Beginning work loop...")
while True:
queue.work(model)
queue.process(model)
6 changes: 4 additions & 2 deletions test/lib/model/test_audio.py
@@ -6,14 +6,16 @@
from acoustid import FingerprintGenerationError

from lib import schemas

FINGERPRINT_RESPONSE = (170.6, b'AQAA3VFYJYrGJMj74EOZUCfCHGqYLBZO8UiX5bie47sCV0xwBTe49IiVHHrQnIImJyP-44rxI2cYiuiHMCMDPcqJrBcwnYeryBX6rccR_4Iy_YhfXESzqELJ5ASTLwhvNM94KDp9_IB_6NqDZ5I9_IWYvDiNCc1z8IeuHXkYpfhSg8su3M2K5lkrFM-PK3mQH8lznEpidLEoNAeLyWispQpqvfgRZjp0lHaENAmzBeamoRIZMrha5IsyHM6H7-jRhJlSBU1FLgiv4xlKUQmNptGOU3jzIj80Jk5xsQp0UegxJtmSCpeS5PiDozz0MAb5BG5z9MEPIcy0HeWD58M_4sotlNOF8UeuLJEgJt4xkUee4cflI1nMI4uciBLeGu9z9NjH4x9iSXoELYs04pqCSCvx5ei1Tzi3NMFRmsa2DD2POxVCR4IPMSfySC-u0EKuE6IOqz_6zJh8BzZlgc1IQkyTGdeLa4cT7bi2E30e_OgTI4xDPCGLJ_gvZHlwT7EgJc2XIBY_4fnBPENC_YilsGjDJzhJoeyCJn9A1kaeDUw4VA_-41uDGycO8w_eWlCU66iio0eYL8hVK_gD5QlyMR7hzzh-vDm6JE_hcTpq5cFTdFcKZfHxRMTZCS2VHKdOfDve5Hh0hCV9JEtMSbhxSSMuHU9y4kaTx5guHIGsoEAAwoASjmDlkSAEOCSoQEw4IDgghiguAEZAAMaAAYYAhBhACBEiiAGAIUCUUUgSESjgSBlKjZEEEIAFUEIBBRBinAAplFJKAIYQEAQSA4ywACkjgBFMAEoAQgYQwARB1gFmBCAECAAIMYYIoBxBBAAAFCKAAEgIBAQgAghgihIWBACEIUEIJEZIZIBRACGAGAEEIAGAUIBIhBCgRkI')
class TestAudio(unittest.TestCase):
def setUp(self):
self.audio_model = Model()

@patch('urllib.request.urlopen')
@patch('urllib.request.Request')
def test_process_audio_success(self, mock_request, mock_urlopen):
@patch('acoustid.fingerprint_file')
Collaborator Author:

See my comment on fasttext - I'm mocking out model responses because these models take way too long locally.

def test_process_audio_success(self, mock_fingerprint_file, mock_request, mock_urlopen):
mock_fingerprint_file.return_value = FINGERPRINT_RESPONSE
mock_request.return_value = mock_request

# Use the `with` statement for proper file handling
24 changes: 13 additions & 11 deletions test/lib/model/test_fasttext.py
@@ -1,25 +1,27 @@
import os
import unittest
from unittest.mock import MagicMock

import numpy as np

from unittest.mock import patch, MagicMock
from lib.model.fasttext import FasttextModel
from lib import schemas

class TestFasttextModel(unittest.TestCase):
def setUp(self):
self.model = FasttextModel()
self.mock_model = MagicMock()

def test_respond(self):
@patch('lib.model.fasttext.hf_hub_download')
@patch('lib.model.fasttext.fasttext.load_model')
Collaborator Author:

I think that, generally speaking, these unit tests shouldn't be downloading multi-gigabyte models and running them - instead we should mock them with the expected output and test that our code works correctly around them. Otherwise these tests take 20 minutes on my local machine, and the output from these models never changes over years. Would love to discuss this decision!

Contributor:

for the existing system, I think they are cached 'locally' on an EFS volume or something?

Contributor:

This is likely going to conflict with @amydunphy's changes?

Collaborator Author:

I don't know why I can't comment on your messages below here, but yes, this will clash and require @amydunphy to fix a few things. Yes, the models will be cached locally, but the problem is also that just loading them into memory and running them is expensive. I don't think that is the goal of a unit test. Agreed that we should have integration tests that load these separately and confirm the output still works over time.

Contributor:

I agree, Devin... unit tests should not load big models. Previously, I used smaller models for unit tests. For example, in one of the very first versions of our similarity service, years ago (still using Word2Vec!), I used for unit tests a version of the Glove model that was 5 MB, while the production version was gigabytes. Is there a smaller version of the audio model that we could use for unit tests? If not, then yes, we should mock... at the end of the day, the tests are for the service, not the models.

Collaborator Author:

There's not really a small version we can load for unit tests. FWIW, I think we should open a separate ticket to add another set of tests that exercise the models themselves.

def test_respond(self, mock_fasttext_load_model, mock_hf_hub_download):
mock_hf_hub_download.return_value = 'mocked_path'
mock_fasttext_load_model.return_value = self.mock_model
self.mock_model.predict.return_value = (['__label__eng_Latn'], [0.9])

model = FasttextModel() # Now it uses mocked functions
query = [schemas.Message(body=schemas.TextInput(id="123", callback_url="http://example.com/callback", text="Hello, how are you?")), schemas.Message(body=schemas.TextInput(id="123", callback_url="http://example.com/callback", text="今天是星期二"))]

response = self.model.respond(query)
response = model.respond(query)

self.assertEqual(len(response), 2)
self.assertEqual(response[0].response, "__label__eng_Latn")
self.assertEqual(response[1].response, "__label__zho_Hans")
self.assertEqual(response[0].response, '__label__eng_Latn')
self.assertEqual(response[1].response, '__label__eng_Latn') # Mocked, so it will be the same

if __name__ == '__main__':
unittest.main()
4 changes: 3 additions & 1 deletion test/lib/queue/test_queue.py
@@ -22,8 +22,10 @@ def setUp(self, mock_get_env_setting, mock_boto_resource):#, mock_restrict_queue
self.mock_sqs_resource = MagicMock()
self.mock_input_queue = MagicMock()
self.mock_input_queue.url = "http://queue/mean_tokens__Model"
self.mock_input_queue.attributes = {"QueueArn": "queue:mean_tokens__Model"}
self.mock_output_queue = MagicMock()
self.mock_output_queue.url = "http://queue/mean_tokens__Model_output"
self.mock_output_queue.attributes = {"QueueArn": "queue:mean_tokens__Model_output"}
self.mock_sqs_resource.queues.filter.return_value = [self.mock_input_queue, self.mock_output_queue]
mock_boto_resource.return_value = self.mock_sqs_resource

Expand Down Expand Up @@ -102,7 +104,7 @@ def test_push_message(self):
# Call push_message
returned_message = self.queue.push_message(self.queue_name_output, message_to_push)
# Check if the message was correctly serialized and sent
self.mock_output_queue.send_message.assert_called_once_with(MessageBody='{"body": {"id": "1", "callback_url": "http://example.com", "text": "This is a test"}, "response": null}')
self.mock_output_queue.send_message.assert_called_once_with(MessageBody='{"body": {"id": "1", "callback_url": "http://example.com", "url": null, "text": "This is a test", "raw": {}}, "response": null}')
self.assertEqual(returned_message, message_to_push)

if __name__ == '__main__':
2 changes: 1 addition & 1 deletion test/lib/test_http.py
@@ -20,7 +20,7 @@ def test_process_item(self, mock_push_message, mock_create):
response = self.client.post("/process_item/test_process", json=test_data)
mock_create.assert_called_once_with("test_process")
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(), {"message": "Message pushed successfully"})
self.assertEqual(response.json(), {"message": "Message pushed successfully", "queue": "test_process", "body": test_data})
Collaborator Author:

Update to reflect actual output



@patch('lib.http.post_url')