Merge pull request #75 from meedan/feb2924-revert
Revert everything back to Feb 20
DGaffney authored Mar 1, 2024
2 parents 3988bbd + 4f79c5f commit 341263b
Showing 30 changed files with 122 additions and 98 deletions.
6 changes: 2 additions & 4 deletions .env_file
@@ -1,11 +1,9 @@
 QUEUE_PREFIX=dev_
 PRESTO_PORT=8000
 DEPLOY_ENV=local
-MODEL_NAME=mean_tokens.Model
-# MODEL_NAME=audio.Model
+# MODEL_NAME=mean_tokens.Model
+MODEL_NAME=audio.Model
 S3_ENDPOINT=http://minio:9000
 AWS_DEFAULT_REGION=us-east-1
 AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
 AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
 NUM_WORKERS=1
-#QUEUE_SUFFIX=.fifo
4 changes: 2 additions & 2 deletions .env_file.test
@@ -1,8 +1,8 @@
 QUEUE_PREFIX=dev_
 PRESTO_PORT=8000
 DEPLOY_ENV=local
-MODEL_NAME=mean_tokens.Model
-#MODEL_NAME=audio.Model
+# MODEL_NAME=mean_tokens.Model
+MODEL_NAME=audio.Model
 S3_ENDPOINT=http://minio:9000
 AWS_DEFAULT_REGION=us-east-1
 AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
74 changes: 59 additions & 15 deletions .github/workflows/ci-deploy.yml
@@ -4,8 +4,6 @@ on:
   push:
     branches:
       - 'master'
-    tags:
-      - 'v*'
 
 permissions:
   id-token: write
@@ -81,27 +79,73 @@ jobs:
       - name: Kick off Terraform deploy in sysops/
         id: sysops-deploy
         if: github.event_name == 'push' && startsWith(github.ref, 'refs/heads/master')
         run: |
           curl \
             -X POST \
             -H "Accept: application/vnd.github+json" \
             -H "Authorization: Bearer ${{ secrets.SYSOPS_RW_GITHUB_TOKEN }}" \
             -H "X-GitHub-Api-Version: 2022-11-28" \
             https://api.github.com/repos/meedan/sysops/actions/workflows/deploy_${{ github.event.repository.name }}.yml/dispatches \
-            -d '{"ref": "master", "inputs": {"git_sha": "${{ github.sha }}", "type": "push"}}'
+            -d '{"ref": "master", "inputs": {"git_sha": "${{ github.sha }}"}}'
-      - name: Kick off Terraform deploy in sysops/
-        id: sysops-deploy-live
-        if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v')
-        run: |
-          curl \
-            -X POST \
-            -H "Accept: application/vnd.github+json" \
-            -H "Authorization: Bearer ${{ secrets.SYSOPS_RW_GITHUB_TOKEN }}" \
-            -H "X-GitHub-Api-Version: 2022-11-28" \
-            https://api.github.com/repos/meedan/sysops/actions/workflows/deploy_${{ github.event.repository.name }}.yml/dispatches \
-            -d '{"ref": "master", "inputs": {"git_sha": "${{ github.sha }}", "type": "tag"}}'
+      - name: Send GitHub Action trigger data to Slack workflow on success
+        id: slack-api-notify-success
+        if: ${{ success() }}
+        uses: slackapi/[email protected]
+        with:
+          payload: |
+            {
+              "attachments": [
+                {
+                  "color": "#00FF00",
+                  "blocks": [
+                    {
+                      "type": "section",
+                      "text": {
+                        "type": "mrkdwn",
+                        "text": "Kicked off by: ${{ github.triggering_actor }}\nWorkflow: https://github.com/meedan/presto/actions/runs/${{ github.run_id }}"
+                      }
+                    },
+                    {
+                      "type": "section",
+                      "text": {
+                        "type": "mrkdwn",
+                        "text": "Presto Deploy:\n${{ github.event.pull_request.html_url || github.event.head_commit.url }}"
+                      }
+                    }
+                  ]
+                }
+              ]
+            }
+        env:
+          SLACK_WEBHOOK_URL: ${{ secrets.CHECK_DEV_BOTS_SLACK_WEBHOOK_URL }}
+          SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK
+
+      - name: Send GitHub Action trigger data to Slack workflow on failure
+        id: slack-api-notify-failure
+        if: ${{ failure() }}
+        uses: slackapi/[email protected]
+        with:
+          payload: |
+            {
+              "attachments": [
+                {
+                  "color": "#FF0000",
+                  "blocks": [
+                    {
+                      "type": "section",
+                      "text": {
+                        "type": "mrkdwn",
+                        "text": "Presto Deploy failed\nWorkflow: https://github.com/meedan/presto/actions/runs/${{ github.run_id }}"
+                      }
+                    }
+                  ]
+                }
+              ]
+            }
+        env:
+          SLACK_WEBHOOK_URL: ${{ secrets.ITS_BOTS_SLACK_WEBHOOK_URL }}
+          SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK
 
       - name: Reset cache
         id: reset-cache
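The two Slack notification steps post Block Kit payloads through an incoming webhook. A rough Python sketch of what that amounts to under the hood (the webhook URL below is a placeholder, not Meedan's real endpoint, which lives in the repository secrets):

```python
import json
import urllib.request

# Placeholder URL; in CI it comes from the CHECK_DEV_BOTS/ITS_BOTS secrets.
WEBHOOK_URL = "https://hooks.slack.com/services/T000/B000/XXXX"

payload = {
    "attachments": [{
        "color": "#00FF00",  # green for success, "#FF0000" for failure
        "blocks": [{
            "type": "section",
            "text": {"type": "mrkdwn", "text": "Presto Deploy succeeded"},
        }],
    }]
}

req = urllib.request.Request(
    WEBHOOK_URL,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    print(resp.read().decode())  # Slack replies with "ok" on success
```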
3 changes: 1 addition & 2 deletions docker-compose.yml
@@ -11,7 +11,6 @@ services:
       - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
       - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
       - S3_ENDPOINT=${S3_ENDPOINT}
-      - QUEUE_SUFFIX=${QUEUE_SUFFIX}
     env_file:
       - ./.env_file
     depends_on:
@@ -24,4 +23,4 @@ services:
     image: softwaremill/elasticmq
     hostname: presto-elasticmq
     ports:
-      - "9324:9324"
+      - "9324:9324"
\ No newline at end of file
3 changes: 1 addition & 2 deletions lib/helpers.py
@@ -5,7 +5,7 @@ def get_environment_setting(os_key: str) -> str:
     """
     Get environment variable helper. Could be augmented with credential store if/when necessary.
     """
-    return os.environ.get(os_key, "") or ""
+    return os.environ.get(os_key)

 def get_setting(current_value: Any, default_os_key: str) -> Any:
     """
@@ -22,4 +22,3 @@ def get_class(prefix: str, class_name: str) -> Any:
     module = prefix+str.join(".", class_name.split('.')[:-1])
     module_obj = importlib.import_module(module)
     return getattr(module_obj, class_name.split('.')[-1])
-
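The get_environment_setting change is behavioral, not cosmetic: the old form always returned a string (empty when unset), while the reverted one-argument form returns None for unset keys, so callers must tolerate None. A minimal sketch of the difference (QUEUE_SUFFIX is just an illustrative key):

```python
import os

def get_env_padded(key: str) -> str:
    # Pre-revert behavior: always a string, never None.
    return os.environ.get(key, "") or ""

def get_env_plain(key: str):
    # Post-revert (Feb 20) behavior: None when the key is unset.
    return os.environ.get(key)

os.environ.pop("QUEUE_SUFFIX", None)         # make sure the key is unset
print(repr(get_env_padded("QUEUE_SUFFIX")))  # ''
print(repr(get_env_plain("QUEUE_SUFFIX")))   # None -> callers must handle it
```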
3 changes: 1 addition & 2 deletions lib/http.py
@@ -34,9 +34,8 @@ async def post_url(url: str, params: dict) -> Dict[str, Any]:
 def process_item(process_name: str, message: Dict[str, Any]):
     logger.info(message)
     queue_prefix = Queue.get_queue_prefix()
-    queue_suffix = Queue.get_queue_suffix()
     queue = QueueWorker.create(process_name)
-    queue.push_message(f"{queue_prefix}{process_name}{queue_suffix}", schemas.Message(body=message, model_name=process_name))
+    queue.push_message(f"{queue_prefix}{process_name}", schemas.Message(body=message, model_name=process_name))
     return {"message": "Message pushed successfully", "queue": process_name, "body": message}
 
 @app.post("/trigger_callback")
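To make the push_message change concrete, here is a small sketch of how the destination queue name is assembled before and after the revert (the prefix, process name, and suffix are assumed example values):

```python
queue_prefix = "dev_"          # from QUEUE_PREFIX
process_name = "audio__Model"  # model name with "." mapped to "__"
queue_suffix = ".fifo"         # from the now-removed QUEUE_SUFFIX

# Pre-revert: suffix appended, e.g. to target SQS FIFO queues.
print(f"{queue_prefix}{process_name}{queue_suffix}")  # dev_audio__Model.fifo

# Post-revert (Feb 20): plain standard-queue name.
print(f"{queue_prefix}{process_name}")                # dev_audio__Model
```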
2 changes: 1 addition & 1 deletion lib/logger.py
@@ -35,4 +35,4 @@
 logging.config.dictConfig(LOGGING_CONFIG)
 
 # This provides an easily accessible logger for other modules
-logger = logging.getLogger(__name__)
+logger = logging.getLogger(__name__)
\ No newline at end of file
2 changes: 1 addition & 1 deletion lib/model/audio.py
@@ -26,4 +26,4 @@ def process(self, audio: schemas.Message) -> Dict[str, Union[str, List[int]]]:
             hash_value = self.audio_hasher(temp_file_name)
         finally:
             os.remove(temp_file_name)
-        return hash_value
+        return hash_value
\ No newline at end of file
2 changes: 1 addition & 1 deletion lib/model/fasttext.py
@@ -43,4 +43,4 @@ def respond(self, docs: Union[List[schemas.Message], schemas.Message]) -> List[schemas.Message]:
 
         for doc, detected_lang in zip(docs, detected_langs):
             doc.body.hash_value = detected_lang
-        return docs
+        return docs
\ No newline at end of file
2 changes: 1 addition & 1 deletion lib/model/fptg.py
@@ -6,4 +6,4 @@ def __init__(self):
         """
         Init FPTG model. Fairly standard for all vectorizers.
         """
-        super().__init__(MODEL_NAME)
+        super().__init__(MODEL_NAME)
\ No newline at end of file
2 changes: 1 addition & 1 deletion lib/model/generic_transformer.py
@@ -33,4 +33,4 @@ def vectorize(self, texts: List[str]) -> List[List[float]]:
         """
         Vectorize the text! Run as batch.
         """
-        return self.model.encode(texts).tolist()
+        return self.model.encode(texts).tolist()
\ No newline at end of file
2 changes: 1 addition & 1 deletion lib/model/image.py
@@ -34,4 +34,4 @@ def process(self, image: schemas.Message) -> schemas.GenericItem:
         """
         Generic function for returning the actual response.
         """
-        return self.compute_pdq(self.get_iobytes_for_image(image))
+        return self.compute_pdq(self.get_iobytes_for_image(image))
\ No newline at end of file
2 changes: 1 addition & 1 deletion lib/model/indian_sbert.py
@@ -6,4 +6,4 @@ def __init__(self):
         """
         Init IndianSbert model. Fairly standard for all vectorizers.
         """
-        super().__init__(MODEL_NAME)
+        super().__init__(MODEL_NAME)
\ No newline at end of file
2 changes: 1 addition & 1 deletion lib/model/mean_tokens.py
@@ -6,4 +6,4 @@ def __init__(self):
         """
         Init MeanTokens model. Fairly standard for all vectorizers.
         """
-        super().__init__(MODEL_NAME)
+        super().__init__(MODEL_NAME)
\ No newline at end of file
2 changes: 1 addition & 1 deletion lib/model/model.py
@@ -55,4 +55,4 @@ def create(cls):
         abstraction for loading model based on os environment-specified model.
         """
         model = get_class('lib.model.', os.environ.get('MODEL_NAME'))
-        return model()
+        return model()
\ No newline at end of file
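The get_class helper from the lib/helpers.py diff above is what turns MODEL_NAME into a class here. A self-contained sketch of that resolution logic (the MODEL_NAME value is an assumed example, and lib.model.mean_tokens must be importable for the last lines to run):

```python
import importlib
from typing import Any

def get_class(prefix: str, class_name: str) -> Any:
    # "mean_tokens.Model" -> module "lib.model.mean_tokens", attribute "Model"
    module = prefix + ".".join(class_name.split(".")[:-1])
    module_obj = importlib.import_module(module)
    return getattr(module_obj, class_name.split(".")[-1])

model_cls = get_class("lib.model.", "mean_tokens.Model")
model = model_cls()
```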
2 changes: 1 addition & 1 deletion lib/model/video.py
@@ -63,4 +63,4 @@ def process(self, video: schemas.Message) -> schemas.GenericItem:
         for file_path in [self.tmk_file_path(video_filename), temp_file_name]:
             if os.path.exists(file_path):
                 os.remove(file_path)
-        return {"folder": self.tmk_bucket(), "filepath": self.tmk_file_path(video_filename), "hash_value": hash_value}
+        return {"folder": self.tmk_bucket(), "filepath": self.tmk_file_path(video_filename), "hash_value": hash_value}
\ No newline at end of file
9 changes: 5 additions & 4 deletions lib/queue/processor.py
@@ -5,15 +5,16 @@
 
 from lib import schemas
 from lib.logger import logger
+from lib.helpers import get_setting
 from lib.queue.queue import Queue
 class QueueProcessor(Queue):
     @classmethod
-    def create(cls, model_name: str = None, batch_size: int = 10):
+    def create(cls, input_queue_name: str = None, batch_size: int = 10):
         """
         Instantiate a queue. Must pass input_queue_name, output_queue_name, and batch_size.
         Pulls settings and then inits instance.
         """
-        input_queue_name = Queue.get_output_queue_name(model_name)
+        input_queue_name = Queue.get_queue_name(input_queue_name)
         logger.info(f"Starting queue with: ('{input_queue_name}', {batch_size})")
         return QueueProcessor(input_queue_name, batch_size)

@@ -23,7 +24,7 @@ def __init__(self, input_queue_name: str, output_queue_name: str = None, batch_size
         """
         super().__init__()
         self.input_queue_name = input_queue_name
-        self.input_queues = self.restrict_queues_to_suffix(self.get_or_create_queues(input_queue_name), Queue.get_queue_suffix())
+        self.input_queues = self.restrict_queues_to_suffix(self.get_or_create_queues(input_queue_name+"_output"), "_output")
         self.all_queues = self.store_queue_map(self.input_queues)
         logger.info(f"Processor listening to queues of {self.all_queues}")
         self.batch_size = batch_size
@@ -52,4 +53,4 @@ def send_callback(self, message):
             callback_url = message.body.callback_url
             requests.post(callback_url, json=message.dict())
         except Exception as e:
-            logger.error(f"Callback fail! Failed with {e} on {callback_url} with message of {message}")
+            logger.error(f"Callback fail! Failed with {e} on {callback_url} with message of {message}")
\ No newline at end of file
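A short usage sketch of the reverted factory (the model name is an assumed example; a real run needs AWS or ElasticMQ settings in the environment):

```python
from lib.queue.processor import QueueProcessor

# With QUEUE_PREFIX=dev_, Queue.get_queue_name("mean_tokens.Model") yields
# "dev_mean_tokens__Model", and the processor then listens on the paired
# "dev_mean_tokens__Model_output" queue for finished work.
processor = QueueProcessor.create("mean_tokens.Model", batch_size=10)
```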
46 changes: 15 additions & 31 deletions lib/queue/queue.py
@@ -5,7 +5,7 @@
 import boto3
 import botocore
 
-from lib.helpers import get_environment_setting
+from lib.helpers import get_setting, get_environment_setting
 from lib.logger import logger
 from lib import schemas
 SQS_MAX_BATCH_SIZE = 10
@@ -21,18 +21,8 @@ def get_queue_prefix():
         return (get_environment_setting("QUEUE_PREFIX") or "").replace(".", "__")
 
     @staticmethod
-    def get_queue_suffix():
-        return (get_environment_setting("QUEUE_SUFFIX") or "")
-
-    @staticmethod
-    def get_input_queue_name(model_name=None):
-        name = model_name or get_environment_setting("MODEL_NAME").replace(".", "__")
-        return Queue.get_queue_prefix()+name+Queue.get_queue_suffix()
-
-    @staticmethod
-    def get_output_queue_name(model_name=None):
-        name = model_name or get_environment_setting("MODEL_NAME").replace(".", "__")
-        return Queue.get_queue_prefix()+name+"_output"+Queue.get_queue_suffix()
+    def get_queue_name(input_queue_name):
+        return Queue.get_queue_prefix()+get_setting(input_queue_name, "MODEL_NAME").replace(".", "__")
 
     def store_queue_map(self, all_queues: List[boto3.resources.base.ServiceResource]) -> Dict[str, boto3.resources.base.ServiceResource]:
         """
@@ -53,7 +43,7 @@ def restrict_queues_to_suffix(self, queues: List[boto3.resources.base.ServiceResource], suffix: str) -> List[boto3.resources.base.ServiceResource]:
         """
         When plucking input queues, we want to omit any queues that are our paired suffix queues..
         """
-        return [queue for queue in queues if suffix and self.queue_name(queue).endswith(suffix)]
+        return [queue for queue in queues if self.queue_name(queue).endswith(suffix)]
 
     def restrict_queues_by_suffix(self, queues: List[boto3.resources.base.ServiceResource], suffix: str) -> List[boto3.resources.base.ServiceResource]:
         """
"""
Expand All @@ -66,16 +56,7 @@ def create_queue(self, queue_name: str) -> boto3.resources.base.ServiceResource:
Create queue by name - may not work in production owing to permissions - mostly a local convenience function
"""
logger.info(f"Queue {queue_name} doesn't exist - creating")
attributes = {}
if queue_name.endswith('.fifo'):
attributes['FifoQueue'] = 'true'
# Optionally enable content-based deduplication for FIFO queues
attributes['ContentBasedDeduplication'] = 'true'
# Include other FIFO-specific attributes as needed
return self.sqs.create_queue(
QueueName=queue_name,
Attributes=attributes
)
return self.sqs.create_queue(QueueName=queue_name)

def get_or_create_queues(self, queue_name: str) -> List[boto3.resources.base.ServiceResource]:
"""
@@ -84,8 +65,6 @@ def get_or_create_queues(self, queue_name: str) -> List[boto3.resources.base.ServiceResource]:
         try:
             found_queues = [q for q in self.sqs.queues.filter(QueueNamePrefix=queue_name)]
             exact_match_queues = [queue for queue in found_queues if queue.attributes['QueueArn'].split(':')[-1] == queue_name]
-            logger.info(f"found queues are {found_queues}")
-            logger.info(f"exact queues are {exact_match_queues}")
             if exact_match_queues:
                 return exact_match_queues
             else:
@@ -112,6 +91,14 @@ def get_sqs(self) -> boto3.resources.base.ServiceResource:
         logger.info(f"Using SQS Interface")
         return boto3.resource('sqs', region_name=get_environment_setting("AWS_DEFAULT_REGION"))
 
+    def get_output_queue_name(self, input_queue_name: str, output_queue_name: str = None) -> str:
+        """
+        If output_queue_name was empty or None, set name for queue.
+        """
+        if not output_queue_name:
+            output_queue_name = f'{input_queue_name}_output'
+        return output_queue_name
+
     def group_deletions(self, messages_with_queues: List[Tuple[schemas.Message, boto3.resources.base.ServiceResource]]) -> Dict[boto3.resources.base.ServiceResource, List[schemas.Message]]:
         """
         Group deletions so that we can run through a simplified set of batches rather than delete each item independently
@@ -175,8 +162,5 @@ def push_message(self, queue_name: str, message: schemas.Message) -> schemas.Message:
         """
         Actual SQS logic for pushing a message to a queue
         """
-        message_data = {"MessageBody": json.dumps(message.dict())}
-        if queue_name.endswith('.fifo'):
-            message_data["MessageGroupId"] = message.body.id
-        self.find_queue_by_name(queue_name).send_message(**message_data)
-        return message
+        self.find_queue_by_name(queue_name).send_message(MessageBody=json.dumps(message.dict()))
+        return message
\ No newline at end of file
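For context on the removed branch: SQS FIFO queues have names ending in .fifo, and every send to them needs a MessageGroupId. A standalone boto3 sketch of the pre-revert behavior (queue name, region, and group id are illustrative):

```python
import json
import boto3

sqs = boto3.resource("sqs", region_name="us-east-1")

queue_name = "dev_audio__Model.fifo"  # illustrative FIFO queue name
queue = sqs.create_queue(
    QueueName=queue_name,
    Attributes={
        "FifoQueue": "true",
        # Deduplicate on message content instead of an explicit dedup id.
        "ContentBasedDeduplication": "true",
    },
)

message_data = {"MessageBody": json.dumps({"id": "abc123", "url": "..."})}
if queue_name.endswith(".fifo"):
    # FIFO queues require a group id; messages in a group stay ordered.
    message_data["MessageGroupId"] = "abc123"
queue.send_message(**message_data)
```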
14 changes: 7 additions & 7 deletions lib/queue/worker.py
@@ -6,16 +6,17 @@
 from lib.logger import logger
 from lib.queue.queue import Queue
 from lib.model.model import Model
+from lib.helpers import get_setting
 TIMEOUT_SECONDS = int(os.getenv("WORK_TIMEOUT_SECONDS", "60"))
 class QueueWorker(Queue):
     @classmethod
-    def create(cls, model_name: str = None):
+    def create(cls, input_queue_name: str = None):
         """
         Instantiate a queue worker. Must pass input_queue_name.
         Pulls settings and then inits instance.
         """
-        input_queue_name = Queue.get_input_queue_name(model_name)
-        output_queue_name = Queue.get_output_queue_name(model_name)
+        input_queue_name = Queue.get_queue_name(input_queue_name)
+        output_queue_name = f"{input_queue_name}_output"
         logger.info(f"Starting queue with: ('{input_queue_name}', '{output_queue_name}')")
         return QueueWorker(input_queue_name, output_queue_name)
@@ -25,10 +26,9 @@ def __init__(self, input_queue_name: str, output_queue_name: str = None):
         """
         super().__init__()
         self.input_queue_name = input_queue_name
-        q_suffix = f"_output" + Queue.get_queue_suffix()
-        self.input_queues = self.restrict_queues_by_suffix(self.get_or_create_queues(input_queue_name), q_suffix)
+        self.input_queues = self.restrict_queues_by_suffix(self.get_or_create_queues(input_queue_name), "_output")
         if output_queue_name:
-            self.output_queue_name = Queue.get_output_queue_name()
+            self.output_queue_name = self.get_output_queue_name(input_queue_name, output_queue_name)
             self.output_queues = self.get_or_create_queues(output_queue_name)
         self.all_queues = self.store_queue_map([item for row in [self.input_queues, self.output_queues] for item in row])
         logger.info(f"Worker listening to queues of {self.all_queues}")
@@ -116,4 +116,4 @@ def delete_processed_messages(self, messages_with_queues: List[Tuple]):
         Parameters:
         - messages_with_queues (List[Tuple]): A list of tuples, each containing a message and its corresponding queue, to be deleted.
         """
-        self.delete_messages(messages_with_queues)
+        self.delete_messages(messages_with_queues)
\ No newline at end of file
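Putting the reverted naming scheme together, this sketch shows the queue pair a worker ends up with (QUEUE_PREFIX and the model name are assumed example values):

```python
from lib.queue.worker import QueueWorker

# With QUEUE_PREFIX=dev_, "audio.Model" resolves via Queue.get_queue_name()
# to "dev_audio__Model"; the worker pairs it with "dev_audio__Model_output".
worker = QueueWorker.create("audio.Model")
print(worker.input_queue_name)   # dev_audio__Model
print(worker.output_queue_name)  # dev_audio__Model_output
```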