Skip to content

Commit

Permalink
Merge pull request #45 from meedan/cv2-3852-fix-queue-prefix-logic
Browse files Browse the repository at this point in the history
CV2-3852 fix queue prefix logic
  • Loading branch information
DGaffney authored Oct 16, 2023
2 parents 12c93b2 + ed0d2f8 commit 17d0357
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 10 deletions.
8 changes: 4 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ version: '3.9'
services:
app:
platform: linux/amd64
build: .
build:
context: .
args:
- PRESTO_PORT=${PRESTO_PORT}
env_file:
- ./.env_file
depends_on:
Expand All @@ -12,9 +15,6 @@ services:
- elasticmq
volumes:
- ./:/app
- "${PRESTO_PORT}:${PRESTO_PORT}"
args:
- PRESTO_PORT=${PRESTO_PORT}
elasticmq:
image: softwaremill/elasticmq
hostname: presto-elasticmq
Expand Down
3 changes: 2 additions & 1 deletion lib/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from fastapi import FastAPI, Request
from pydantic import BaseModel
from lib.queue.worker import QueueWorker
from lib.queue.queue import Queue
from lib.logger import logger
from lib import schemas
from lib.sentry import sentry_sdk
Expand All @@ -32,7 +33,7 @@ async def post_url(url: str, params: dict) -> Dict[str, Any]:
@app.post("/process_item/{process_name}")
def process_item(process_name: str, message: Dict[str, Any]):
logger.info(message)
queue_prefix = get_setting("", "QUEUE_PREFIX").replace(".", "__")
queue_prefix = Queue.get_queue_prefix()
queue = QueueWorker.create(process_name)
queue.push_message(f"{queue_prefix}{process_name}", schemas.Message(body=message))
return {"message": "Message pushed successfully", "queue": process_name, "body": message}
Expand Down
3 changes: 1 addition & 2 deletions lib/queue/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ def create(cls, input_queue_name: str = None, batch_size: int = 10):
Instantiate a queue. Must pass input_queue_name, output_queue_name, and batch_size.
Pulls settings and then inits instance.
"""
queue_prefix = get_setting("", "QUEUE_PREFIX").replace(".", "__")
input_queue_name = queue_prefix+get_setting(input_queue_name, "MODEL_NAME").replace(".", "__")
input_queue_name = Queue.get_queue_name(input_queue_name)
logger.info(f"Starting queue with: ('{input_queue_name}', {batch_size})")
return QueueProcessor(input_queue_name, batch_size)

Expand Down
10 changes: 9 additions & 1 deletion lib/queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import boto3
import botocore

from lib.helpers import get_environment_setting
from lib.helpers import get_setting, get_environment_setting
from lib.logger import logger
from lib import schemas
SQS_MAX_BATCH_SIZE = 10
Expand All @@ -16,6 +16,14 @@ def __init__(self):
"""
self.sqs = self.get_sqs()

@staticmethod
def get_queue_prefix():
return (get_environment_setting("QUEUE_PREFIX") or "").replace(".", "__")

@staticmethod
def get_queue_name(input_queue_name):
return Queue.get_queue_prefix()+get_setting(input_queue_name, "MODEL_NAME").replace(".", "__")

def store_queue_map(self, all_queues: List[boto3.resources.base.ServiceResource]) -> Dict[str, boto3.resources.base.ServiceResource]:
"""
Store a quick lookup so that we dont loop through this over and over in other places.
Expand Down
3 changes: 1 addition & 2 deletions lib/queue/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ def create(cls, input_queue_name: str = None):
Instantiate a queue worker. Must pass input_queue_name.
Pulls settings and then inits instance.
"""
queue_prefix = get_setting("", "QUEUE_PREFIX").replace(".", "__")
input_queue_name = queue_prefix+get_setting(input_queue_name, "MODEL_NAME").replace(".", "__")
input_queue_name = Queue.get_queue_name(input_queue_name)
output_queue_name = f"{input_queue_name}_output"
logger.info(f"Starting queue with: ('{input_queue_name}', '{output_queue_name}')")
return QueueWorker(input_queue_name, output_queue_name)
Expand Down

0 comments on commit 17d0357

Please sign in to comment.