Revert "Merge pull request #75 from meedan/feb2924-revert" #76

Merged · 2 commits · Mar 4, 2024
6 changes: 4 additions & 2 deletions .env_file
@@ -1,9 +1,11 @@
 QUEUE_PREFIX=dev_
 PRESTO_PORT=8000
 DEPLOY_ENV=local
-# MODEL_NAME=mean_tokens.Model
-MODEL_NAME=audio.Model
+MODEL_NAME=mean_tokens.Model
+# MODEL_NAME=audio.Model
 S3_ENDPOINT=http://minio:9000
 AWS_DEFAULT_REGION=us-east-1
 AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
 AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
+NUM_WORKERS=1
+#QUEUE_SUFFIX=.fifo
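Note on the new (commented-out) QUEUE_SUFFIX variable: when set, it is appended to every queue name that presto derives from QUEUE_PREFIX and MODEL_NAME, and a value of .fifo points the workers at SQS FIFO queues. A minimal sketch of the naming scheme, mirroring Queue.get_input_queue_name and Queue.get_output_queue_name further down in this diff (example values are hypothetical):

    import os

    # Hypothetical settings, matching the defaults in this .env_file
    os.environ.update(QUEUE_PREFIX="dev_", QUEUE_SUFFIX=".fifo", MODEL_NAME="mean_tokens.Model")

    prefix = os.environ.get("QUEUE_PREFIX", "")
    suffix = os.environ.get("QUEUE_SUFFIX", "")
    name = os.environ["MODEL_NAME"].replace(".", "__")

    print(prefix + name + suffix)              # dev_mean_tokens__Model.fifo (input queue)
    print(prefix + name + "_output" + suffix)  # dev_mean_tokens__Model_output.fifo (output queue)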
4 changes: 2 additions & 2 deletions .env_file.test
@@ -1,8 +1,8 @@
 QUEUE_PREFIX=dev_
 PRESTO_PORT=8000
 DEPLOY_ENV=local
-# MODEL_NAME=mean_tokens.Model
-MODEL_NAME=audio.Model
+MODEL_NAME=mean_tokens.Model
+#MODEL_NAME=audio.Model
 S3_ENDPOINT=http://minio:9000
 AWS_DEFAULT_REGION=us-east-1
 AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
74 changes: 15 additions & 59 deletions .github/workflows/ci-deploy.yml
@@ -4,6 +4,8 @@ on:
   push:
     branches:
       - 'master'
+    tags:
+      - 'v*'
 
 permissions:
   id-token: write
@@ -79,73 +81,27 @@ jobs:
       - name: Kick off Terraform deploy in sysops/
         id: sysops-deploy
         if: github.event_name == 'push' && startsWith(github.ref, 'refs/heads/master')
         run: |
           curl \
             -X POST \
             -H "Accept: application/vnd.github+json" \
             -H "Authorization: Bearer ${{ secrets.SYSOPS_RW_GITHUB_TOKEN }}" \
             -H "X-GitHub-Api-Version: 2022-11-28" \
             https://api.github.com/repos/meedan/sysops/actions/workflows/deploy_${{ github.event.repository.name }}.yml/dispatches \
-            -d '{"ref": "master", "inputs": {"git_sha": "${{ github.sha }}"}}'
+            -d '{"ref": "master", "inputs": {"git_sha": "${{ github.sha }}", "type": "push"}}'
-      - name: Send GitHub Action trigger data to Slack workflow on success
-        id: slack-api-notify-success
-        if: ${{ success() }}
-        uses: slackapi/[email protected]
-        with:
-          payload: |
-            {
-              "attachments": [
-                {
-                  "color": "#00FF00",
-                  "blocks": [
-                    {
-                      "type": "section",
-                      "text": {
-                        "type": "mrkdwn",
-                        "text": "Kicked off by: ${{ github.triggering_actor }}\nWorkflow: https://github.com/meedan/presto/actions/runs/${{ github.run_id }}"
-                      }
-                    },
-                    {
-                      "type": "section",
-                      "text": {
-                        "type": "mrkdwn",
-                        "text": "Presto Deploy:\n${{ github.event.pull_request.html_url || github.event.head_commit.url }}"
-                      }
-                    }
-                  ]
-                }
-              ]
-            }
-        env:
-          SLACK_WEBHOOK_URL: ${{ secrets.CHECK_DEV_BOTS_SLACK_WEBHOOK_URL }}
-          SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK
-
-      - name: Send GitHub Action trigger data to Slack workflow on failure
-        id: slack-api-notify-failure
-        if: ${{ failure() }}
-        uses: slackapi/[email protected]
-        with:
-          payload: |
-            {
-              "attachments": [
-                {
-                  "color": "#FF0000",
-                  "blocks": [
-                    {
-                      "type": "section",
-                      "text": {
-                        "type": "mrkdwn",
-                        "text": "Presto Deploy failed\nWorkflow: https://github.com/meedan/presto/actions/runs/${{ github.run_id }}"
-                      }
-                    }
-                  ]
-                }
-              ]
-            }
-        env:
-          SLACK_WEBHOOK_URL: ${{ secrets.ITS_BOTS_SLACK_WEBHOOK_URL }}
-          SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK
+      - name: Kick off Terraform deploy in sysops/
+        id: sysops-deploy-live
+        if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v')
+        run: |
+          curl \
+            -X POST \
+            -H "Accept: application/vnd.github+json" \
+            -H "Authorization: Bearer ${{ secrets.SYSOPS_RW_GITHUB_TOKEN }}" \
+            -H "X-GitHub-Api-Version: 2022-11-28" \
+            https://api.github.com/repos/meedan/sysops/actions/workflows/deploy_${{ github.event.repository.name }}.yml/dispatches \
+            -d '{"ref": "master", "inputs": {"git_sha": "${{ github.sha }}", "type": "tag"}}'
       - name: Reset cache
         id: reset-cache
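The net effect of the workflow change: a push to master dispatches the sysops deploy with type "push", while pushing a tag matching v* dispatches the same workflow with type "tag" via the new sysops-deploy-live step. For illustration, a rough Python equivalent of the curl call (assuming the requests package is available; token, URL, and inputs as in the workflow):

    import requests

    def dispatch_sysops_deploy(repo_name: str, git_sha: str, deploy_type: str, token: str) -> None:
        # Mirrors the curl step above; deploy_type is "push" or "tag"
        url = (
            "https://api.github.com/repos/meedan/sysops/actions/workflows/"
            f"deploy_{repo_name}.yml/dispatches"
        )
        resp = requests.post(
            url,
            headers={
                "Accept": "application/vnd.github+json",
                "Authorization": f"Bearer {token}",
                "X-GitHub-Api-Version": "2022-11-28",
            },
            json={"ref": "master", "inputs": {"git_sha": git_sha, "type": deploy_type}},
        )
        resp.raise_for_status()  # a successful workflow_dispatch returns 204 No Content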
3 changes: 2 additions & 1 deletion docker-compose.yml
@@ -11,6 +11,7 @@ services:
       - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
       - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
       - S3_ENDPOINT=${S3_ENDPOINT}
+      - QUEUE_SUFFIX=${QUEUE_SUFFIX}
     env_file:
       - ./.env_file
     depends_on:
@@ -23,4 +24,4 @@ services:
     image: softwaremill/elasticmq
     hostname: presto-elasticmq
     ports:
-      - "9324:9324"
\ No newline at end of file
+      - "9324:9324"
3 changes: 2 additions & 1 deletion lib/helpers.py
@@ -5,7 +5,7 @@ def get_environment_setting(os_key: str) -> str:
     """
     Get environment variable helper. Could be augmented with credential store if/when necessary.
     """
-    return os.environ.get(os_key)
+    return os.environ.get(os_key, "") or ""
 
 def get_setting(current_value: Any, default_os_key: str) -> Any:
     """
@@ -22,3 +22,4 @@ def get_class(prefix: str, class_name: str) -> Any:
     module = prefix+str.join(".", class_name.split('.')[:-1])
     module_obj = importlib.import_module(module)
     return getattr(module_obj, class_name.split('.')[-1])
+
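The helpers.py change makes get_environment_setting total: an unset variable now comes back as an empty string instead of None, so callers (such as the queue-name builders below) can concatenate the result without a None guard. A quick illustration of the new behavior:

    import os

    def get_environment_setting(os_key: str) -> str:
        return os.environ.get(os_key, "") or ""

    os.environ.pop("QUEUE_SUFFIX", None)
    assert get_environment_setting("QUEUE_SUFFIX") == ""  # previously returned None

    os.environ["QUEUE_SUFFIX"] = ""
    assert get_environment_setting("QUEUE_SUFFIX") == ""  # 'or ""' also normalizes falsy values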
3 changes: 2 additions & 1 deletion lib/http.py
@@ -34,8 +34,9 @@ async def post_url(url: str, params: dict) -> Dict[str, Any]:
 def process_item(process_name: str, message: Dict[str, Any]):
     logger.info(message)
     queue_prefix = Queue.get_queue_prefix()
+    queue_suffix = Queue.get_queue_suffix()
     queue = QueueWorker.create(process_name)
-    queue.push_message(f"{queue_prefix}{process_name}", schemas.Message(body=message, model_name=process_name))
+    queue.push_message(f"{queue_prefix}{process_name}{queue_suffix}", schemas.Message(body=message, model_name=process_name))
     return {"message": "Message pushed successfully", "queue": process_name, "body": message}
 
 @app.post("/trigger_callback")
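With this change the HTTP layer carries the suffix too: messages pushed from process_item land on the fully qualified queue name. With hypothetical values of QUEUE_PREFIX=dev_, QUEUE_SUFFIX=.fifo, and a process_name of "audio__Model", the f-string resolves as:

    queue_prefix, process_name, queue_suffix = "dev_", "audio__Model", ".fifo"
    print(f"{queue_prefix}{process_name}{queue_suffix}")  # dev_audio__Model.fifo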
2 changes: 1 addition & 1 deletion lib/logger.py
@@ -35,4 +35,4 @@
 logging.config.dictConfig(LOGGING_CONFIG)
 
 # This provides an easily accessible logger for other modules
-logger = logging.getLogger(__name__)
\ No newline at end of file
+logger = logging.getLogger(__name__)
2 changes: 1 addition & 1 deletion lib/model/audio.py
@@ -26,4 +26,4 @@ def process(self, audio: schemas.Message) -> Dict[str, Union[str, List[int]]]:
             hash_value = self.audio_hasher(temp_file_name)
         finally:
             os.remove(temp_file_name)
-        return hash_value
\ No newline at end of file
+        return hash_value
2 changes: 1 addition & 1 deletion lib/model/fasttext.py
@@ -43,4 +43,4 @@ def respond(self, docs: Union[List[schemas.Message], schemas.Message]) -> List[s
 
         for doc, detected_lang in zip(docs, detected_langs):
             doc.body.hash_value = detected_lang
-        return docs
\ No newline at end of file
+        return docs
2 changes: 1 addition & 1 deletion lib/model/fptg.py
@@ -6,4 +6,4 @@ def __init__(self):
         """
         Init FPTG model. Fairly standard for all vectorizers.
         """
-        super().__init__(MODEL_NAME)
\ No newline at end of file
+        super().__init__(MODEL_NAME)
2 changes: 1 addition & 1 deletion lib/model/generic_transformer.py
@@ -33,4 +33,4 @@ def vectorize(self, texts: List[str]) -> List[List[float]]:
         """
         Vectorize the text! Run as batch.
         """
-        return self.model.encode(texts).tolist()
\ No newline at end of file
+        return self.model.encode(texts).tolist()
2 changes: 1 addition & 1 deletion lib/model/image.py
@@ -34,4 +34,4 @@ def process(self, image: schemas.Message) -> schemas.GenericItem:
         """
         Generic function for returning the actual response.
         """
-        return self.compute_pdq(self.get_iobytes_for_image(image))
\ No newline at end of file
+        return self.compute_pdq(self.get_iobytes_for_image(image))
2 changes: 1 addition & 1 deletion lib/model/indian_sbert.py
@@ -6,4 +6,4 @@ def __init__(self):
         """
         Init IndianSbert model. Fairly standard for all vectorizers.
         """
-        super().__init__(MODEL_NAME)
\ No newline at end of file
+        super().__init__(MODEL_NAME)
2 changes: 1 addition & 1 deletion lib/model/mean_tokens.py
@@ -6,4 +6,4 @@ def __init__(self):
         """
         Init MeanTokens model. Fairly standard for all vectorizers.
         """
-        super().__init__(MODEL_NAME)
\ No newline at end of file
+        super().__init__(MODEL_NAME)
2 changes: 1 addition & 1 deletion lib/model/model.py
@@ -55,4 +55,4 @@ def create(cls):
         abstraction for loading model based on os environment-specified model.
         """
         model = get_class('lib.model.', os.environ.get('MODEL_NAME'))
-        return model()
\ No newline at end of file
+        return model()
2 changes: 1 addition & 1 deletion lib/model/video.py
@@ -63,4 +63,4 @@ def process(self, video: schemas.Message) -> schemas.GenericItem:
         for file_path in [self.tmk_file_path(video_filename), temp_file_name]:
             if os.path.exists(file_path):
                 os.remove(file_path)
-        return {"folder": self.tmk_bucket(), "filepath": self.tmk_file_path(video_filename), "hash_value": hash_value}
\ No newline at end of file
+        return {"folder": self.tmk_bucket(), "filepath": self.tmk_file_path(video_filename), "hash_value": hash_value}
9 changes: 4 additions & 5 deletions lib/queue/processor.py
@@ -5,16 +5,15 @@
 
 from lib import schemas
 from lib.logger import logger
-from lib.helpers import get_setting
 from lib.queue.queue import Queue
 
 class QueueProcessor(Queue):
     @classmethod
-    def create(cls, input_queue_name: str = None, batch_size: int = 10):
+    def create(cls, model_name: str = None, batch_size: int = 10):
         """
         Instantiate a queue. Must pass input_queue_name, output_queue_name, and batch_size.
         Pulls settings and then inits instance.
         """
-        input_queue_name = Queue.get_queue_name(input_queue_name)
+        input_queue_name = Queue.get_output_queue_name(model_name)
         logger.info(f"Starting queue with: ('{input_queue_name}', {batch_size})")
         return QueueProcessor(input_queue_name, batch_size)
 
@@ -24,7 +23,7 @@ def __init__(self, input_queue_name: str, output_queue_name: str = None, batch_s
         """
         super().__init__()
         self.input_queue_name = input_queue_name
-        self.input_queues = self.restrict_queues_to_suffix(self.get_or_create_queues(input_queue_name+"_output"), "_output")
+        self.input_queues = self.restrict_queues_to_suffix(self.get_or_create_queues(input_queue_name), Queue.get_queue_suffix())
         self.all_queues = self.store_queue_map(self.input_queues)
         logger.info(f"Processor listening to queues of {self.all_queues}")
         self.batch_size = batch_size
@@ -53,4 +52,4 @@ def send_callback(self, message):
             callback_url = message.body.callback_url
             requests.post(callback_url, json=message.dict())
         except Exception as e:
-            logger.error(f"Callback fail! Failed with {e} on {callback_url} with message of {message}")
\ No newline at end of file
+            logger.error(f"Callback fail! Failed with {e} on {callback_url} with message of {message}")
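QueueProcessor.create now takes a model name rather than a raw queue name and derives the output queue it listens on from Queue.get_output_queue_name. A usage sketch (environment values are hypothetical):

    import os
    from lib.queue.processor import QueueProcessor

    os.environ.update(QUEUE_PREFIX="dev_", QUEUE_SUFFIX=".fifo", MODEL_NAME="audio.Model")

    # Listens on dev_audio__Model_output.fifo and relays each result
    # to the callback_url recorded on the message
    processor = QueueProcessor.create(batch_size=10)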
46 changes: 31 additions & 15 deletions lib/queue/queue.py
@@ -5,7 +5,7 @@
 import boto3
 import botocore
 
-from lib.helpers import get_setting, get_environment_setting
+from lib.helpers import get_environment_setting
 from lib.logger import logger
 from lib import schemas
 
 SQS_MAX_BATCH_SIZE = 10
@@ -21,8 +21,18 @@ def get_queue_prefix():
         return (get_environment_setting("QUEUE_PREFIX") or "").replace(".", "__")
 
     @staticmethod
-    def get_queue_name(input_queue_name):
-        return Queue.get_queue_prefix()+get_setting(input_queue_name, "MODEL_NAME").replace(".", "__")
+    def get_queue_suffix():
+        return (get_environment_setting("QUEUE_SUFFIX") or "")
+
+    @staticmethod
+    def get_input_queue_name(model_name=None):
+        name = model_name or get_environment_setting("MODEL_NAME").replace(".", "__")
+        return Queue.get_queue_prefix()+name+Queue.get_queue_suffix()
+
+    @staticmethod
+    def get_output_queue_name(model_name=None):
+        name = model_name or get_environment_setting("MODEL_NAME").replace(".", "__")
+        return Queue.get_queue_prefix()+name+"_output"+Queue.get_queue_suffix()
 
     def store_queue_map(self, all_queues: List[boto3.resources.base.ServiceResource]) -> Dict[str, boto3.resources.base.ServiceResource]:
         """
@@ -43,7 +53,7 @@ def restrict_queues_to_suffix(self, queues: List[boto3.resources.base.ServiceRes
         """
         When plucking input queues, we want to omit any queues that are our paired suffix queues..
         """
-        return [queue for queue in queues if self.queue_name(queue).endswith(suffix)]
+        return [queue for queue in queues if not suffix or suffix and self.queue_name(queue).endswith(suffix)]
 
     def restrict_queues_by_suffix(self, queues: List[boto3.resources.base.ServiceResource], suffix: str) -> List[boto3.resources.base.ServiceResource]:
         """
@@ -56,7 +66,16 @@ def create_queue(self, queue_name: str) -> boto3.resources.base.ServiceResource:
         Create queue by name - may not work in production owing to permissions - mostly a local convenience function
         """
         logger.info(f"Queue {queue_name} doesn't exist - creating")
-        return self.sqs.create_queue(QueueName=queue_name)
+        attributes = {}
+        if queue_name.endswith('.fifo'):
+            attributes['FifoQueue'] = 'true'
+            # Optionally enable content-based deduplication for FIFO queues
+            attributes['ContentBasedDeduplication'] = 'true'
+            # Include other FIFO-specific attributes as needed
+        return self.sqs.create_queue(
+            QueueName=queue_name,
+            Attributes=attributes
+        )
 
     def get_or_create_queues(self, queue_name: str) -> List[boto3.resources.base.ServiceResource]:
         """
@@ -65,6 +84,8 @@ def get_or_create_queues(self, queue_name: str) -> List[boto3.resources.base.Ser
         try:
             found_queues = [q for q in self.sqs.queues.filter(QueueNamePrefix=queue_name)]
             exact_match_queues = [queue for queue in found_queues if queue.attributes['QueueArn'].split(':')[-1] == queue_name]
+            logger.info(f"found queues are {found_queues}")
+            logger.info(f"exact queues are {exact_match_queues}")
             if exact_match_queues:
                 return exact_match_queues
             else:
@@ -91,14 +112,6 @@ def get_sqs(self) -> boto3.resources.base.ServiceResource:
         logger.info(f"Using SQS Interface")
         return boto3.resource('sqs', region_name=get_environment_setting("AWS_DEFAULT_REGION"))
 
-    def get_output_queue_name(self, input_queue_name: str, output_queue_name: str = None) -> str:
-        """
-        If output_queue_name was empty or None, set name for queue.
-        """
-        if not output_queue_name:
-            output_queue_name = f'{input_queue_name}_output'
-        return output_queue_name
-
     def group_deletions(self, messages_with_queues: List[Tuple[schemas.Message, boto3.resources.base.ServiceResource]]) -> Dict[boto3.resources.base.ServiceResource, List[schemas.Message]]:
         """
         Group deletions so that we can run through a simplified set of batches rather than delete each item independently
@@ -162,5 +175,8 @@ def push_message(self, queue_name: str, message: schemas.Message) -> schemas.Mes
         """
         Actual SQS logic for pushing a message to a queue
         """
-        self.find_queue_by_name(queue_name).send_message(MessageBody=json.dumps(message.dict()))
-        return message
\ No newline at end of file
+        message_data = {"MessageBody": json.dumps(message.dict())}
+        if queue_name.endswith('.fifo'):
+            message_data["MessageGroupId"] = message.body.id
+        self.find_queue_by_name(queue_name).send_message(**message_data)
+        return message
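The two SQS changes above exist to support FIFO queues: a queue whose name ends in .fifo must be created with the FifoQueue attribute, and every send_message to it must carry a MessageGroupId (ContentBasedDeduplication lets callers omit a per-send MessageDeduplicationId). Note also that the rewritten restrict_queues_to_suffix predicate parses as "not suffix or (suffix and ...)", since and binds tighter than or, so an empty suffix keeps every queue. A standalone boto3 sketch of the FIFO pattern (queue name and region are hypothetical):

    import json
    import boto3

    sqs = boto3.resource("sqs", region_name="us-east-1")  # hypothetical region

    queue_name = "dev_audio__Model.fifo"  # hypothetical queue name
    attributes = {}
    if queue_name.endswith(".fifo"):
        attributes["FifoQueue"] = "true"
        # Dedup on a hash of the body, so no MessageDeduplicationId is needed per send
        attributes["ContentBasedDeduplication"] = "true"
    queue = sqs.create_queue(QueueName=queue_name, Attributes=attributes)

    message_data = {"MessageBody": json.dumps({"body": {"id": "item-1"}})}
    if queue_name.endswith(".fifo"):
        message_data["MessageGroupId"] = "item-1"  # ordering scope; required on FIFO queues
    queue.send_message(**message_data)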
14 changes: 7 additions & 7 deletions lib/queue/worker.py
@@ -6,17 +6,16 @@
 from lib.logger import logger
 from lib.queue.queue import Queue
 from lib.model.model import Model
-from lib.helpers import get_setting
 
 TIMEOUT_SECONDS = int(os.getenv("WORK_TIMEOUT_SECONDS", "60"))
 class QueueWorker(Queue):
     @classmethod
-    def create(cls, input_queue_name: str = None):
+    def create(cls, model_name: str = None):
         """
         Instantiate a queue worker. Must pass input_queue_name.
         Pulls settings and then inits instance.
         """
-        input_queue_name = Queue.get_queue_name(input_queue_name)
-        output_queue_name = f"{input_queue_name}_output"
+        input_queue_name = Queue.get_input_queue_name(model_name)
+        output_queue_name = Queue.get_output_queue_name(model_name)
         logger.info(f"Starting queue with: ('{input_queue_name}', '{output_queue_name}')")
         return QueueWorker(input_queue_name, output_queue_name)
 
@@ -26,9 +25,10 @@ def __init__(self, input_queue_name: str, output_queue_name: str = None):
         """
         super().__init__()
         self.input_queue_name = input_queue_name
-        self.input_queues = self.restrict_queues_by_suffix(self.get_or_create_queues(input_queue_name), "_output")
+        q_suffix = f"_output" + Queue.get_queue_suffix()
+        self.input_queues = self.restrict_queues_by_suffix(self.get_or_create_queues(input_queue_name), q_suffix)
         if output_queue_name:
-            self.output_queue_name = self.get_output_queue_name(input_queue_name, output_queue_name)
+            self.output_queue_name = Queue.get_output_queue_name()
             self.output_queues = self.get_or_create_queues(output_queue_name)
         self.all_queues = self.store_queue_map([item for row in [self.input_queues, self.output_queues] for item in row])
         logger.info(f"Worker listening to queues of {self.all_queues}")
@@ -116,4 +116,4 @@ def delete_processed_messages(self, messages_with_queues: List[Tuple]):
         Parameters:
         - messages_with_queues (List[Tuple]): A list of tuples, each containing a message and its corresponding queue, to be deleted.
         """
-        self.delete_messages(messages_with_queues)
\ No newline at end of file
+        self.delete_messages(messages_with_queues)
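Taken together, QueueWorker.create(model_name) now resolves both ends of the pipe from one place: the input queue via Queue.get_input_queue_name and the output queue via Queue.get_output_queue_name, sharing the same prefix and suffix. A usage sketch (environment values are hypothetical):

    import os
    from lib.queue.worker import QueueWorker

    os.environ.update(QUEUE_PREFIX="dev_", QUEUE_SUFFIX=".fifo", MODEL_NAME="mean_tokens.Model")

    # Reads from dev_mean_tokens__Model.fifo, writes results
    # to dev_mean_tokens__Model_output.fifo
    worker = QueueWorker.create()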