
CV2-3551 add local queue consumption and re-work a ton of the startup flow to accommodate #33

Merged: 6 commits, Aug 30, 2023

Conversation

@DGaffney (Collaborator) commented Aug 29, 2023

Alright, so a few things happening here as per conversations last week:

  1. Break the queue class into a parent queue class and two subclasses, worker and processor.
  2. The worker is responsible for its own concept of an input/output queue, so nobody else needs to think about that idea.
  3. The processor sits on any output queues matching a given input queue string and fires callbacks.
  4. Rework the makefile / starter shell file to run both workers simultaneously on start.

This should allow us to run everything end-to-end locally at this point. We should be able to just set MODEL_NAME=mean_tokens__Model, start up the environment, and operate full round trips.
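A minimal sketch of the class split described above (illustrative only: the class and method names are assumptions about the shape of the change, not the PR's exact code):

```python
# Illustrative sketch only; class and method names are assumptions, not this PR's exact code.
class Queue:
    """Shared SQS plumbing: client setup, queue lookup, message (de)serialization."""
    def __init__(self, input_queue_name: str):
        self.input_queue_name = input_queue_name


class QueueWorker(Queue):
    """Owns its own notion of an input/output queue pair: pulls messages,
    runs the model, and pushes results to the matching output queue."""
    def process(self) -> None:
        ...  # receive -> run model -> push to output queue


class QueueProcessor(Queue):
    """Sits on any output queue matching the input queue string and fires
    callbacks for each finished message."""
    def send_callbacks(self) -> None:
        ...  # receive from output queue -> fire callback
```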

Review thread on this snippet:

if messages_with_queues:
    logger.debug(f"About to respond to: ({messages_with_queues})")
    bodies = [schemas.Message(**json.loads(message.body)) for message, queue in messages_with_queues]
    for body in bodies:
Contributor: not in this PR, but we should probably call this in parallel.

DGaffney (Collaborator, Author): Cosign.
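For reference, the parallel-dispatch idea could look roughly like the sketch below. It assumes the `bodies` list from the snippet above and a hypothetical `handle_message` helper standing in for the body of the serial loop; neither is part of this PR.

```python
# Sketch of the "call this in parallel" suggestion (explicitly not in this PR).
# handle_message is a hypothetical stand-in for the body of the serial loop above.
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(handle_message, bodies))  # fan out instead of iterating one by one
```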

lib/queue/worker.py (review thread, marked outdated and resolved)

Review thread on this snippet:

logger.info("Beginning callback loop...")
while True:
    queue.send_callbacks()
Contributor: probably want some (tuneable?) time interval in here?
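Something like the sketch below would do it, reusing the `logger` and `queue` objects from the snippet above; the environment variable name is an assumption, not something defined in this PR.

```python
# Sketch of a tunable polling interval; the env var name is an assumption.
import os
import time

CALLBACK_INTERVAL_SECONDS = float(os.environ.get("CALLBACK_INTERVAL_SECONDS", "1.0"))

logger.info("Beginning callback loop...")
while True:
    queue.send_callbacks()
    time.sleep(CALLBACK_INTERVAL_SECONDS)  # avoid hammering the queue in a hot loop
```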

Contributor: what happens when the worker dies?

DGaffney (Collaborator, Author): This all happens in the context of SQS messages, so the message eventually gets tossed back into the pool of available messages and some other worker will pick it up - should be fairly foolproof, since it just ties into SQS's existing delivery guarantees...
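For context, this is the standard SQS receive/delete pattern being relied on here; a rough boto3 sketch (the queue URL and handler are placeholders, not code from this PR):

```python
# Rough boto3 sketch of the redelivery behavior described above.
import boto3

sqs = boto3.client("sqs")
resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1, WaitTimeSeconds=20)
for message in resp.get("Messages", []):
    handle(message)  # if the worker dies here, the delete below never runs...
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])
    # ...so once the visibility timeout expires, SQS hands the message to another worker.
```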

Contributor: I don't think there will be data loss, because of the queues, but if the worker dies we will stop pulling things off the queue while other parts of the system are still happily putting things in... but will AWS know that the service needs to be restarted?

DGaffney (Collaborator, Author): In our discussions so far, my understanding is that one of the big reasons to use SQS directly is to make our system scale according to queue depth. As the queue grows, we would increase the number of workers to address the backlog. A hung machine would still potentially need to be taken out back individually, but it shouldn't arrest all work entirely.
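As a rough illustration of scaling on queue depth (the queue URL, threshold, and scaling hook are placeholders; in practice this would more likely be a CloudWatch alarm driving an autoscaling policy):

```python
# Illustrative only: read the backlog size that a scaling policy would key off of.
import boto3

sqs = boto3.client("sqs")
attrs = sqs.get_queue_attributes(
    QueueUrl=queue_url,
    AttributeNames=["ApproximateNumberOfMessages"],
)
backlog = int(attrs["Attributes"]["ApproximateNumberOfMessages"])
if backlog > WORKER_SCALE_THRESHOLD:  # hypothetical threshold
    scale_out_workers()  # hypothetical hook; in practice, a CloudWatch alarm + autoscaling
```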

@DGaffney merged commit ebb029a into master on Aug 30, 2023 (2 checks passed).
@DGaffney deleted the cv2-3551-local-queue-consumption branch on August 30, 2023 at 13:26.