From a95cf27778686ff4b408031f316116637f70bc63 Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Wed, 30 Aug 2023 17:43:42 -0700 Subject: [PATCH 1/4] CV2-3435 minor tweaks as per alegre audio replacement work --- lib/http.py | 7 ++++--- lib/queue/queue.py | 5 +++-- lib/queue/worker.py | 5 ++++- lib/schemas.py | 41 ++++++++++++++++++++++++----------------- run_worker.py | 2 +- 5 files changed, 36 insertions(+), 24 deletions(-) diff --git a/lib/http.py b/lib/http.py index 89b5320..6c9fc41 100644 --- a/lib/http.py +++ b/lib/http.py @@ -1,7 +1,7 @@ # from lib import schemas # from lib.queue.worker import QueueWorker -# queue = QueueWorker.create("mean_tokens__Model") -# queue.push_message("mean_tokens__Model", schemas.Message(body={"callback_url": "http://0.0.0.0:8000/echo", "id": 123, "text": "Some text to vectorize"})) +# queue = QueueWorker.create("audio__Model") +# queue.push_message("audio__Model", schemas.Message(body={'callback_url': 'http://alegre:3100/presto/receive/add_item/audio', 'id': 123, 'url':'http://devingaffney.com/files/blah.mp3', 'text': None, 'raw': {'doc_id': 123, 'url': 'http://devingaffney.com/files/blah.mp3'}})) import json import datetime from typing import Any, Dict @@ -33,9 +33,10 @@ async def post_url(url: str, params: dict) -> Dict[str, Any]: @app.post("/process_item/{process_name}") def process_item(process_name: str, message: Dict[str, Any]): + logger.info(message) queue = QueueWorker.create(process_name) queue.push_message(process_name, schemas.Message(body=message)) - return {"message": "Message pushed successfully"} + return {"message": "Message pushed successfully", "queue": process_name, "body": message} @app.post("/trigger_callback") async def process_item(message: Dict[str, Any]): diff --git a/lib/queue/queue.py b/lib/queue/queue.py index 7ae14ea..af3478e 100644 --- a/lib/queue/queue.py +++ b/lib/queue/queue.py @@ -56,8 +56,9 @@ def get_or_create_queues(self, queue_name: str) -> List[boto3.resources.base.Ser """ try: found_queues = [q for q in self.sqs.queues.filter(QueueNamePrefix=queue_name)] - if found_queues: - return found_queues + exact_match_queues = [q for q in found_queues if q.attributes['QueueArn'].split(':')[-1] == queue_name] + if exact_match_queues: + return exact_match_queues else: return [self.create_queue(queue_name)] except botocore.exceptions.ClientError as e: diff --git a/lib/queue/worker.py b/lib/queue/worker.py index ca6187d..21c046e 100644 --- a/lib/queue/worker.py +++ b/lib/queue/worker.py @@ -51,7 +51,10 @@ def safely_respond(self, model: Model) -> List[schemas.Message]: responses = [] if messages_with_queues: logger.debug(f"About to respond to: ({messages_with_queues})") - responses = model.respond([schemas.Message(**json.loads(message.body)) for message, queue in messages_with_queues]) + try: + responses = model.respond([schemas.Message(**json.loads(message.body)) for message, queue in messages_with_queues]) + except Exception as e: + logger.error(e) self.delete_messages(messages_with_queues) return responses diff --git a/lib/schemas.py b/lib/schemas.py index 30ca05e..26bdc03 100644 --- a/lib/schemas.py +++ b/lib/schemas.py @@ -1,53 +1,60 @@ -from typing import Any, List, Union -from pydantic import BaseModel, HttpUrl +from typing import Any, List, Optional, Union +from pydantic import BaseModel # Output hash values can be of different types. HashValue = Union[List[float], str, int] class TextInput(BaseModel): id: str - callback_url: HttpUrl + callback_url: str text: str class TextOutput(BaseModel): id: str - callback_url: HttpUrl + callback_url: str text: str class VideoInput(BaseModel): id: str - callback_url: HttpUrl - url: HttpUrl + callback_url: str + url: str class VideoOutput(BaseModel): id: str - callback_url: HttpUrl - url: HttpUrl + callback_url: str + url: str bucket: str outfile: str hash_value: HashValue class AudioInput(BaseModel): id: str - callback_url: HttpUrl - url: HttpUrl + callback_url: str + url: str class AudioOutput(BaseModel): id: str - callback_url: HttpUrl - url: HttpUrl + callback_url: str + url: str hash_value: HashValue class ImageInput(BaseModel): id: str - callback_url: HttpUrl - url: HttpUrl + callback_url: str + url: str class ImageOutput(BaseModel): id: str - callback_url: HttpUrl - url: HttpUrl + callback_url: str + url: str hash_value: HashValue +class GenericInput(BaseModel): + id: str + callback_url: str + url: Optional[str] = None + text: Optional[str] = None + raw: Optional[dict] = {} + class Message(BaseModel): - body: Union[TextInput, VideoInput, AudioInput, ImageInput] + body: GenericInput response: Any \ No newline at end of file diff --git a/run_worker.py b/run_worker.py index 8a6f887..f4b7a27 100644 --- a/run_worker.py +++ b/run_worker.py @@ -10,4 +10,4 @@ logger.info("Beginning work loop...") while True: - queue.work(model) + queue.process(model) From 6511cbd9c1fd0513148d557df3e8254253e52891 Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Thu, 31 Aug 2023 04:14:07 -0700 Subject: [PATCH 2/4] fix tests after refactors --- lib/queue/queue.py | 2 +- test/lib/model/test_audio.py | 6 ++++-- test/lib/model/test_fasttext.py | 24 +++++++++++++----------- test/lib/queue/test_queue.py | 4 +++- test/lib/test_http.py | 2 +- 5 files changed, 22 insertions(+), 16 deletions(-) diff --git a/lib/queue/queue.py b/lib/queue/queue.py index af3478e..99d1565 100644 --- a/lib/queue/queue.py +++ b/lib/queue/queue.py @@ -56,7 +56,7 @@ def get_or_create_queues(self, queue_name: str) -> List[boto3.resources.base.Ser """ try: found_queues = [q for q in self.sqs.queues.filter(QueueNamePrefix=queue_name)] - exact_match_queues = [q for q in found_queues if q.attributes['QueueArn'].split(':')[-1] == queue_name] + exact_match_queues = [queue for queue in found_queues if queue.attributes['QueueArn'].split(':')[-1] == queue_name] if exact_match_queues: return exact_match_queues else: diff --git a/test/lib/model/test_audio.py b/test/lib/model/test_audio.py index ab7aceb..c6359ba 100644 --- a/test/lib/model/test_audio.py +++ b/test/lib/model/test_audio.py @@ -6,14 +6,16 @@ from acoustid import FingerprintGenerationError from lib import schemas - +FINGERPRINT_RESPONSE = (170.6, b'AQAA3VFYJYrGJMj74EOZUCfCHGqYLBZO8UiX5bie47sCV0xwBTe49IiVHHrQnIImJyP-44rxI2cYiuiHMCMDPcqJrBcwnYeryBX6rccR_4Iy_YhfXESzqELJ5ASTLwhvNM94KDp9_IB_6NqDZ5I9_IWYvDiNCc1z8IeuHXkYpfhSg8su3M2K5lkrFM-PK3mQH8lznEpidLEoNAeLyWispQpqvfgRZjp0lHaENAmzBeamoRIZMrha5IsyHM6H7-jRhJlSBU1FLgiv4xlKUQmNptGOU3jzIj80Jk5xsQp0UegxJtmSCpeS5PiDozz0MAb5BG5z9MEPIcy0HeWD58M_4sotlNOF8UeuLJEgJt4xkUee4cflI1nMI4uciBLeGu9z9NjH4x9iSXoELYs04pqCSCvx5ei1Tzi3NMFRmsa2DD2POxVCR4IPMSfySC-u0EKuE6IOqz_6zJh8BzZlgc1IQkyTGdeLa4cT7bi2E30e_OgTI4xDPCGLJ_gvZHlwT7EgJc2XIBY_4fnBPENC_YilsGjDJzhJoeyCJn9A1kaeDUw4VA_-41uDGycO8w_eWlCU66iio0eYL8hVK_gD5QlyMR7hzzh-vDm6JE_hcTpq5cFTdFcKZfHxRMTZCS2VHKdOfDve5Hh0hCV9JEtMSbhxSSMuHU9y4kaTx5guHIGsoEAAwoASjmDlkSAEOCSoQEw4IDgghiguAEZAAMaAAYYAhBhACBEiiAGAIUCUUUgSESjgSBlKjZEEEIAFUEIBBRBinAAplFJKAIYQEAQSA4ywACkjgBFMAEoAQgYQwARB1gFmBCAECAAIMYYIoBxBBAAAFCKAAEgIBAQgAghgihIWBACEIUEIJEZIZIBRACGAGAEEIAGAUIBIhBCgRkI') class TestAudio(unittest.TestCase): def setUp(self): self.audio_model = Model() @patch('urllib.request.urlopen') @patch('urllib.request.Request') - def test_process_audio_success(self, mock_request, mock_urlopen): + @patch('acoustid.fingerprint_file') + def test_process_audio_success(self, mock_fingerprint_file, mock_request, mock_urlopen): + mock_fingerprint_file.return_value = FINGERPRINT_RESPONSE mock_request.return_value = mock_request # Use the `with` statement for proper file handling diff --git a/test/lib/model/test_fasttext.py b/test/lib/model/test_fasttext.py index a15946c..ba00176 100644 --- a/test/lib/model/test_fasttext.py +++ b/test/lib/model/test_fasttext.py @@ -1,25 +1,27 @@ -import os import unittest -from unittest.mock import MagicMock - -import numpy as np - +from unittest.mock import patch, MagicMock from lib.model.fasttext import FasttextModel from lib import schemas class TestFasttextModel(unittest.TestCase): def setUp(self): - self.model = FasttextModel() self.mock_model = MagicMock() - def test_respond(self): + @patch('lib.model.fasttext.hf_hub_download') + @patch('lib.model.fasttext.fasttext.load_model') + def test_respond(self, mock_fasttext_load_model, mock_hf_hub_download): + mock_hf_hub_download.return_value = 'mocked_path' + mock_fasttext_load_model.return_value = self.mock_model + self.mock_model.predict.return_value = (['__label__eng_Latn'], [0.9]) + + model = FasttextModel() # Now it uses mocked functions query = [schemas.Message(body=schemas.TextInput(id="123", callback_url="http://example.com/callback", text="Hello, how are you?")), schemas.Message(body=schemas.TextInput(id="123", callback_url="http://example.com/callback", text="今天是星期二"))] - response = self.model.respond(query) - + response = model.respond(query) + self.assertEqual(len(response), 2) - self.assertEqual(response[0].response, "__label__eng_Latn") - self.assertEqual(response[1].response, "__label__zho_Hans") + self.assertEqual(response[0].response, '__label__eng_Latn') + self.assertEqual(response[1].response, '__label__eng_Latn') # Mocked, so it will be the same if __name__ == '__main__': unittest.main() diff --git a/test/lib/queue/test_queue.py b/test/lib/queue/test_queue.py index 67b340d..ee0f536 100644 --- a/test/lib/queue/test_queue.py +++ b/test/lib/queue/test_queue.py @@ -22,8 +22,10 @@ def setUp(self, mock_get_env_setting, mock_boto_resource):#, mock_restrict_queue self.mock_sqs_resource = MagicMock() self.mock_input_queue = MagicMock() self.mock_input_queue.url = "http://queue/mean_tokens__Model" + self.mock_input_queue.attributes = {"QueueArn": "queue:mean_tokens__Model"} self.mock_output_queue = MagicMock() self.mock_output_queue.url = "http://queue/mean_tokens__Model_output" + self.mock_output_queue.attributes = {"QueueArn": "queue:mean_tokens__Model_output"} self.mock_sqs_resource.queues.filter.return_value = [self.mock_input_queue, self.mock_output_queue] mock_boto_resource.return_value = self.mock_sqs_resource @@ -102,7 +104,7 @@ def test_push_message(self): # Call push_message returned_message = self.queue.push_message(self.queue_name_output, message_to_push) # Check if the message was correctly serialized and sent - self.mock_output_queue.send_message.assert_called_once_with(MessageBody='{"body": {"id": "1", "callback_url": "http://example.com", "text": "This is a test"}, "response": null}') + self.mock_output_queue.send_message.assert_called_once_with(MessageBody='{"body": {"id": "1", "callback_url": "http://example.com", "url": null, "text": "This is a test", "raw": {}}, "response": null}') self.assertEqual(returned_message, message_to_push) if __name__ == '__main__': diff --git a/test/lib/test_http.py b/test/lib/test_http.py index d62be04..773d61f 100644 --- a/test/lib/test_http.py +++ b/test/lib/test_http.py @@ -20,7 +20,7 @@ def test_process_item(self, mock_push_message, mock_create): response = self.client.post("/process_item/test_process", json=test_data) mock_create.assert_called_once_with("test_process") self.assertEqual(response.status_code, 200) - self.assertEqual(response.json(), {"message": "Message pushed successfully"}) + self.assertEqual(response.json(), {"message": "Message pushed successfully", "queue": "test_process", "body": test_data}) @patch('lib.http.post_url') From d5bf1724e3dac005b742c8b5d2a01c83965bb01f Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Tue, 5 Sep 2023 08:40:17 -0700 Subject: [PATCH 3/4] add sentry as requirement for logging --- lib/http.py | 1 + lib/sentry.py | 9 +++++++++ requirements.txt | 3 ++- run_processor.py | 1 + run_worker.py | 1 + 5 files changed, 14 insertions(+), 1 deletion(-) create mode 100644 lib/sentry.py diff --git a/lib/http.py b/lib/http.py index 6c9fc41..0707777 100644 --- a/lib/http.py +++ b/lib/http.py @@ -12,6 +12,7 @@ from lib.queue.worker import QueueWorker from lib.logger import logger from lib import schemas +from lib.sentry import sentry_sdk app = FastAPI() diff --git a/lib/sentry.py b/lib/sentry.py new file mode 100644 index 0000000..8804a1e --- /dev/null +++ b/lib/sentry.py @@ -0,0 +1,9 @@ +import os +import sentry_sdk +from lib.helpers import get_environment_setting + +sentry_sdk.init( + dsn=get_environment_setting('sentry_sdk_dsn'), + environment=get_environment_setting("DEPLOY_ENV"), + traces_sample_rate=1.0, +) diff --git a/requirements.txt b/requirements.txt index 867696e..946cb9c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,5 @@ httpx==0.23.1 huggingface-hub==0.11.0 fasttext==0.9.2 requests==2.31.0 -pytest==7.4.0 \ No newline at end of file +pytest==7.4.0 +sentry-sdk==1.30.0 \ No newline at end of file diff --git a/run_processor.py b/run_processor.py index 960257c..e7c6ee9 100644 --- a/run_processor.py +++ b/run_processor.py @@ -4,6 +4,7 @@ from lib.queue.processor import QueueProcessor from lib.model.model import Model from lib.logger import logger +from lib.sentry import sentry_sdk queue = QueueProcessor.create() logger.info("Beginning callback loop...") diff --git a/run_worker.py b/run_worker.py index f4b7a27..fc7353e 100644 --- a/run_worker.py +++ b/run_worker.py @@ -4,6 +4,7 @@ from lib.queue.worker import QueueWorker from lib.model.model import Model from lib.logger import logger +from lib.sentry import sentry_sdk queue = QueueWorker.create() model = Model.create() From 6d107c0570a2a0eddd3d285116d8c00fade52f0b Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Tue, 12 Sep 2023 04:28:43 -0700 Subject: [PATCH 4/4] remove erroneous comments from build phase --- lib/http.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/http.py b/lib/http.py index 0707777..65070de 100644 --- a/lib/http.py +++ b/lib/http.py @@ -1,7 +1,3 @@ -# from lib import schemas -# from lib.queue.worker import QueueWorker -# queue = QueueWorker.create("audio__Model") -# queue.push_message("audio__Model", schemas.Message(body={'callback_url': 'http://alegre:3100/presto/receive/add_item/audio', 'id': 123, 'url':'http://devingaffney.com/files/blah.mp3', 'text': None, 'raw': {'doc_id': 123, 'url': 'http://devingaffney.com/files/blah.mp3'}})) import json import datetime from typing import Any, Dict