Skip to content

Commit

Permalink
Merge pull request #28 from meedan/cv2-3551-add-elasticmq
Browse files Browse the repository at this point in the history
CV2-3551 add elasticmq, remove redis, redo tests, update message passing now that sqs style is working throughout
  • Loading branch information
DGaffney authored Aug 21, 2023
2 parents 24d8b5e + c178162 commit ea875e0
Show file tree
Hide file tree
Showing 38 changed files with 192 additions and 256 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y ffmpeg cmake swig libavcodec-dev libavformat-dev git
RUN ln -s /usr/bin/ffmpeg /usr/local/bin/ffmpeg

COPY . .
COPY ./threatexchange /app/threatexchange
RUN make -C /app/threatexchange/tmk/cpp
RUN git clone https://github.com/meedan/chromaprint.git
RUN cd chromaprint && cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_TOOLS=ON .
Expand All @@ -25,4 +25,5 @@ RUN pip install transformers
RUN pip install pact-python
RUN pip install --no-cache-dir -r requirements.txt
RUN cd threatexchange/pdq/python && pip install .
COPY . .
CMD ["make", "run"]
14 changes: 4 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@

Presto is a Python service that aims to perform, most generally, on-demand media fingerprints at scale. In the context of text, fingerprints are transformer vectors - video is done by TMK, images by PDQ, and audio by chromaprint.

Presto performs text vectorization using different Hugging Face models. The texts are enqueued via a generic Queue class, which can either be a Redis (local) or SQS (production) instance. The project's directory structure consists of two main directories, `model/` and `queue/`, which contain classes for different models and queues, respectively. The `test/` directory contains test classes for the different models and queues. Audio, Image, and Video fingerprinting are accomplished by specific packages aimed at those tasks. Text generates lists of floats as output (i.e. vectors), while Audio, Image, And Video generate string represented bitfields or hashes. Video additionally generates .tmk files which are ≈250kb files typically (though the file can technically grow as the video length grows).
Presto performs text vectorization using different Hugging Face models. The texts are enqueued via a generic Queue class, which can either be a ElasticMQ (local) or SQS (production) instance. The project's directory structure consists of two main directories, `model/` and `queue/`, which contain classes for different models and queues, respectively. The `test/` directory contains test classes for the different models and queues. Audio, Image, and Video fingerprinting are accomplished by specific packages aimed at those tasks. Text generates lists of floats as output (i.e. vectors), while Audio, Image, And Video generate string represented bitfields or hashes. Video additionally generates .tmk files which are ≈250kb files typically (though the file can technically grow as the video length grows).

### Dependencies

This project requires the following dependencies, which are listed in the `requirements.txt` file:
- boto3==1.18.64
- pyacoustid==1.2.2
- redis==4.4.4
- sentence-transformers==2.2.0
- tmkpy==0.1.1
- torch==1.9.0
Expand All @@ -35,15 +34,10 @@ To run the project, you can use the provided `Dockerfile`, or start via `docker-

```
docker build -t text-vectorization .
docker run -e QUEUE_TYPE=<queue_type> -e INPUT_QUEUE_NAME=<input_queue_name> -e OUTPUT_QUEUE_NAME=<output_queue_name> -e MODEL_NAME=<model_name>
docker run -e -e INPUT_QUEUE_NAME=<input_queue_name> -e OUTPUT_QUEUE_NAME=<output_queue_name> -e MODEL_NAME=<model_name>
```

Here, we require at least three environment variables - `queue_type`, `input_queue_name`, and `model_name`. If left unspecified, `output_queue_name` will be automatically set to `input_queue_name[-output]`. Depending on your usage, you may need to replace `<queue_type>`, `<input_queue_name>`, `<output_queue_name>`, and `<model_name>` with the appropriate values.

Currently supported `queue_name` values are just module names keyed from the `queue` directory, and currently are as follows:

* `sqs_queue.SQSQueue` - SQS-Based
* `redis_queue.RedisQueue` - Redis-Based
Here, we require at least two environment variables - `input_queue_name`, and `model_name`. If left unspecified, `output_queue_name` will be automatically set to `input_queue_name[-output]`. Depending on your usage, you may need to replace `<input_queue_name>`, `<output_queue_name>`, and `<model_name>` with the appropriate values.

Currently supported `model_name` values are just module names keyed from the `model` directory, and currently are as follows:

Expand All @@ -65,7 +59,7 @@ The `main.py` file is the HTTP server. We use FastAPI and provide two endpoints,

### Queues

Presto is able to `process_messages` via redis or SQS. In practice, we use redis for local development, and SQS for production environments. When interacting with a `queue`, we use the generic superclass `queue`. `queue.fingerprint` takes as an argument a `model` instance. The `fingerprint` routine collects a batch of `BATCH_SIZE` messages appropriate to the `BATCH_SIZE` for the `model` specified. Once pulled from the `input_queue`, those messages are processed via `model.respond`. The resulting fingerprint outputs from the model are then zipped with the original message pulled from the `input_queue`, and a message is placed onto the `output_queue` that consists of exactly: `{"request": message, "response": response}`.
Presto is able to `process_messages` via ElasticMQ or SQS. In practice, we use ElasticMQ for local development, and SQS for production environments. When interacting with a `queue`, we use the generic superclass `queue`. `queue.fingerprint` takes as an argument a `model` instance. The `fingerprint` routine collects a batch of `BATCH_SIZE` messages appropriate to the `BATCH_SIZE` for the `model` specified. Once pulled from the `input_queue`, those messages are processed via `model.respond`. The resulting fingerprint outputs from the model are then zipped with the original message pulled from the `input_queue`, and a message is placed onto the `output_queue` that consists of exactly: `{"request": message, "response": response}`.

### Models

Expand Down
14 changes: 10 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
version: '3.9'

services:
app:
platform: linux/amd64
build: .
env_file:
- ./local.env
depends_on:
- redis
- elasticmq
links:
- redis
redis:
image: redis
- elasticmq
volumes:
- ./:/app
elasticmq:
image: softwaremill/elasticmq
hostname: presto-elasticmq
ports:
- "9324:9324"
Binary file removed lib/__pycache__/__init__.cpython-39.pyc
Binary file not shown.
Binary file removed lib/__pycache__/s3.cpython-39.pyc
Binary file not shown.
4 changes: 2 additions & 2 deletions lib/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pydantic import BaseModel
from lib.queue.queue import Queue
from lib.logger import logger

from lib import schemas
app = FastAPI()

@app.middleware("http")
Expand All @@ -29,7 +29,7 @@ async def post_url(url: str, params: dict) -> Dict[str, Any]:
@app.post("/fingerprint_item/{fingerprinter}")
def fingerprint_item(fingerprinter: str, message: Dict[str, Any]):
queue = Queue.create(fingerprinter, f"{fingerprinter}-output")
queue.push_message(queue.input_queue_name, {"body": message, "input_queue": queue.input_queue_name, "output_queue": queue.output_queue_name, "start_time": str(datetime.datetime.now())})
queue.push_message(queue.input_queue_name, schemas.Message(body=message, input_queue=queue.input_queue_name, output_queue=queue.output_queue_name, start_time=str(datetime.datetime.now())))
return {"message": "Message pushed successfully"}

@app.post("/trigger_callback")
Expand Down
Binary file removed lib/model/__pycache__/__init__.cpython-39.pyc
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions lib/model/generic_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def respond(self, docs: Union[List[schemas.Message], schemas.Message]) -> List[s
"""
if not isinstance(docs, list):
docs = [docs]
print(docs)
vectorizable_texts = [e.body.text for e in docs]
vectorized = self.vectorize(vectorizable_texts)
for doc, vector in zip(docs, vectorized):
Expand Down
Binary file removed lib/queue/__pycache__/__init__.cpython-39.pyc
Binary file not shown.
Binary file removed lib/queue/__pycache__/sqs_queue.cpython-39.pyc
Binary file not shown.
109 changes: 79 additions & 30 deletions lib/queue/queue.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,58 @@
import json
from typing import Any, List, Dict, Tuple, Union
import copy
import os
from abc import ABC

from lib.helpers import get_class, get_setting
import boto3
import botocore

from lib.helpers import get_class, get_setting, get_environment_setting
from lib.model.model import Model
from lib.logger import logger

class Queue(ABC):
from lib import schemas
class Queue:
@classmethod
def create(cls, input_queue_name: str = None, output_queue_name: str = None, queue_driver_name: str = None, batch_size: int = None):
def create(cls, input_queue_name: str = None, output_queue_name: str = None, batch_size: int = 10):
"""
Instantiate a queue. Must pass queue_driver_name (i.e. sqs_queue.SQSQueue vs redis_queue.RedisQueue),
input_queue_name, output_queue_name, and batch_size. Pulls settings and then inits instance.
Instantiate a queue. Must pass input_queue_name, output_queue_name, and batch_size.
Pulls settings and then inits instance.
"""
logger.info(f"Starting queue with: ({input_queue_name}, {output_queue_name}, {queue_driver_name}, {batch_size})")
input_queue_name = get_setting(input_queue_name, "INPUT_QUEUE_NAME")
output_queue_name = get_setting(output_queue_name, "OUTPUT_QUEUE_NAME")
return get_class('lib.queue.', get_setting(queue_driver_name, "QUEUE_TYPE"))(input_queue_name, output_queue_name, batch_size)
logger.info(f"Starting queue with: ('{input_queue_name}', '{output_queue_name}', {batch_size})")
return Queue(input_queue_name, output_queue_name, batch_size)

def get_or_create_queue(self, queue_name):
try:
return self.sqs.get_queue_by_name(QueueName=queue_name)
except botocore.exceptions.ClientError as e:
logger.info(f"Queue {queue_name} doesn't exist - creating")
if e.response['Error']['Code'] == "AWS.SimpleQueueService.NonExistentQueue":
return self.sqs.create_queue(QueueName=queue_name)
else:
raise

def get_sqs(self):
deploy_env = get_environment_setting("DEPLOY_ENV")
if deploy_env == "local":
logger.info(f"Using ElasticMQ Interface")
return boto3.resource('sqs',
region_name=(get_environment_setting("AWS_DEFAULT_REGION") or 'eu-central-1'),
endpoint_url=(get_environment_setting("ELASTICMQ_URI") or 'http://presto-elasticmq:9324'),
aws_access_key_id=(get_environment_setting("AWS_ACCESS_KEY_ID") or 'x'),
aws_secret_access_key=(get_environment_setting("AWS_SECRET_ACCESS_KEY") or 'x'))
else:
logger.info(f"Using SQS Interface")
return boto3.resource('sqs', region_name=get_environment_setting("AWS_DEFAULT_REGION"))

def __init__(self, input_queue_name: str, output_queue_name: str = None, batch_size: int = 1):
"""
Start a specific queue - must pass input_queue_name - optionally pass output_queue_name, batch_size.
"""
self.sqs = self.get_sqs()
self.input_queue = self.get_or_create_queue(input_queue_name)
self.output_queue = self.get_or_create_queue(output_queue_name)
self.input_queue_name = input_queue_name
self.batch_size = batch_size
self.output_queue_name = self.get_output_queue_name(input_queue_name, output_queue_name)

def get_output_queue_name(self, input_queue_name: str, output_queue_name: str = None):
Expand All @@ -34,6 +63,16 @@ def get_output_queue_name(self, input_queue_name: str, output_queue_name: str =
output_queue_name = f'{input_queue_name}-output'
return output_queue_name

def delete_messages(self, queue, messages):
for message in messages:
logger.debug(f"Deleting message of {message}")
queue.delete_messages(Entries=[
{
'Id': message.receipt_handle,
'ReceiptHandle': message.receipt_handle
}
])

def safely_respond(self, model: Model) -> Tuple[List[Dict[str, str]], List[Dict[str, Any]]]:
"""
Rescue against failures when attempting to respond (i.e. fingerprint) from models.
Expand All @@ -42,44 +81,54 @@ def safely_respond(self, model: Model) -> Tuple[List[Dict[str, str]], List[Dict[
messages = self.receive_messages(model.BATCH_SIZE)
responses = []
if messages:
logger.info(f"About to respond to: ({messages})")
responses = model.respond(copy.deepcopy(messages))
return messages, responses
logger.debug(f"About to respond to: ({messages})")
responses = model.respond([schemas.Message(**json.loads(message.body)) for message in messages])
self.delete_messages(self.input_queue, messages)
return responses

def fingerprint(self, model: Model):
"""
Main routine. Given a model, in a loop, read tasks from input_queue_name at batch_size depth,
pass messages to model to respond (i.e. fingerprint) them, then pass responses to output queue.
If failures happen at any point, resend failed messages to input queue.
"""
messages, responses = self.safely_respond(model)
responses = self.safely_respond(model)
if responses:
for message, response in zip(messages, responses):
logger.info(f"Processing message of: ({message}, {response})")
try:
self.return_response({"request": message, "response": response})
except:
self.reset_messages([message])
else:
for message in messages:
self.reset_messages(messages)
for response in responses:
logger.info(f"Processing message of: ({response})")
self.return_response(response)

def reset_messages(self, messages: Union[List[Dict[str, str]], Dict[str, str]]):
def receive_messages(self, batch_size: int = 1) -> List[Dict[str, Any]]:
"""
If, for some reason, we were unable to process the messages, pass back to input queue
for another worker to give them a shot.
Pull batch_size messages from input queue
"""
for message in messages:
self.push_message(self.input_queue, message)
messages = self.pop_message(self.input_queue, batch_size)
return messages

def return_response(self, message: Dict[str, Any]):
"""
Send message to output queue
"""
return self.push_message(self.output_queue, message)

def push_message(self, queue: str, message: Dict[str, Any]) -> Dict[str, Any]:
def push_message(self, queue: boto3.resources.base.ServiceResource, message: Dict[str, Any]) -> Dict[str, Any]:
"""
Generic pass
Actual SQS logic for pushing a message to a queue
"""
queue.send_message(MessageBody=json.dumps(message.dict()))
return message

def pop_message(self, queue: boto3.resources.base.ServiceResource, batch_size: int = 1) -> List[Dict[str, Any]]:
"""
Actual SQS logic for pulling batch_size messages from a queue
"""
messages = []
logger.info("Grabbing message...")
while batch_size > 0:
this_batch_size = min(batch_size, self.batch_size)
batch_messages = queue.receive_messages(MaxNumberOfMessages=this_batch_size)
for message in batch_messages:
messages.append(message)
batch_size -= this_batch_size
return messages

47 changes: 0 additions & 47 deletions lib/queue/redis_queue.py

This file was deleted.

47 changes: 0 additions & 47 deletions lib/queue/sqs_queue.py

This file was deleted.

3 changes: 1 addition & 2 deletions local.env
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
QUEUE_TYPE=redis_queue.RedisQueue
DEPLOY_ENV=local
INPUT_QUEUE_NAME=input
OUTPUT_QUEUE_NAME=output
MODEL_NAME=mean_tokens.Model
REDIS_HOST=redis
AWS_ACCESS_KEY_ID=SOMETHING
AWS_SECRET_ACCESS_KEY=OTHERTHING
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
boto3==1.18.64
pyacoustid==1.2.2
redis==4.4.4
sentence-transformers==2.2.2
tmkpy==0.1.1
torch==1.9.0
Expand Down
4 changes: 2 additions & 2 deletions run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import importlib
from lib.queue.queue import Queue
from lib.model.model import Model

from lib.logger import logger
queue = Queue.create()

model = Model.create()

print("Beginning fingerprinter loop...")
logger.info("Beginning fingerprinter loop...")
while True:
queue.fingerprint(model)
Binary file removed test/__pycache__/__init__.cpython-39.pyc
Binary file not shown.
Binary file removed test/lib/__pycache__/__init__.cpython-39.pyc
Binary file not shown.
Binary file removed test/lib/__pycache__/test_s3.cpython-39.pyc
Binary file not shown.
Binary file removed test/lib/model/__pycache__/__init__.cpython-39.pyc
Binary file not shown.
Binary file removed test/lib/model/__pycache__/test_audio.cpython-39.pyc
Binary file not shown.
Binary file removed test/lib/model/__pycache__/test_fptg.cpython-39.pyc
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file removed test/lib/model/__pycache__/test_model.cpython-39.pyc
Binary file not shown.
Binary file removed test/lib/model/__pycache__/test_video.cpython-39.pyc
Binary file not shown.
Loading

0 comments on commit ea875e0

Please sign in to comment.