From 1cd1a6533c987af83307e5e882000d832dd339f2 Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Fri, 18 Aug 2023 13:15:53 -0700 Subject: [PATCH 1/3] CV2-3551 add elasticmq, remove redis, redo tests, update message passing now that sqs style is working throughout --- Dockerfile | 3 +- README.md | 10 +- docker-compose.yml | 14 ++- lib/__pycache__/__init__.cpython-39.pyc | Bin 137 -> 0 bytes lib/__pycache__/s3.cpython-39.pyc | Bin 574 -> 0 bytes lib/http.py | 9 +- lib/model/__pycache__/__init__.cpython-39.pyc | Bin 143 -> 0 bytes .../generic_transformer.cpython-39.pyc | Bin 1250 -> 0 bytes lib/model/generic_transformer.py | 1 + lib/queue/__pycache__/__init__.cpython-39.pyc | Bin 143 -> 0 bytes .../__pycache__/sqs_queue.cpython-39.pyc | Bin 1242 -> 0 bytes lib/queue/queue.py | 103 +++++++++++++----- lib/queue/redis_queue.py | 47 -------- lib/queue/sqs_queue.py | 32 ------ local.env | 3 +- requirements.txt | 1 - run.py | 4 +- test/__pycache__/__init__.cpython-39.pyc | Bin 271 -> 0 bytes test/lib/__pycache__/__init__.cpython-39.pyc | Bin 142 -> 0 bytes test/lib/__pycache__/test_s3.cpython-39.pyc | Bin 1537 -> 0 bytes .../model/__pycache__/__init__.cpython-39.pyc | Bin 148 -> 0 bytes .../__pycache__/test_audio.cpython-39.pyc | Bin 2122 -> 0 bytes .../__pycache__/test_fptg.cpython-39.pyc | Bin 1563 -> 0 bytes .../__pycache__/test_generic.cpython-39.pyc | Bin 1594 -> 0 bytes .../test_indian_sbert.cpython-39.pyc | Bin 1562 -> 0 bytes .../test_meantokens.cpython-39.pyc | Bin 1633 -> 0 bytes .../__pycache__/test_model.cpython-39.pyc | Bin 641 -> 0 bytes .../__pycache__/test_video.cpython-39.pyc | Bin 3244 -> 0 bytes .../queue/__pycache__/__init__.cpython-39.pyc | Bin 148 -> 0 bytes .../__pycache__/test_queue.cpython-39.pyc | Bin 1784 -> 0 bytes .../__pycache__/test_sqs_queue.cpython-39.pyc | Bin 1619 -> 0 bytes test/lib/queue/test_queue.py | 71 ++++++++++-- test/lib/queue/test_redis_queue.py | 24 ---- test/lib/queue/test_sqs_queue.py | 64 ----------- test/lib/test_http.py | 6 +- 35 files changed, 167 insertions(+), 225 deletions(-) delete mode 100644 lib/__pycache__/__init__.cpython-39.pyc delete mode 100644 lib/__pycache__/s3.cpython-39.pyc delete mode 100644 lib/model/__pycache__/__init__.cpython-39.pyc delete mode 100644 lib/model/__pycache__/generic_transformer.cpython-39.pyc delete mode 100644 lib/queue/__pycache__/__init__.cpython-39.pyc delete mode 100644 lib/queue/__pycache__/sqs_queue.cpython-39.pyc delete mode 100644 lib/queue/redis_queue.py delete mode 100644 test/__pycache__/__init__.cpython-39.pyc delete mode 100644 test/lib/__pycache__/__init__.cpython-39.pyc delete mode 100644 test/lib/__pycache__/test_s3.cpython-39.pyc delete mode 100644 test/lib/model/__pycache__/__init__.cpython-39.pyc delete mode 100644 test/lib/model/__pycache__/test_audio.cpython-39.pyc delete mode 100644 test/lib/model/__pycache__/test_fptg.cpython-39.pyc delete mode 100644 test/lib/model/__pycache__/test_generic.cpython-39.pyc delete mode 100644 test/lib/model/__pycache__/test_indian_sbert.cpython-39.pyc delete mode 100644 test/lib/model/__pycache__/test_meantokens.cpython-39.pyc delete mode 100644 test/lib/model/__pycache__/test_model.cpython-39.pyc delete mode 100644 test/lib/model/__pycache__/test_video.cpython-39.pyc delete mode 100644 test/lib/queue/__pycache__/__init__.cpython-39.pyc delete mode 100644 test/lib/queue/__pycache__/test_queue.cpython-39.pyc delete mode 100644 test/lib/queue/__pycache__/test_sqs_queue.cpython-39.pyc delete mode 100644 test/lib/queue/test_redis_queue.py delete mode 100644 test/lib/queue/test_sqs_queue.py diff --git a/Dockerfile b/Dockerfile index 6c00d80..40972b2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,7 +6,7 @@ ENV DEBIAN_FRONTEND=noninteractive RUN apt-get update && apt-get install -y ffmpeg cmake swig libavcodec-dev libavformat-dev git RUN ln -s /usr/bin/ffmpeg /usr/local/bin/ffmpeg -COPY . . +COPY ./threatexchange /app/threatexchange RUN make -C /app/threatexchange/tmk/cpp RUN git clone https://github.com/meedan/chromaprint.git RUN cd chromaprint && cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_TOOLS=ON . @@ -25,4 +25,5 @@ RUN pip install transformers RUN pip install pact-python RUN pip install --no-cache-dir -r requirements.txt RUN cd threatexchange/pdq/python && pip install . +COPY . . CMD ["make", "run"] \ No newline at end of file diff --git a/README.md b/README.md index 1a186c3..49a4db7 100644 --- a/README.md +++ b/README.md @@ -2,14 +2,13 @@ Presto is a Python service that aims to perform, most generally, on-demand media fingerprints at scale. In the context of text, fingerprints are transformer vectors - video is done by TMK, images by PDQ, and audio by chromaprint. -Presto performs text vectorization using different Hugging Face models. The texts are enqueued via a generic Queue class, which can either be a Redis (local) or SQS (production) instance. The project's directory structure consists of two main directories, `model/` and `queue/`, which contain classes for different models and queues, respectively. The `test/` directory contains test classes for the different models and queues. Audio, Image, and Video fingerprinting are accomplished by specific packages aimed at those tasks. Text generates lists of floats as output (i.e. vectors), while Audio, Image, And Video generate string represented bitfields or hashes. Video additionally generates .tmk files which are ≈250kb files typically (though the file can technically grow as the video length grows). +Presto performs text vectorization using different Hugging Face models. The texts are enqueued via a generic Queue class, which can either be a ElasticMQ (local) or SQS (production) instance. The project's directory structure consists of two main directories, `model/` and `queue/`, which contain classes for different models and queues, respectively. The `test/` directory contains test classes for the different models and queues. Audio, Image, and Video fingerprinting are accomplished by specific packages aimed at those tasks. Text generates lists of floats as output (i.e. vectors), while Audio, Image, And Video generate string represented bitfields or hashes. Video additionally generates .tmk files which are ≈250kb files typically (though the file can technically grow as the video length grows). ### Dependencies This project requires the following dependencies, which are listed in the `requirements.txt` file: - boto3==1.18.64 - pyacoustid==1.2.2 -- redis==4.4.4 - sentence-transformers==2.2.0 - tmkpy==0.1.1 - torch==1.9.0 @@ -35,15 +34,14 @@ To run the project, you can use the provided `Dockerfile`, or start via `docker- ``` docker build -t text-vectorization . -docker run -e QUEUE_TYPE= -e INPUT_QUEUE_NAME= -e OUTPUT_QUEUE_NAME= -e MODEL_NAME= +docker run -e -e INPUT_QUEUE_NAME= -e OUTPUT_QUEUE_NAME= -e MODEL_NAME= ``` -Here, we require at least three environment variables - `queue_type`, `input_queue_name`, and `model_name`. If left unspecified, `output_queue_name` will be automatically set to `input_queue_name[-output]`. Depending on your usage, you may need to replace ``, ``, ``, and `` with the appropriate values. +Here, we require at least two environment variables - `input_queue_name`, and `model_name`. If left unspecified, `output_queue_name` will be automatically set to `input_queue_name[-output]`. Depending on your usage, you may need to replace ``, ``, and `` with the appropriate values. Currently supported `queue_name` values are just module names keyed from the `queue` directory, and currently are as follows: * `sqs_queue.SQSQueue` - SQS-Based -* `redis_queue.RedisQueue` - Redis-Based Currently supported `model_name` values are just module names keyed from the `model` directory, and currently are as follows: @@ -65,7 +63,7 @@ The `main.py` file is the HTTP server. We use FastAPI and provide two endpoints, ### Queues -Presto is able to `process_messages` via redis or SQS. In practice, we use redis for local development, and SQS for production environments. When interacting with a `queue`, we use the generic superclass `queue`. `queue.fingerprint` takes as an argument a `model` instance. The `fingerprint` routine collects a batch of `BATCH_SIZE` messages appropriate to the `BATCH_SIZE` for the `model` specified. Once pulled from the `input_queue`, those messages are processed via `model.respond`. The resulting fingerprint outputs from the model are then zipped with the original message pulled from the `input_queue`, and a message is placed onto the `output_queue` that consists of exactly: `{"request": message, "response": response}`. +Presto is able to `process_messages` via ElasticMQ or SQS. In practice, we use ElasticMQ for local development, and SQS for production environments. When interacting with a `queue`, we use the generic superclass `queue`. `queue.fingerprint` takes as an argument a `model` instance. The `fingerprint` routine collects a batch of `BATCH_SIZE` messages appropriate to the `BATCH_SIZE` for the `model` specified. Once pulled from the `input_queue`, those messages are processed via `model.respond`. The resulting fingerprint outputs from the model are then zipped with the original message pulled from the `input_queue`, and a message is placed onto the `output_queue` that consists of exactly: `{"request": message, "response": response}`. ### Models diff --git a/docker-compose.yml b/docker-compose.yml index fa8d576..1ae1eaf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,5 @@ version: '3.9' + services: app: platform: linux/amd64 @@ -6,8 +7,13 @@ services: env_file: - ./local.env depends_on: - - redis + - elasticmq links: - - redis - redis: - image: redis + - elasticmq + volumes: + - ./:/app + elasticmq: + image: softwaremill/elasticmq + hostname: presto-elasticmq + ports: + - "9324:9324" \ No newline at end of file diff --git a/lib/__pycache__/__init__.cpython-39.pyc b/lib/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index 8103902a88c718136f2c051e1dcb2461334ac2d3..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 137 zcmYe~<>g`kf}E$ODIoeWh(HF6K#l_t7qb9~6oz01O-8?!3`HPe1o2BxKeRZts8~NG zJuxj!-#I@eRX;f+H91?qpeVJtBws%#Gf6)_J~J<~BtBlRpz;=nO>TZlX-=vg$ehnW G%m4sL6doY} diff --git a/lib/__pycache__/s3.cpython-39.pyc b/lib/__pycache__/s3.cpython-39.pyc deleted file mode 100644 index 4161704b92ce6bcf3161457a54d25c34d70ac7c5..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 574 zcmY+C&1%~~5XWafskGYsq>Wh$f*R9A#E3_+!g?`fHr@TTg?W~o8J20c&|NM4l*-?IQFb1^m;+6CS;CI;U8^z5#bbE|Jf+QN| zAgN?OVNN8O!Ia-&1};P(|5=2KQ1=|&USn{uL3&Hpv?CiT>6S?L4A!iD(a{a-NCzF; zG6IfE-mx2y(JlKy2z-L_H3t{1Y65FcppB6j?1)DlzkyrcQ=jo}YyORwNlV zh8LdN^qwSlf6&_JvqlJI?XuB&6*rYOxl}S<7FxyaBz7jArSYN>m&%>APe0_S%b@n; z19$izcNR|vK3W)OQa=#7P^I&ct#sl1a!j#j<5BgSq7>F{z zQuC2bvxg$1AZ9`JME!#L5a03sW%ALg+9vW#zFa2fMyf=7RpK(KYGs{Cbg@WmdQz?Y z(O&b+narjvOk;up=$K9@B@=RtULT<{rF9tEZoTWY7oK3k58qrF*=Y3^{}Qla*hBgk D%A%BC diff --git a/lib/http.py b/lib/http.py index 0674423..8c11deb 100644 --- a/lib/http.py +++ b/lib/http.py @@ -1,3 +1,8 @@ +# import datetime +# from lib.queue.queue import Queue +# queue = Queue.create("input", "output") +# queue.push_message(queue.input_queue, {"body": {"id": 1, "callback_url": "http://example.com", "text": "This is a test"}, "input_queue": queue.input_queue_name, "output_queue": queue.output_queue_name, "start_time": str(datetime.datetime.now())}) +# import json import datetime from typing import Any, Dict @@ -7,7 +12,7 @@ from pydantic import BaseModel from lib.queue.queue import Queue from lib.logger import logger - +from lib import schemas app = FastAPI() @app.middleware("http") @@ -29,7 +34,7 @@ async def post_url(url: str, params: dict) -> Dict[str, Any]: @app.post("/fingerprint_item/{fingerprinter}") def fingerprint_item(fingerprinter: str, message: Dict[str, Any]): queue = Queue.create(fingerprinter, f"{fingerprinter}-output") - queue.push_message(queue.input_queue_name, {"body": message, "input_queue": queue.input_queue_name, "output_queue": queue.output_queue_name, "start_time": str(datetime.datetime.now())}) + queue.push_message(queue.input_queue_name, schemas.Message(body=message, input_queue=queue.input_queue_name, output_queue=queue.output_queue_name, start_time=str(datetime.datetime.now()))) return {"message": "Message pushed successfully"} @app.post("/trigger_callback") diff --git a/lib/model/__pycache__/__init__.cpython-39.pyc b/lib/model/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index 1862c712de86c7b43d768e8154377ed8981aeafd..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 143 zcmYe~<>g`kf}E$ODIoeWh(HF6K#l_t7qb9~6oz01O-8?!3`HPe1o6v6KeRZts8~NG zJuxj!-#I@eRX;f+H91?qpeVJtBws%#Gf6)eD4L@mAD@|*SrQ+wS5SG2!v-i;nv-e= KGVL=EGXMY+>L6zT diff --git a/lib/model/__pycache__/generic_transformer.cpython-39.pyc b/lib/model/__pycache__/generic_transformer.cpython-39.pyc deleted file mode 100644 index e172b96fddf0c5b28dc2d0792d3c96c615903e86..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1250 zcmah}y>1gh5Z>AQiDQxwC4|rc63t1-hd`pT0#YJ{1~-Q0M6|lxZp^{?&e^>nu+Ek9 z0(6vinfKr+wxyz`qhjVR1``pn(u`+zckY|{cE<7gdYj?ue0?gvaK^rov%Fk%o}-wD zs3en+u^~hNWb=#0^sU zHpxDUxr0iwiV0SMzEwg6Qy@>WKD&Tkz^yl=^fX1F=Ngrr*mk`q@_48&m|n%BpU=YA zMrjkuL40x&?qf`tyj98jaHN$fi!e)%!#XM)G&zd$4-1{qC)*Mt&C^l{kMiMR11G8<%?buEfri2g4%ieXykaw6@i|PK(y5?wru={% zZNA|Xcj8rE6^o6e(GWvShrLk zOZ;^PszeHT=aB_fv(_kTJlpPOsVS3UINDoB{|5>iKe?`SWFVFAJ(M;B`%U7tgL_sdsR18^zQ;1LOPGW%ax2Uf@%2Qm+eP+d^O$ zpCLQKt_g8^7H7?gzCma-PBq37E{%sKRy6e`N!{HoXnC#GdaoCF(T=(Nb6B1WQ;(g! fiNn*Who{k+sHr3W1!G&?CdEB!S3`hnd<}j9)iy1V diff --git a/lib/model/generic_transformer.py b/lib/model/generic_transformer.py index 5fb88b6..9b40769 100644 --- a/lib/model/generic_transformer.py +++ b/lib/model/generic_transformer.py @@ -21,6 +21,7 @@ def respond(self, docs: Union[List[schemas.Message], schemas.Message]) -> List[s """ if not isinstance(docs, list): docs = [docs] + print(docs) vectorizable_texts = [e.body.text for e in docs] vectorized = self.vectorize(vectorizable_texts) for doc, vector in zip(docs, vectorized): diff --git a/lib/queue/__pycache__/__init__.cpython-39.pyc b/lib/queue/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index 6cf697d9dfa3abb97dede252f94c3301417a126f..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 143 zcmYe~<>g`kf}E$ODIoeWh(HF6K#l_t7qb9~6oz01O-8?!3`HPe1o6v6KeRZts8~NG zJuxj!-#I@eRX;f+H91?qpeVJtBws%#GfBU&G_^EUKR!M)FS8^*Uaz3?7Kcr4eoARh MsvXF*&p^xo02ZDgdjJ3c diff --git a/lib/queue/__pycache__/sqs_queue.cpython-39.pyc b/lib/queue/__pycache__/sqs_queue.cpython-39.pyc deleted file mode 100644 index 23d74b874456b882dc0ae0339616ad44e4db09f7..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1242 zcmZXTy>8nu5XVVMvXw+`?WRSMp?kGNLqDdX2%4c&Tm(TEErcx6iCx)pNU3N7bxMFt zc?WwbkSFN7a4S0X6*~2fl3b@Hz!Q%=-Tip_E9v#N35;*wUyC1%ke}#m4i}sQnEE*w zK?Kc6j8B%)8H*Vu?}>1PdrgEZ*_9JJf}W7j`vuWt7*gXMm$H;8x|_pk9Q_g&lEhSy znB9;u6;!ZmAJW*Qu`4{$xhAnEy26KN=Z3f>?m~$zgr9u=lIF$uI$0qR61)syaW@j>Al+C6I^0HWap=(?% zv$5%Kgw!t$Hsk88?NL&sXIxJ%LUn<-15^882<_7V=H;Mm^nQESl;U|aKsL79hw#*o!9W+u z6#6-3g5J`v(0k}212x3EN6D9AIU7m!VSFUDPEMr`9kl~5nZA;#oP3si)*PrlY==zs z@YG-L?t-hV!CLAGm;k1>B0$`|HSWI|Q$%eCu5nsy%uvxKSrH2A23(5b?FgC4LbeRu zhM}&$MlcWRuofJ#wXH-ipRQT%$NWcva z53;FeNYiUXn93~Ue53jRL+l5`o8k{@HRjWxdrg_b?#er)>dINZNx^x SzwP(m)PQeMlvV~l3)mkn6DR@z diff --git a/lib/queue/queue.py b/lib/queue/queue.py index 5d9b97f..568d642 100644 --- a/lib/queue/queue.py +++ b/lib/queue/queue.py @@ -1,29 +1,58 @@ +import json from typing import Any, List, Dict, Tuple, Union -import copy import os -from abc import ABC -from lib.helpers import get_class, get_setting +import boto3 +import botocore + +from lib.helpers import get_class, get_setting, get_environment_setting from lib.model.model import Model from lib.logger import logger - -class Queue(ABC): +from lib import schemas +class Queue: @classmethod - def create(cls, input_queue_name: str = None, output_queue_name: str = None, queue_driver_name: str = None, batch_size: int = None): + def create(cls, input_queue_name: str = None, output_queue_name: str = None, batch_size: int = 10): """ Instantiate a queue. Must pass queue_driver_name (i.e. sqs_queue.SQSQueue vs redis_queue.RedisQueue), input_queue_name, output_queue_name, and batch_size. Pulls settings and then inits instance. """ - logger.info(f"Starting queue with: ({input_queue_name}, {output_queue_name}, {queue_driver_name}, {batch_size})") input_queue_name = get_setting(input_queue_name, "INPUT_QUEUE_NAME") output_queue_name = get_setting(output_queue_name, "OUTPUT_QUEUE_NAME") - return get_class('lib.queue.', get_setting(queue_driver_name, "QUEUE_TYPE"))(input_queue_name, output_queue_name, batch_size) + logger.info(f"Starting queue with: ('{input_queue_name}', '{output_queue_name}', {batch_size})") + return Queue(input_queue_name, output_queue_name, batch_size) + + def get_or_create_queue(self, queue_name): + try: + return self.sqs.get_queue_by_name(QueueName=queue_name) + except botocore.exceptions.ClientError as e: + logger.info(f"Queue {queue_name} doesn't exist - creating") + if e.response['Error']['Code'] == "AWS.SimpleQueueService.NonExistentQueue": + return self.sqs.create_queue(QueueName=queue_name) + else: + raise + + def get_sqs(self): + presto_env = get_environment_setting("PRESTO_ENV") + if presto_env == "local": + logger.info(f"Using ElasticMQ Interface") + return boto3.resource('sqs', + region_name=(get_environment_setting("AWS_DEFAULT_REGION") or 'eu-central-1'), + endpoint_url=(get_environment_setting("ELASTICMQ_URI") or 'http://presto-elasticmq:9324'), + aws_access_key_id=(get_environment_setting("AWS_ACCESS_KEY_ID") or 'x'), + aws_secret_access_key=(get_environment_setting("AWS_SECRET_ACCESS_KEY") or 'x')) + else: + logger.info(f"Using SQS Interface") + return boto3.resource('sqs', region_name=get_environment_setting("AWS_DEFAULT_REGION")) def __init__(self, input_queue_name: str, output_queue_name: str = None, batch_size: int = 1): """ Start a specific queue - must pass input_queue_name - optionally pass output_queue_name, batch_size. """ + self.sqs = self.get_sqs() + self.input_queue = self.get_or_create_queue(input_queue_name) + self.output_queue = self.get_or_create_queue(output_queue_name) self.input_queue_name = input_queue_name + self.batch_size = batch_size self.output_queue_name = self.get_output_queue_name(input_queue_name, output_queue_name) def get_output_queue_name(self, input_queue_name: str, output_queue_name: str = None): @@ -34,6 +63,16 @@ def get_output_queue_name(self, input_queue_name: str, output_queue_name: str = output_queue_name = f'{input_queue_name}-output' return output_queue_name + def delete_messages(self, queue, messages): + for message in messages: + logger.info(f"Deleting message of {message}") + queue.delete_messages(Entries=[ + { + 'Id': message.receipt_handle, + 'ReceiptHandle': message.receipt_handle + } + ]) + def safely_respond(self, model: Model) -> Tuple[List[Dict[str, str]], List[Dict[str, Any]]]: """ Rescue against failures when attempting to respond (i.e. fingerprint) from models. @@ -43,8 +82,9 @@ def safely_respond(self, model: Model) -> Tuple[List[Dict[str, str]], List[Dict[ responses = [] if messages: logger.info(f"About to respond to: ({messages})") - responses = model.respond(copy.deepcopy(messages)) - return messages, responses + responses = model.respond([schemas.Message(**json.loads(message.body)) for message in messages]) + self.delete_messages(self.input_queue, messages) + return responses def fingerprint(self, model: Model): """ @@ -52,25 +92,18 @@ def fingerprint(self, model: Model): pass messages to model to respond (i.e. fingerprint) them, then pass responses to output queue. If failures happen at any point, resend failed messages to input queue. """ - messages, responses = self.safely_respond(model) + responses = self.safely_respond(model) if responses: - for message, response in zip(messages, responses): - logger.info(f"Processing message of: ({message}, {response})") - try: - self.return_response({"request": message, "response": response}) - except: - self.reset_messages([message]) - else: - for message in messages: - self.reset_messages(messages) + for response in responses: + logger.info(f"Processing message of: ({response})") + self.return_response(response) - def reset_messages(self, messages: Union[List[Dict[str, str]], Dict[str, str]]): + def receive_messages(self, batch_size: int = 1) -> List[Dict[str, Any]]: """ - If, for some reason, we were unable to process the messages, pass back to input queue - for another worker to give them a shot. + Pull batch_size messages from input queue """ - for message in messages: - self.push_message(self.input_queue, message) + messages = self.pop_message(self.input_queue, batch_size) + return messages def return_response(self, message: Dict[str, Any]): """ @@ -78,8 +111,24 @@ def return_response(self, message: Dict[str, Any]): """ return self.push_message(self.output_queue, message) - def push_message(self, queue: str, message: Dict[str, Any]) -> Dict[str, Any]: + def push_message(self, queue: boto3.resources.base.ServiceResource, message: Dict[str, Any]) -> Dict[str, Any]: """ - Generic pass + Actual SQS logic for pushing a message to a queue """ + queue.send_message(MessageBody=json.dumps(message.dict())) return message + + def pop_message(self, queue: boto3.resources.base.ServiceResource, batch_size: int = 1) -> List[Dict[str, Any]]: + """ + Actual SQS logic for pulling batch_size messages from a queue + """ + messages = [] + logger.info("Grabbing message...") + while batch_size > 0: + this_batch_size = min(batch_size, self.batch_size) + batch_messages = queue.receive_messages(MaxNumberOfMessages=this_batch_size) + for message in batch_messages: + messages.append(message) + batch_size -= this_batch_size + return messages + diff --git a/lib/queue/redis_queue.py b/lib/queue/redis_queue.py deleted file mode 100644 index 83f5d7f..0000000 --- a/lib/queue/redis_queue.py +++ /dev/null @@ -1,47 +0,0 @@ -from typing import Dict, List, Any -import os -import json - -import redis - -from lib.queue.queue import Queue - -class RedisQueue(Queue): - def __init__(self, input_queue_name: str, output_queue_name: str, batch_size: int): - """ - Initialize Redis queue - requires string names for intput / output queues, - and batch_size to determine number of messages to pull off queue at each pull. - """ - self.redis = redis.Redis(host=os.getenv("REDIS_HOST", "redis"), port=os.getenv("REDIS_PORT", 6379), db=os.getenv("REDIS_DB", 0)) - self.input_queue = input_queue_name - self.output_queue = output_queue_name - super().__init__(input_queue_name, output_queue_name, batch_size) - - def receive_messages(self, batch_size: int = 1) -> List[Dict[str, Any]]: - """ - Pull batch_size messages from input queue - """ - messages = [] - for i in range(batch_size): - raw_message = self.pop_message(self.input_queue_name) - if raw_message: - messages.append(raw_message) - else: - break - return messages - - def pop_message(self, queue: str): - """ - Actual redis-specific logic for pulling batch_size messages from a queue - """ - message = self.redis.lpop(queue) - if message: - return json.loads(message) - - def push_message(self, queue: str, message: Dict[str, Any]) -> Dict[str, Any]: - """ - Actual redis-specific logic for pushing a message to a queue - """ - self.redis.rpush(queue, json.dumps(message)) - return message - diff --git a/lib/queue/sqs_queue.py b/lib/queue/sqs_queue.py index 142edf3..df883aa 100644 --- a/lib/queue/sqs_queue.py +++ b/lib/queue/sqs_queue.py @@ -5,43 +5,11 @@ from lib.queue.queue import Queue from lib.helpers import get_environment_setting -MAX_MESSAGE_DEPTH = 10 # SQS allows up to 10 messages per batch class SQSQueue(Queue): def __init__(self, input_queue_name: str, output_queue_name: str, batch_size: int): """ Initialize SQS queue - requires string names for intput / output queues, and batch_size to determine number of messages to pull off queue at each pull. """ - self.sqs = boto3.resource('sqs', region_name=get_environment_setting("AWS_DEFAULT_REGION")) - self.input_queue = self.sqs.get_queue_by_name(QueueName=input_queue_name) - self.output_queue = self.sqs.get_queue_by_name(QueueName=output_queue_name) super().__init__(input_queue_name, output_queue_name, batch_size) - def receive_messages(self, batch_size: int = 1) -> List[Dict[str, Any]]: - """ - Pull batch_size messages from input queue - """ - messages = self.pop_message(self.input_queue, batch_size) - return messages - - def pop_message(self, queue: boto3.resources.base.ServiceResource, batch_size: int = 1) -> List[Dict[str, Any]]: - """ - Actual SQS logic for pulling batch_size messages from a queue - """ - messages = [] - while batch_size > 0: - this_batch_size = min(batch_size, MAX_MESSAGE_DEPTH) - batch_messages = queue.receive_messages(MaxNumberOfMessages=this_batch_size) - if 'Messages' in batch_messages: - for message in batch_messages['Messages']: - messages.append(message["Body"]) - batch_size -= this_batch_size - return messages - - def push_message(self, queue: boto3.resources.base.ServiceResource, message: Dict[str, Any]) -> Dict[str, Any]: - """ - Actual SQS logic for pushing a message to a queue - """ - queue.send_message(MessageBody=json.dumps(message)) - return message - diff --git a/local.env b/local.env index 13b0281..ee71f36 100644 --- a/local.env +++ b/local.env @@ -1,7 +1,6 @@ -QUEUE_TYPE=redis_queue.RedisQueue +PRESTO_ENV=local INPUT_QUEUE_NAME=input OUTPUT_QUEUE_NAME=output MODEL_NAME=mean_tokens.Model -REDIS_HOST=redis AWS_ACCESS_KEY_ID=SOMETHING AWS_SECRET_ACCESS_KEY=OTHERTHING diff --git a/requirements.txt b/requirements.txt index 74ebdb4..d18ca8c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ boto3==1.18.64 pyacoustid==1.2.2 -redis==4.4.4 sentence-transformers==2.2.2 tmkpy==0.1.1 torch==1.9.0 diff --git a/run.py b/run.py index 77e5a20..e52d7d2 100644 --- a/run.py +++ b/run.py @@ -3,11 +3,11 @@ import importlib from lib.queue.queue import Queue from lib.model.model import Model - +from lib.logger import logger queue = Queue.create() model = Model.create() -print("Beginning fingerprinter loop...") +logger.info("Beginning fingerprinter loop...") while True: queue.fingerprint(model) diff --git a/test/__pycache__/__init__.cpython-39.pyc b/test/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index 3183f60609dbdf4578c97adaf9b65cd07916de4b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 271 zcmYj}y-ou$5QP1QF_1h+8eBnz5a<*MQIIGceXyhJ$SXhqE4si8o&tRsd4S&QuvaK}1k zBv4GpAUjI&eo6-X3xCN&;#K)LP>K1Nc-+J!m(Uik&h*BU(fu%++41e{E{Tx0t0dOG ztj*bUG@oR{{X*s;K75oqJZAFs+ diff --git a/test/lib/__pycache__/__init__.cpython-39.pyc b/test/lib/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index 799a203c341126d413ee5e6696851fc817e1ed43..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 142 zcmYe~<>g`kg3_m^DIoeWh(HF6K#l_t7qb9~6oz01O-8?!3`HPe1o6vQKeRZts8~NG zJuxj!-#I@eRX;f+H91?qpeVJtBwxP-Nb2WgCh5n=XXa&=#K-FuRNmsS$<0qG%}KQb J8TJ{7836c6ARYh! diff --git a/test/lib/__pycache__/test_s3.cpython-39.pyc b/test/lib/__pycache__/test_s3.cpython-39.pyc deleted file mode 100644 index 16f6697efb1306197dabe064d2c31885d095c0ea..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1537 zcmb7EO^+Kj7`8nhnIw~iZs`XSpq1c~1BoCKCxj}kC>J)DwjAbwH9K~;lbvLO?ST@N z)Aq`L$R7Jk_z%8v>XBoG>tz=f(AHwtF`q8Z5 zIDRuoC%?y-_CAE-&zS@Wklcw2hIgPCdFwmxa2DMkX;5u5MUI}QD17bYs^~yoL7y0&8M%fFvdfZ zAetZUwb{Ifl@6uow08B(#~o6JsA;G+L%I5Uxdlj07I{@^CF^yeC7jDoZO4nkxJU8D73W{+R{cQ2K*p62oiqxYWu}IF#LBvZ-7e;{|DS3VJQ9^ zcOa7rp7Roq3NJvcC0_~|xR5jz0q#h~u7w&a{cE_fXEz`llOXT!hAIZ!y*;>-`S#Ak z9CTAhu0HB&JtF$~sJ&LccY;#$Gi?r$j5#3jK8afpE$9?R8ry#LsY1(I)!{s$+frn zIQEH@bOojK1BrX58vInGPo?~+$*b;*86lpzM}p$-@uy9mxMA?yWJIp^o+Uhql2N-x z{#SXqmU5i>q*<4aSmH-F+Jvrp$L!n0?4mY$qMsFd}Yg?&T+FiXu%cq&q!h*TVYn()5?QWkuo diff --git a/test/lib/model/__pycache__/__init__.cpython-39.pyc b/test/lib/model/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index 2aebbd454878cea0a6a6ef660ff997c4a349c1c4..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 148 zcmYe~<>g`kf}E$ODIoeWh(HF6K#l_t7qb9~6oz01O-8?!3`HPe1o6vKKeRZts8~NG zJuxj!-#I@eRX;f+H91?qpeVJtBwxP-Nb2WgCh6w_rE>J+<1_OzOXB183My}L*Z{>! Nb5iXhc4qQhy;o1#K$V=~QL+KuPpiNes+=*uB^8?S6ZRy4{e#v-{^`vDG2uFH}|^ z2b3q!${autK~uu;Yo#=^n3dVgrcJ-Y95e1R559KlX8{YAV=dN#J|}HuAqy#aPJ}DG z6CylG#_hSyIKJkOS=cS@NK&xvZm*v0pQm!(wYKpaKG;b0G@4C0ZDj|US4 zHj9A<9Hhysfmv--Zno=B-qda>^~*^c&7&WL&epFy?VGXL)>NgiyszZzDQwjTwV25K zgZ`hI@!4H1v&k??W&beBq<8^Zp(6EWAG&5cQCv(*orwPMI)NN#&%V4Gibtx*B9q&` zkTIMHW?ZE^_R!Jg6TsJnW&-#J2!d9W14Ao|lgh5_F&TqiIhDhSwzMTIVV_droYKmL zdBDqrzV#|Hwiiz2ozb^`<&53R6YdTXz6cU3TBjBzmAh~&=Zu16FI?noWkHWUhBYA5 zg)4}W$if$4<%{+XIfHd3k)6XHefRAnLjJ}`ze;il2N#i!&S~9IQcqRRe~Hqm1l>_G z615FczPLBidQyHj7|7qEYyts|i)_%OE}MLP5n|rIdU_;tebGKBW=WbxgKzqe&MEUp zunCYtA*bqa6qTdKKrc}1*wx}3mPy6A!n`Z2k@}OBC^rl*m)&HkN>zk>k{Y@AbIA~j63#x}PM6D{p%=RGD`D@=43-W3!58<_|kT{6*MGSLSn z221|6jtKBCT;X5)aCUz`f?$hXUF-s3@V=D0E2&Uj@K<4M)GdS!gijG}BWxmUA#4Nm zLWMQQ+GPTzDThNDYeOsb8BVz(l-fZRW3BEYe2#Dr0WYqPNZAm4o+|keW|wH41mO({ z`-pTcH#+S3b-+1tjq^I>c%e>HlslXwhno?FRjO=+2MD_e*Df0gsiyEILc#DUpn;8y zYTk{_OUa{V9vaS~B;Xs)HYzxcrE}rxK|U&|6R&SRGg?8;=9} E50Gak)c^nh diff --git a/test/lib/model/__pycache__/test_fptg.cpython-39.pyc b/test/lib/model/__pycache__/test_fptg.cpython-39.pyc deleted file mode 100644 index 651f9be3709d569c2ed279278232a5f443d36ec0..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1563 zcmZ`(&2Aev5aw`yCCfqJx=Ee>KuHP|0s`ySLs2v-j5f`|IoN`wnt%d5=B?uRZA%dg=_nmRGduHrT#D`OKEt;B>uY)JGxj$vZZ@>B zXV}|c5hRmPvZNa!0bKKvS6<>lXY-Q)ZC)8xQ4#@r&ZIAcb0z}?$MKn$^i*GlCtUxm zV7+z0&d|y&G=QT zbUT*0m`=wB=rhiaRCY3+Y0Rk^8^m#0ydFDmw@jxqlkd+KHfWW3H6zJdQieevmJe<# zW#1kApdiXI#O?|@#|cqX0HRkw&QGzlV=U_!s6gYb{T$`{6A%MLegQK4gU$e+2xwe~ zE4YADzGCah#4COVGP>Y^G1k$F%Xp7<5pLLe&W7>G=IHBBtZ&rsX8F-isw|r)!=vW6 zq0nl$Xy#9spS<|08p@`q^I@))Fi(c&NYp3o!;!Zk0yY5wRfUF#=0x$Phwu*UKr1uX zb^2D6bG0E|G^x5F81*!{X$sIu#_O34gw|qVLse&}i|xHtnQ3&fRCY_WsG9lV&0Lh$ zD^)%6G<`4$DEl^vx}&YrxC6?<-XS^D>o|AN))Fwdi{d?k0gk({e0-~jANu}BP0HP= zDRI)Nc?yPD_fRpAey3N83ZYuwwOYOjoLa%9GNBAnuV{~5uKjK$L#GzTx>hTm=q>!# za_9THK}}|&&P6HeVbwfEK0d}Ry^FA+yQX&t$Yx`_*|Vlb0v+{{BfO)2kM^T(U+M?s z@~Ly__Yw{gi&*c}ep@nv&|>j;5bae@5_#;C(7?Mw)B7nKWtXhQ$#3anIQ(ekF=JO-BRE zo7Tqjx-gWy-M*Ub)5GETNrHM_%@&&8p5$|U5O53Mm;JnZ7E|;7UF_bUm})!_$TB6M hA0t?wzIR)aHvYCN@H>*kT}jLo20Z2izH={ze*q>Qb>{#8 diff --git a/test/lib/model/__pycache__/test_generic.cpython-39.pyc b/test/lib/model/__pycache__/test_generic.cpython-39.pyc deleted file mode 100644 index 3de5a3cd5c52c8a1b559ad19d6ad2f32b84a4326..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1594 zcmah}&2Aev5aw`yCCfqJq)FYNg;JzwRRq>8r{xVP z_6(c(2|+Rm1sgU!3<2l7;AJojps|I+2yH%}BrZ30h@UXv+ zKK;>=8lChf8dIzL7O`LCKli=2pJwBRk-f>xMMl|I6B5smMhx08zyE=v_Wb_QW@(gd zjLnyJfdk?!2gD$UF+am1kFmgGpi*tH49BSFTR@Bu`6bBs4ebHk5mCR47jOw@e8HB9 zO&9zeWOB&?V=R*em+3BRB5c@N-iGnW=80?%Txiv=c7E%7RTTBZ?os_qS7_Cp)zhc* zk6(OQc4eJcpJmXB~r-$q!`mD4)}WOFHA^+{bJk6wYq^WG@1OY2FqF7+qma?iUow7!2KT>~2@PObtBR`@Wpy{5{z|eh5Aq~@(w^963dI>6dwv)!gptH{YGT=`q^yWrIbqD7f3cGpuZr$kk0$JBwhMV WQ{WR)#$Qm(6goWR9lmoXg}(p_MTIy3 diff --git a/test/lib/model/__pycache__/test_indian_sbert.cpython-39.pyc b/test/lib/model/__pycache__/test_indian_sbert.cpython-39.pyc deleted file mode 100644 index 6576fe72595c04756edf0eadcd1221846ed6f729..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1562 zcmZux&2Aev5aw`yCCh1$=BI9(U=yS$1O(Qtr=kgJ6sZr^p-Eammj$z=^JjqDYmz}4BMl>zmv0&v43cBy`hag z!)|^>kW50wM(qeAz%{S^L#d5Ixl;IdihGAgX4|Rb1E}9ZEO-`;mMgF-rU>2cK)%YFLDQ^AyUY3w5suX<|3iBSh!f#1xn*OZ&hI%T`rZ|5(b5{KfRfY$_16G z`+=qpx`@~tm$XNXm$?OE;ck$k`DL6NXfp&1ZlZ3FpoimaSU&hzzYpF1CnVAKLQ0&p zLY{!7f(IxT$gq_vM{!W7;8Lj2Mqa4sLX=p>C{?nz6r~(KUw7qmtjA4 zX`a_&s`A`*bNYej70r8jPO`T?&G6?TlmK8PTE&7w^z?%`v#?y-k!h$ j69N4V`GxeoPmy%#_ichdkQ)9@B2(z`l=t|pyD9t!($RKF diff --git a/test/lib/model/__pycache__/test_meantokens.cpython-39.pyc b/test/lib/model/__pycache__/test_meantokens.cpython-39.pyc deleted file mode 100644 index c7675ea945a8520bd3a8f2537769187f7498c1b2..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1633 zcma)+&2Aev5P(VUuVgt0k~FDXG}sgg0s(=PQ%*r+#AwqTtV80og)R$bNy%&Nk4#c& zCCDfF+INt9%X{=GcVXhDWeONSK3tYD=#^k}2=hXLrkA}pg}M9B*xd=Z=y5lDIzue@PPwnaE$ z>XoGHtuq2v^fl>58}LmAU20qVe4J$ zyYTD#ASA)KhD=bwPRY;{p75cCI7A3-AR^c?w)YOCHoq0+tLIXg=UmG{k-s+j?Y`vI zK|PU`&M0{M^kE;j*@Iu-24NuK65!$AUhhNOD=d3=<5yKAQcR8Ly2TPb!kX}y5>;$qsf=rJn=4`yRKL)&i zQIG*hc1A_`C$6EeBEWVXF6kLPVN0@(OuS?(Dxxz+!N)pUG7;~P#={j|%jv)$>Rd(J z1M3_4hgsbGSr$e8C^@WuPq>oFyq-NrSoZXPA(kXI9Zuj_5F8k+zG zW(A4}`&ji&3&eYBJ4%|Fs?vA7n8^*IQV4p3qT5nvrqDq{kyTS0aHaU%hO){4m2JI~ znWQ$fKG$`3Wv|CZ9o}zJg`y7KjZVLUjWL z>TC?RU|LsDS;r)DBpN0=I3G3hLVbfK-#U{9#LG(n$1|zsD!~;<|4Q*Yu4d;x?!cVx zfFLx6l4Ieo(?#!p7XIAHpc~paO)Fl?G_~y%Z~d&mekV=wY&0_p@2k3t;y#MYxLglU zhQjlU9gXKOrm-KloDTh$Jg?HU+p(s~%;ZxSmb!M0M(L)87J8?FFR0f5Zcm6l3zEw9ZdC?3$q9j68fL zaKKG+VbO{w$fcD;q2kU@7wWd0G=CbG_Q=QN;gFz)M|i+{*x)^G8j?9~@fx;*)BsV* zNmRA_UB~3cd0(coVve|1gy5?B`>%E3W504?RW5BRq6*H>VzOZ2&rtE^{vT(T^fqMn zx|q$fF{QHUjho(OE2^u@tR~F-{3>gzn$R@I(dtheaMpY@=o4MweFOIJ@4@Gcqwb># zjjxSa77J%g+%?R#_Iw@~!+P6HH6WTsUHCH&UDp~AS!0)bmQ$*+%oEuR^%!FtibmgH4fyLMB_W19hkcS diff --git a/test/lib/model/__pycache__/test_video.cpython-39.pyc b/test/lib/model/__pycache__/test_video.cpython-39.pyc deleted file mode 100644 index 451dabeff3c3ebec62dd51c72d8913b51b7baf68..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 3244 zcmb7G%Wm676rJHq)XTQ~O4_7F(k3mmh}gu*q7M{F)3hiGy8&WnF&l!`j3v^f$P6hb z7G$7>y==drC?GHMOWbtPRevFiqW2CZS+;BxrJy^{JCAeEoEfiPuMqft`S}t5vqs2Y zSeX3`C_ILLdH{wKPD8>l)j}F+OpA1;)1uyBhH5vNsZxtsDlM^+N^Mq#R1Ygrl~t9G z5!RwQtB?H`*uwXOo7_4f+!7>MJl5F~r!Ppe^f$~+wwhXA-t;bpYkVEj3cm)Mx}Gm=i!^)b za}lc@v?(lc@?lV=^Qztp5dhO92a3Idjdc2RMuyjgc@?5$?Su5<~m$ zOJ^q)GIe;z+uL)VLKmmq743Z|kuZPkWMG}p-*uF?D!OX8R&tn|smOMcI<5kjL+D`j z{Hj~3!{A0HqGZnxMQh881mA{IEIoPn9aOcICwjw&`RT^ip)e!?8PQWh$&ijTOmT~{ z1Wip{+K{kXM`Z3~f_&LaVLzGM_QFtbH}17XQCq@N9H!1gXq z`W(_WZg%ovjB)XbDJPKObZ7^_luR4xTsx&h!zGz}FZ=h2w}fUX%cJ+HvF*%!`tTgEgn0N+*mV&d#pibN2S4 zM08lApsfoi6*Nto^}HrU)|b8U;07`vHxOiI?CkvTtwhGLH3<6iz7$Umg!DS%JK=$k zP!!7>dw#DYWFq}uRtzhV^!c@UvbnGQtjij&rJsrIxQGu%DzD>`R3I@}`4Scr8F!=? zDNZ&k1xss+rQ?9IRaKh0#wy-yUDeK!gH?)pqz_wE`2tit{%lEA%+QY&Q;i8K7Z8PF ze+*-A_p&oIH`X~_&|ydu{))yg zA;Uor0uoR~RHQPH^hg_$kud}tXo2}V+(Lh7iiC7zP3v^1Lpm$(zyg8;#ebU+)7oP7 z@zz3tJQ*w(l81rAatlK7jyl_L+msSdie8rHws_-%XvF)?l3anc) z%vfsWZLGl6%}DFobQ80*e*~Qhd8Ht)cdpvOtjoDg;0C75_y!7ief#u?;*|)DKsHcU z*^xd(mgqwbSW<`@8JR;Jgii}Bkh#fbjV;;0wJ(FoZP;}Z_fi4TMZ}7*;@y;&iYBh& zsP?bo-!xQ2nW@w&FOJk4zc0n;Lxs_JSS2NH3ep4Wug3ZuW>xn+Q6~#e6@Yf{GSIFZ zV-7vMOVzpBhD?DTSly=`?SG*NCoix%G%F3^;j>C+@H7iN2OnIe==WPF=~p&Cz-D;| zme;f|#6aGK+6(ySI-J!>5G(~;DnG$~m(lX}Jme7^K_9N1@eT3>T*s&{DNR$TfA*iK zUl?YqS*G#{h+6ipM&+3`|2 ze7;Bv4Em?Hz<})P7OfZ-Ec)!X-hjH2ZP>Qz-E8D`zvpL&LSCQbT6k9!P^*>TBPh3u z7mP%eNA*)rj(p+#dndPwGF!y=h@R^my8(D=I&&nq)QnBWO%BaBZyc(dW!FWXdZ Ze^ns$H9mXORWSDWQo)M_4?&IE+P`XlQ)B=D diff --git a/test/lib/queue/__pycache__/__init__.cpython-39.pyc b/test/lib/queue/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index d8862417e3b81858516192b06fed9c116fd95874..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 148 zcmYe~<>g`kg21PyDIoeWh(HF6K#l_t7qb9~6oz01O-8?!3`HPe1o6vKKeRZts8~NG zJuxj!-#I@eRX;f+H91?qpeVJtBwxP-Nb2WgCg~TJrk1Aa$H!;pWtPOp>lIYq;;_lh PPbtkwwF4RX8HgDGzb_(G diff --git a/test/lib/queue/__pycache__/test_queue.cpython-39.pyc b/test/lib/queue/__pycache__/test_queue.cpython-39.pyc deleted file mode 100644 index 7d9c07c7f9f88e585ed37c0e076d0b2dac7e75ae..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1784 zcmZ`)OK%%D5GMDf)yr1xx-NQXdgw(!D>au~QWOD<(Wz-v$W3q5s)lwwIpcfsMzkX+7u^O2kM`yB$$Uw=OmCm|vKpt1b~ zU_61QXCMR-G$+jUlu@v~+$((M7Xb?>`u#jCA{JRa$Xi9sVylOFyXdeEB|i`miPjq; zT9VGXcRuUMzKE|qbt&ocU`1dyy&%K(JJ^p*hMsAkC(|rFuhMH1&XYR58d4KJZ)77u zi7%dCZ1m0xDKc%X6c=wl9vDxc>90UYf}t}{P~p8H%on~0pbtbSBIrZW5;63VXhR?! z)BZ{7+I^P7>dk{XqH_jKAAzVLa^VU8&c7ubHh{&!#DrNnZ)y`&O%44!3}omjTxh}@ z%;GIIftLB@TcQrY`|soNOD&Zii)nIsIXqFmk6Ak&)Pj8weS6$eHolIrT-CP$rFOx#59*kKCoHzv2e1_lU zADrKZag8_<`Y~O6d7tM0FWJc*@rj|?+!-h}SyER)5v+pWDPZB<(%NPVdQgWe`sWwa z%>$|+ZC1egp8<;i-aDU?a`2GoAHM+>fw0rLZC0P~8jEq~cUTCS}CEP~d1Di(VB5H5wDl69j>4k$L# zVmgc!mfO*9pkykuSCSV}>tqU0wcupUt5UF@mZjME*+_)C&jZ)O{m3>wEwy7r;w*uS1Y=YW kK$rjn+DgG@WN(f5TL;8Qo1B0u>GXMYp diff --git a/test/lib/queue/__pycache__/test_sqs_queue.cpython-39.pyc b/test/lib/queue/__pycache__/test_sqs_queue.cpython-39.pyc deleted file mode 100644 index f240650d23d28d50e99fb7c45bd9ae9c31d4dc9e..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1619 zcmZuxOK;mo5Z+xrw2~;taT@di6bOR$z>C!;K#)Vw7BPyREFf^x11}4jyH+HVqP$Bc zu%VvpYkxp*(y@OfKVh#q<(O+uo!OP;S4eQUI~>l;_sz`agF%nL_38aH`EN+b2i#o0 zKu3Oq$7CQRk+dY4f76V@o|UW$vY-mHka~TTMW7GLxawpbO8y`+l+gu|k)l(69%NnB zlkqXre<`}`uL%4_4@lbi2>!_^WwyJYk0-@`T^!qJo;Sstlv;l9>flwYT2-Lc^=p&g zbI>Y+Ib}=*%TPsElRuSdE~8kkm8n}@ zsDfP!hHH$s2akCSq9NyGK?GfpDP2&(nm|&FcpiW@lmtA7BO6U-^R}^Z-8S(4h+fi2 zX~?(m#xwm#YQKH7#InSDv#{I*DYB9 z1;x+n3$u^laa8;S3j{u9Km<@RWz#_5I$Q=T@;A`4j26^etQfWl5EG1SSefz9PCiLv zjVajsW7YWF#Nnx!<(1NX)NgB5sL5L;DrHPQRz@eN?&k*3H@}~>d6_ajKpo$hAVH2L zy^S9pA{XMeBmCr%(;eEQXOC|n|Kcxqmd@>UvO)RU8 Date: Fri, 18 Aug 2023 13:25:53 -0700 Subject: [PATCH 2/3] remove wip script --- lib/http.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/lib/http.py b/lib/http.py index 8c11deb..d73c0e7 100644 --- a/lib/http.py +++ b/lib/http.py @@ -1,8 +1,3 @@ -# import datetime -# from lib.queue.queue import Queue -# queue = Queue.create("input", "output") -# queue.push_message(queue.input_queue, {"body": {"id": 1, "callback_url": "http://example.com", "text": "This is a test"}, "input_queue": queue.input_queue_name, "output_queue": queue.output_queue_name, "start_time": str(datetime.datetime.now())}) -# import json import datetime from typing import Any, Dict From c178162d455392b9bb578796cea3379d8a52add0 Mon Sep 17 00:00:00 2001 From: Devin Gaffney Date: Mon, 21 Aug 2023 05:32:08 -0700 Subject: [PATCH 3/3] CV2-3551 resolve some bad file read patterns, move env to DEPLOY_ENV var, use debug statements for logging --- README.md | 4 ---- lib/queue/queue.py | 12 ++++++------ lib/queue/sqs_queue.py | 15 --------------- local.env | 2 +- test/lib/model/test_audio.py | 19 ++++++++++++++++--- test/lib/model/test_image.py | 12 ++++++++---- test/lib/model/test_video.py | 4 +++- test/lib/queue/test_queue.py | 1 - 8 files changed, 34 insertions(+), 35 deletions(-) delete mode 100644 lib/queue/sqs_queue.py diff --git a/README.md b/README.md index 49a4db7..fe9387a 100644 --- a/README.md +++ b/README.md @@ -39,10 +39,6 @@ docker run -e -e INPUT_QUEUE_NAME= -e OUTPUT_QUEUE_NAME=`, ``, and `` with the appropriate values. -Currently supported `queue_name` values are just module names keyed from the `queue` directory, and currently are as follows: - -* `sqs_queue.SQSQueue` - SQS-Based - Currently supported `model_name` values are just module names keyed from the `model` directory, and currently are as follows: * `fptg.Model` - text model, uses `meedan/paraphrase-filipino-mpnet-base-v2` diff --git a/lib/queue/queue.py b/lib/queue/queue.py index 568d642..8662d3d 100644 --- a/lib/queue/queue.py +++ b/lib/queue/queue.py @@ -13,8 +13,8 @@ class Queue: @classmethod def create(cls, input_queue_name: str = None, output_queue_name: str = None, batch_size: int = 10): """ - Instantiate a queue. Must pass queue_driver_name (i.e. sqs_queue.SQSQueue vs redis_queue.RedisQueue), - input_queue_name, output_queue_name, and batch_size. Pulls settings and then inits instance. + Instantiate a queue. Must pass input_queue_name, output_queue_name, and batch_size. + Pulls settings and then inits instance. """ input_queue_name = get_setting(input_queue_name, "INPUT_QUEUE_NAME") output_queue_name = get_setting(output_queue_name, "OUTPUT_QUEUE_NAME") @@ -32,8 +32,8 @@ def get_or_create_queue(self, queue_name): raise def get_sqs(self): - presto_env = get_environment_setting("PRESTO_ENV") - if presto_env == "local": + deploy_env = get_environment_setting("DEPLOY_ENV") + if deploy_env == "local": logger.info(f"Using ElasticMQ Interface") return boto3.resource('sqs', region_name=(get_environment_setting("AWS_DEFAULT_REGION") or 'eu-central-1'), @@ -65,7 +65,7 @@ def get_output_queue_name(self, input_queue_name: str, output_queue_name: str = def delete_messages(self, queue, messages): for message in messages: - logger.info(f"Deleting message of {message}") + logger.debug(f"Deleting message of {message}") queue.delete_messages(Entries=[ { 'Id': message.receipt_handle, @@ -81,7 +81,7 @@ def safely_respond(self, model: Model) -> Tuple[List[Dict[str, str]], List[Dict[ messages = self.receive_messages(model.BATCH_SIZE) responses = [] if messages: - logger.info(f"About to respond to: ({messages})") + logger.debug(f"About to respond to: ({messages})") responses = model.respond([schemas.Message(**json.loads(message.body)) for message in messages]) self.delete_messages(self.input_queue, messages) return responses diff --git a/lib/queue/sqs_queue.py b/lib/queue/sqs_queue.py deleted file mode 100644 index df883aa..0000000 --- a/lib/queue/sqs_queue.py +++ /dev/null @@ -1,15 +0,0 @@ -import json -from typing import Any, Dict, List -import boto3 - -from lib.queue.queue import Queue -from lib.helpers import get_environment_setting - -class SQSQueue(Queue): - def __init__(self, input_queue_name: str, output_queue_name: str, batch_size: int): - """ - Initialize SQS queue - requires string names for intput / output queues, - and batch_size to determine number of messages to pull off queue at each pull. - """ - super().__init__(input_queue_name, output_queue_name, batch_size) - diff --git a/local.env b/local.env index ee71f36..1738ac1 100644 --- a/local.env +++ b/local.env @@ -1,4 +1,4 @@ -PRESTO_ENV=local +DEPLOY_ENV=local INPUT_QUEUE_NAME=input OUTPUT_QUEUE_NAME=output MODEL_NAME=mean_tokens.Model diff --git a/test/lib/model/test_audio.py b/test/lib/model/test_audio.py index 80d660c..26f4443 100644 --- a/test/lib/model/test_audio.py +++ b/test/lib/model/test_audio.py @@ -6,6 +6,7 @@ from acoustid import FingerprintGenerationError from lib import schemas + class TestAudio(unittest.TestCase): def setUp(self): self.audio_model = Model() @@ -14,7 +15,13 @@ def setUp(self): @patch('urllib.request.Request') def test_fingerprint_audio_success(self, mock_request, mock_urlopen): mock_request.return_value = mock_request - mock_urlopen.return_value = MagicMock(read=MagicMock(return_value=open("data/test-audio.mp3", 'rb').read())) + + # Use the `with` statement for proper file handling + with open("data/test-audio.mp3", 'rb') as f: + contents = f.read() + + mock_urlopen.return_value = MagicMock(read=MagicMock(return_value=contents)) + audio = schemas.Message(body=schemas.AudioInput(id="123", callback_url="http://example.com/callback", url="https://example.com/audio.mp3")) result = self.audio_model.fingerprint(audio) mock_request.assert_called_once_with(audio.body.url, headers={'User-Agent': 'Mozilla/5.0'}) @@ -29,7 +36,13 @@ def test_fingerprint_audio_failure(self, mock_decode_fingerprint, mock_fingerpri mock_request, mock_urlopen): mock_fingerprint_file.side_effect = FingerprintGenerationError("Failed to generate fingerprint") mock_request.return_value = mock_request - mock_urlopen.return_value = MagicMock(read=MagicMock(return_value=open("data/test-audio.mp3", 'rb').read())) + + # Use the `with` statement for proper file handling + with open("data/test-audio.mp3", 'rb') as f: + contents = f.read() + + mock_urlopen.return_value = MagicMock(read=MagicMock(return_value=contents)) + audio = schemas.Message(body=schemas.AudioInput(id="123", callback_url="http://example.com/callback", url="https://example.com/audio.mp3")) result = self.audio_model.fingerprint(audio) mock_request.assert_called_once_with(audio.body.url, headers={'User-Agent': 'Mozilla/5.0'}) @@ -37,4 +50,4 @@ def test_fingerprint_audio_failure(self, mock_decode_fingerprint, mock_fingerpri self.assertEqual([], result["hash_value"]) if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main() diff --git a/test/lib/model/test_image.py b/test/lib/model/test_image.py index d9331aa..fee690a 100644 --- a/test/lib/model/test_image.py +++ b/test/lib/model/test_image.py @@ -11,20 +11,24 @@ class TestModel(unittest.TestCase): @patch("pdqhashing.hasher.pdq_hasher.PDQHasher") def test_compute_pdq(self, mock_pdq_hasher): + with open("img/presto_flowchart.png", "rb") as file: + image_content = file.read() mock_hasher_instance = mock_pdq_hasher.return_value mock_hasher_instance.fromBufferedImage.return_value.getHash.return_value.dumpBitsFlat.return_value = '1001' - result = Model.compute_pdq(io.BytesIO(open("img/presto_flowchart.png", "rb").read())) + result = Model.compute_pdq(io.BytesIO(image_content)) self.assertEqual(result, '0011100000111011010110100001001110001011110100100010101011010111010110101010000111001010111000001010111111110000000101110010000011111110111110100100011111010010110110101111101100111001000000010010100101010111110001001101101011000110001000001110010000111100') @patch("urllib.request.urlopen") def test_get_iobytes_for_image(self, mock_urlopen): + with open("img/presto_flowchart.png", "rb") as file: + image_content = file.read() mock_response = Mock() - mock_response.read.return_value = open("img/presto_flowchart.png", "rb").read() + mock_response.read.return_value = image_content mock_urlopen.return_value = mock_response image = schemas.Message(body=schemas.ImageInput(id="123", callback_url="http://example.com?callback", url="http://example.com/image.jpg")) result = Model().get_iobytes_for_image(image) self.assertIsInstance(result, io.BytesIO) - self.assertEqual(result.read(), open("img/presto_flowchart.png", "rb").read()) + self.assertEqual(result.read(), image_content) @patch("urllib.request.urlopen") def test_get_iobytes_for_image_raises_error(self, mock_urlopen): @@ -44,4 +48,4 @@ def test_fingerprint(self, mock_compute_pdq, mock_get_iobytes_for_image): if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main() diff --git a/test/lib/model/test_video.py b/test/lib/model/test_video.py index 371eaa2..62a67bb 100644 --- a/test/lib/model/test_video.py +++ b/test/lib/model/test_video.py @@ -26,10 +26,12 @@ def test_get_tempfile(self, mock_named_tempfile): @patch('pathlib.Path') def test_fingerprint_video(self, mock_pathlib, mock_upload_file_to_s3, mock_hash_video, mock_urlopen): + with open("data/test-video.mp4", "rb") as video_file: + video_contents = video_file.read() mock_hash_video_output = MagicMock() mock_hash_video_output.getPureAverageFeature.return_value = "hash_value" mock_hash_video.return_value = mock_hash_video_output - mock_urlopen.return_value = MagicMock(read=MagicMock(return_value=open("data/test-video.mp4", "rb").read())) + mock_urlopen.return_value = MagicMock(read=MagicMock(return_value=video_contents)) self.video_model.fingerprint(schemas.Message(body=schemas.VideoInput(id="123", callback_url="http://blah.com?callback_id=123", url="http://example.com/video.mp4"))) mock_urlopen.assert_called_once() mock_hash_video.assert_called_once_with(ANY, "/usr/local/bin/ffmpeg") diff --git a/test/lib/queue/test_queue.py b/test/lib/queue/test_queue.py index 62ab38e..cf303d7 100644 --- a/test/lib/queue/test_queue.py +++ b/test/lib/queue/test_queue.py @@ -7,7 +7,6 @@ from lib.model.generic_transformer import GenericTransformerModel from lib.queue.queue import Queue -from lib.queue.sqs_queue import SQSQueue from lib import schemas class FakeSQSMessage(BaseModel): body: str