CV2-3551 add local queue consumption and re-work a ton of the startup flow to accommodate #33

Merged · 6 commits · Aug 30, 2023
9 changes: 6 additions & 3 deletions Makefile
@@ -1,13 +1,16 @@
 .PHONY: run run_http run_worker run_test

 run:
-	./start_healthcheck_and_model_engine.sh
+	./start_all.sh

 run_http:
 	uvicorn main:app --host 0.0.0.0 --reload

 run_worker:
-	python run.py
+	python run_worker.py

+run_processor:
+	python run_processor.py
+
 run_test:
-	python -m unittest discover .
+	python -m pytest test
4 changes: 2 additions & 2 deletions README.md
@@ -168,12 +168,12 @@ Output Message:
 ```

 ### Endpoints
-#### /fingerprint_item/{fingerprinter}
+#### /process_item/{process_name}
 This endpoint pushes a message into a queue. It's an async operation, meaning the server will respond before the operation is complete. This is useful when working with slow or unreliable external resources.

 Request
 ```
-curl -X POST "http://127.0.0.1:8000/fingerprint_item/sample_fingerprinter" -H "accept: application/json" -H "Content-Type: application/json" -d "{\"message_key\":\"message_value\"}"
+curl -X POST "http://127.0.0.1:8000/process_item/mean_tokens__Model" -H "accept: application/json" -H "Content-Type: application/json" -d "{\"message_key\":\"message_value\"}"
 ```

 Replace sample_fingerprinter with the name of your fingerprinter, and message_key and message_value with your actual message data.
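For comparison, here is the same request from Python, using httpx as lib/http.py itself does. This is a hypothetical client snippet, not part of the diff; it assumes the server from this PR is running locally on port 8000:

```
import httpx

# Hypothetical client for the renamed endpoint (not part of this PR).
response = httpx.post(
    "http://127.0.0.1:8000/process_item/mean_tokens__Model",
    json={"message_key": "message_value"},
)
# The handler returns this body after pushing the message onto the queue:
print(response.json())  # {"message": "Message pushed successfully"}
```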
23 changes: 16 additions & 7 deletions lib/http.py
@@ -1,11 +1,15 @@
+# from lib import schemas
+# from lib.queue.worker import QueueWorker
+# queue = QueueWorker.create("mean_tokens__Model")
+# queue.push_message("mean_tokens__Model", schemas.Message(body={"callback_url": "http://0.0.0.0:8000/echo", "id": 123, "text": "Some text to vectorize"}))
 import json
 import datetime
 from typing import Any, Dict
 import httpx
 from httpx import HTTPStatusError
 from fastapi import FastAPI, Request
 from pydantic import BaseModel
-from lib.queue.queue import Queue
+from lib.queue.worker import QueueWorker
 from lib.logger import logger
 from lib import schemas

@@ -27,14 +31,14 @@ async def post_url(url: str, params: dict) -> Dict[str, Any]:
     except HTTPStatusError:
         return {"error": f"HTTP Error on Attempt to call {url} with {params}"}

-@app.post("/fingerprint_item/{fingerprinter}")
-def fingerprint_item(fingerprinter: str, message: Dict[str, Any]):
-    queue = Queue.create(fingerprinter)
-    queue.push_message(fingerprinter, schemas.Message(body=message, input_queue=queue.input_queue_name, output_queue=queue.output_queue_name, start_time=str(datetime.datetime.now())))
+@app.post("/process_item/{process_name}")
+def process_item(process_name: str, message: Dict[str, Any]):
+    queue = QueueWorker.create(process_name)
+    queue.push_message(process_name, schemas.Message(body=message))
     return {"message": "Message pushed successfully"}

 @app.post("/trigger_callback")
-async def fingerprint_item(message: Dict[str, Any]):
+async def process_item(message: Dict[str, Any]):
     url = message.get("callback_url")
     if url:
         response = await post_url(url, message)

@@ -46,5 +50,10 @@ async def process_item(message: Dict[str, Any]):
     return {"message": "No Message Callback, Passing"}

 @app.get("/ping")
-def fingerprint_item():
+def process_item():
     return {"pong": 1}
+
+@app.post("/echo")
+async def echo(message: Dict[str, Any]):
+    logger.info(f"About to echo message of {message}")
+    return {"echo": message}
2 changes: 1 addition & 1 deletion lib/model/audio.py
@@ -20,7 +20,7 @@ def audio_hasher(self, filename: str) -> List[int]:
         except acoustid.FingerprintGenerationError:
             return []

-    def fingerprint(self, audio: schemas.Message) -> Dict[str, Union[str, List[int]]]:
+    def process(self, audio: schemas.Message) -> Dict[str, Union[str, List[int]]]:
         temp_file_name = self.get_tempfile_for_url(audio.body.url)
         try:
             hash_value = self.audio_hasher(temp_file_name)
4 changes: 2 additions & 2 deletions lib/model/generic_transformer.py
@@ -1,7 +1,7 @@
 import os
 from typing import Union, Dict, List
 from sentence_transformers import SentenceTransformer
-
+from lib.logger import logger
 from lib.model.model import Model
 from lib import schemas

@@ -21,7 +21,7 @@ def respond(self, docs: Union[List[schemas.Message], schemas.Message]) -> List[schemas.Message]:
         """
         if not isinstance(docs, list):
             docs = [docs]
-        print(docs)
+        logger.info(docs)
         vectorizable_texts = [e.body.text for e in docs]
         vectorized = self.vectorize(vectorizable_texts)
         for doc, vector in zip(docs, vectorized):
2 changes: 1 addition & 1 deletion lib/model/image.py
@@ -30,7 +30,7 @@ def get_iobytes_for_image(self, image: schemas.Message) -> io.BytesIO:
             ).read()
         )

-    def fingerprint(self, image: schemas.Message) -> schemas.ImageOutput:
+    def process(self, image: schemas.Message) -> schemas.ImageOutput:
         """
         Generic function for returning the actual response.
         """
4 changes: 2 additions & 2 deletions lib/model/model.py
@@ -33,7 +33,7 @@ def get_tempfile(self) -> Any:
         """
         return tempfile.NamedTemporaryFile()

-    def fingerprint(self, messages: Union[List[schemas.Message], schemas.Message]) -> List[schemas.Message]:
+    def process(self, messages: Union[List[schemas.Message], schemas.Message]) -> List[schemas.Message]:
         return []

     def respond(self, messages: Union[List[schemas.Message], schemas.Message]) -> List[schemas.Message]:

@@ -43,7 +43,7 @@ def respond(self, messages: Union[List[schemas.Message], schemas.Message]) -> List[schemas.Message]:
         if not isinstance(messages, list):
             messages = [messages]
         for message in messages:
-            message.response = self.fingerprint(message)
+            message.response = self.process(message)
         return messages

     @classmethod
2 changes: 1 addition & 1 deletion lib/model/video.py
@@ -41,7 +41,7 @@ def tmk_bucket(self) -> str:
         """
         return "presto_tmk_videos"

-    def fingerprint(self, video: schemas.Message) -> schemas.VideoOutput:
+    def process(self, video: schemas.Message) -> schemas.VideoOutput:
         """
         Main fingerprinting routine - download video to disk, get short hash,
         then calculate larger TMK hash and upload that to S3.
57 changes: 57 additions & 0 deletions lib/queue/processor.py
@@ -0,0 +1,57 @@
+from typing import List
+import json
+
+import requests
+
+from lib import schemas
+from lib.logger import logger
+from lib.helpers import get_setting
+from lib.queue.queue import Queue
+
+class QueueProcessor(Queue):
+    @classmethod
+    def create(cls, input_queue_name: str = None, batch_size: int = 10):
+        """
+        Instantiate a queue. Must pass input_queue_name; optionally pass batch_size.
+        Pulls settings and then inits instance.
+        """
+        input_queue_name = get_setting(input_queue_name, "MODEL_NAME").replace(".", "__")
+        logger.info(f"Starting queue with: ('{input_queue_name}', {batch_size})")
+        return QueueProcessor(input_queue_name, batch_size)
+
+    def __init__(self, input_queue_name: str, output_queue_name: str = None, batch_size: int = 1):
+        """
+        Start a specific queue - must pass input_queue_name - optionally pass output_queue_name, batch_size.
+        """
+        super().__init__()
+        self.input_queue_name = input_queue_name
+        self.input_queues = self.restrict_queues_to_suffix(self.get_or_create_queues(input_queue_name+"_output"), "_output")
+        self.all_queues = self.store_queue_map(self.input_queues)
+        logger.info(f"Processor listening to queues of {self.all_queues}")
+        self.batch_size = batch_size
+
+    def send_callbacks(self) -> List[schemas.Message]:
+        """
+        Main routine. Read tasks from the paired _output queues at batch_size depth,
+        POST each message back to its callback_url, then delete the batch from the queue.
+        """
+        messages_with_queues = self.receive_messages(self.batch_size)
+        if messages_with_queues:
+            logger.debug(f"About to respond to: ({messages_with_queues})")
+            bodies = [schemas.Message(**json.loads(message.body)) for message, queue in messages_with_queues]
+            for body in bodies:

> Contributor: not in this PR, but we should probably call this in parallel
>
> Author: Cosign

+                self.send_callback(body)
+            self.delete_messages(messages_with_queues)
+
+    def send_callback(self, message):
+        """
+        Rescue against failures when attempting to POST a message to its callback_url.
+        """
+        logger.info(f"Message for callback is: {message}")
+        try:
+            callback_url = message.body.callback_url
+            requests.post(callback_url, json=message.dict())
+        except Exception as e:
+            logger.error(f"Callback fail! Failed with {e} on {callback_url} with message of {message}")
79 changes: 17 additions & 62 deletions lib/queue/queue.py
@@ -1,48 +1,28 @@
 import json
-from typing import Any, List, Dict, Tuple, Union
+from typing import List, Dict, Tuple
 import os

 import boto3
 import botocore

-from lib.helpers import get_class, get_setting, get_environment_setting
-from lib.model.model import Model
+from lib.helpers import get_environment_setting
 from lib.logger import logger
 from lib import schemas
+SQS_MAX_BATCH_SIZE = 10
 class Queue:
-    @classmethod
-    def create(cls, input_queue_name: str = None, output_queue_name: str = None, batch_size: int = 10):
-        """
-        Instantiate a queue. Must pass input_queue_name, output_queue_name, and batch_size.
-        Pulls settings and then inits instance.
-        """
-        input_queue_name = get_setting(input_queue_name, "MODEL_NAME").replace(".", "__")
-        output_queue_name = output_queue_name or f"{input_queue_name}_output"
-        logger.info(f"Starting queue with: ('{input_queue_name}', '{output_queue_name}', {batch_size})")
-        return Queue(input_queue_name, output_queue_name, batch_size)
-
-    def __init__(self, input_queue_name: str, output_queue_name: str = None, batch_size: int = 1):
+    def __init__(self):
         """
-        Start a specific queue - must pass input_queue_name - optionally pass output_queue_name, batch_size.
+        Set up the SQS connection - subclasses discover and register their own queues.
         """
         self.sqs = self.get_sqs()
-        self.input_queue_name = input_queue_name
-        self.input_queues = self.restrict_queues_by_suffix(self.get_or_create_queues(input_queue_name), "_output")
-        if output_queue_name:
-            self.output_queue_name = self.get_output_queue_name(input_queue_name, output_queue_name)
-            self.output_queues = self.get_or_create_queues(output_queue_name)
-        self.all_queues = self.store_queue_map()
-        self.batch_size = batch_size

-    def store_queue_map(self) -> Dict[str, boto3.resources.base.ServiceResource]:
+    def store_queue_map(self, all_queues: List[boto3.resources.base.ServiceResource]) -> Dict[str, boto3.resources.base.ServiceResource]:
         """
         Store a quick lookup so that we dont loop through this over and over in other places.
         """
         queue_map = {}
-        for group in [self.input_queues, self.output_queues]:
-            for q in group:
-                queue_map[self.queue_name(q)] = q
+        for queue in all_queues:
+            queue_map[self.queue_name(queue)] = queue
         return queue_map

@@ -51,6 +31,12 @@ def queue_name(self, queue: boto3.resources.base.ServiceResource) -> str:
         """
         return queue.url.split('/')[-1]

+    def restrict_queues_to_suffix(self, queues: List[boto3.resources.base.ServiceResource], suffix: str) -> List[boto3.resources.base.ServiceResource]:
+        """
+        When plucking queues for the processor, keep only the paired suffix queues.
+        """
+        return [queue for queue in queues if self.queue_name(queue).endswith(suffix)]
+
     def restrict_queues_by_suffix(self, queues: List[boto3.resources.base.ServiceResource], suffix: str) -> List[boto3.resources.base.ServiceResource]:
         """
         When plucking input queues, we want to omit any queues that are our paired suffix queues..

@@ -101,7 +87,7 @@ def get_output_queue_name(self, input_queue_name: str, output_queue_name: str = None) -> str:
         If output_queue_name was empty or None, set name for queue.
         """
         if not output_queue_name:
-            output_queue_name = f'{input_queue_name}-output'
+            output_queue_name = f'{input_queue_name}_output'
         return output_queue_name

     def group_deletions(self, messages_with_queues: List[Tuple[schemas.Message, boto3.resources.base.ServiceResource]]) -> Dict[boto3.resources.base.ServiceResource, List[schemas.Message]]:

@@ -115,7 +101,7 @@ def group_deletions(self, messages_with_queues: List[Tuple[schemas.Message, boto3.resources.base.ServiceResource]]) -> Dict[boto3.resources.base.ServiceResource, List[schemas.Message]]:
             queue_to_messages[queue].append(message)
         return queue_to_messages

-    def delete_messages(self, messages_with_queues: List[Tuple[Dict[str, Any], boto3.resources.base.ServiceResource]]) -> None:
+    def delete_messages(self, messages_with_queues: List[Tuple[schemas.Message, boto3.resources.base.ServiceResource]]) -> None:
         """
         Delete messages as we process them so other processes don't pick them up.
         SQS deals in max batches of 10, so break up messages into groups of 10

@@ -141,31 +127,6 @@ def delete_messages_from_queue(self, queue: boto3.resources.base.ServiceResource, messages: List[schemas.Message]) -> None:
             entries.append(entry)
         queue.delete_messages(Entries=entries)

-    def safely_respond(self, model: Model) -> List[schemas.Message]:
-        """
-        Rescue against failures when attempting to respond (i.e. fingerprint) from models.
-        Return responses if no failure.
-        """
-        messages_with_queues = self.receive_messages(model.BATCH_SIZE)
-        responses = []
-        if messages_with_queues:
-            logger.debug(f"About to respond to: ({messages_with_queues})")
-            responses = model.respond([schemas.Message(**json.loads(message.body)) for message, queue in messages_with_queues])
-            self.delete_messages(messages_with_queues)
-        return responses
-
-    def fingerprint(self, model: Model):
-        """
-        Main routine. Given a model, in a loop, read tasks from input_queue_name at batch_size depth,
-        pass messages to model to respond (i.e. fingerprint) them, then pass responses to output queue.
-        If failures happen at any point, resend failed messages to input queue.
-        """
-        responses = self.safely_respond(model)
-        if responses:
-            for response in responses:
-                logger.info(f"Processing message of: ({response})")
-                self.return_response(response)
-
     def receive_messages(self, batch_size: int = 1) -> List[Tuple[schemas.Message, boto3.resources.base.ServiceResource]]:
         """
         Pull batch_size messages from input queue.

@@ -175,19 +136,13 @@ def receive_messages(self, batch_size: int = 1) -> List[Tuple[schemas.Message, boto3.resources.base.ServiceResource]]:
         for queue in self.input_queues:
             if batch_size <= 0:
                 break
-            batch_messages = queue.receive_messages(MaxNumberOfMessages=min(batch_size, self.batch_size))
+            batch_messages = queue.receive_messages(MaxNumberOfMessages=min(batch_size, SQS_MAX_BATCH_SIZE))
             for message in batch_messages:
                 if batch_size > 0:
                     messages_with_queues.append((message, queue))
                     batch_size -= 1
         return messages_with_queues

-    def return_response(self, message: schemas.Message):
-        """
-        Send message to output queue
-        """
-        return self.push_message(self.output_queue_name, message)
-
     def find_queue_by_name(self, queue_name: str) -> boto3.resources.base.ServiceResource:
         """
         Search through queues to find the right one
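Note that the two near-identical suffix helpers do opposite jobs: restrict_queues_to_suffix keeps the paired *_output queues (what QueueProcessor listens to), while restrict_queues_by_suffix omits them for the worker's input side (its body is collapsed in this diff, so the omit behavior is inferred from its docstring). A standalone illustration with made-up queue names:

```
# Hypothetical queue names, for illustration only.
queue_names = ["mean_tokens__Model", "mean_tokens__Model_output"]

# restrict_queues_to_suffix: keep queues ending in "_output" (processor side)
to_suffix = [q for q in queue_names if q.endswith("_output")]

# restrict_queues_by_suffix: drop them (worker input side, inferred)
by_suffix = [q for q in queue_names if not q.endswith("_output")]

print(to_suffix)   # ['mean_tokens__Model_output']
print(by_suffix)   # ['mean_tokens__Model']
```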