Fix possible deadlock in AWS pubsub #804

Merged
merged 1 commit into main on Jul 19, 2023

Conversation

DomBlack
Contributor

No description provided.

encore-cla bot commented Jul 19, 2023

All committers have signed the CLA.

Comment on lines +178 to +180
var lastFetch atomic.Pointer[time.Time]
var epoch time.Time
lastFetch.Store(&epoch)
DomBlack (Contributor, Author)

There was a race on this, so I've moved it into an atomic
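
For illustration, a minimal sketch of the pattern (the surrounding usage is assumed, not the actual runtime code): one goroutine stores the last-fetch time while another loads it, which would be a data race on a plain time.Time:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	// Hold the timestamp behind an atomic.Pointer so concurrent readers and
	// writers never race on a plain time.Time value.
	var lastFetch atomic.Pointer[time.Time]
	var epoch time.Time
	lastFetch.Store(&epoch) // initialise so Load never returns nil

	done := make(chan struct{})
	go func() {
		defer close(done)
		// Writer side: the fetch loop records when it last fetched.
		now := time.Now()
		lastFetch.Store(&now)
	}()
	<-done

	// Reader side: another goroutine can safely inspect the timestamp.
	fmt.Println("since last fetch:", time.Since(*lastFetch.Load()))
}
```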

Comment on lines +213 to +214
case <-fetchCtx.Done():
return
DomBlack (Contributor, Author)

I think this was the deadlock:

If all the workers panicked and quit, the fetch processor could have ended up trying to write onto workChan with nothing left reading from it.

I've solved this by adding this new select: if the ctx is done we don't even try to push to the channel. I've also added additional panic recovery wrappers at other points in the code.
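
A rough sketch of the guarded send, with assumed names (pushWork, the Work stub) standing in for the real runtime types:

```go
package workers

import "context"

// Work stands in for the real work item type in the runtime package.
type Work struct{ ID string }

// pushWork shows the guarded send: without the fetchCtx.Done() branch, a send
// on workChan blocks forever once every worker has exited, which is the
// suspected deadlock described above.
func pushWork(fetchCtx context.Context, workChan chan<- Work, items []Work) {
	for _, item := range items {
		select {
		case <-fetchCtx.Done():
			// Workers are gone or shutdown was requested; stop pushing rather
			// than blocking on a channel nobody reads from.
			return
		case workChan <- item:
		}
	}
}
```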

Comment on lines +103 to +109
// We should only long poll for 20 seconds, so if this takes more than
// 30 seconds we should cancel the context and try again.
//
// We do this in case the ReceiveMessage call gets stuck on the server
// and doesn't return.
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
DomBlack (Contributor, Author)

One theory I have is that the AWS library might be stalling and blocking under high load, so I've introduced a shorter timeout so that a stuck call fails with a context cancellation error rather than blocking indefinitely.
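
As a sketch of the shape of this change (the helper name and parameter values are assumptions, using the aws-sdk-go-v2 SQS client): the 30-second deadline means a wedged ReceiveMessage surfaces as context.DeadlineExceeded and the loop can retry instead of blocking forever:

```go
package awssqs

import (
	"context"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

// receiveOnce performs a single long poll, capped at 30 seconds so that a
// server-side stall cannot block the fetch loop indefinitely.
func receiveOnce(ctx context.Context, client *sqs.Client, queueURL string) (*sqs.ReceiveMessageOutput, error) {
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	return client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
		QueueUrl:            aws.String(queueURL),
		WaitTimeSeconds:     20, // server-side long poll window
		MaxNumberOfMessages: 10,
	})
}
```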


// Check if the context has been cancelled, and if so, return the error
if ctx.Err() != nil {
return ctx.Err()
Member

Why? This might hide other errors from the func if it fails for some other reason.

// If there was an error processing the message, apply the backoff policy
_, delay := utils.GetDelay(retryPolicy.MaxRetries, retryPolicy.MinBackoff, retryPolicy.MaxBackoff, uint16(deliveryAttempt))
- _, visibilityChangeErr := t.sqsClient.ChangeMessageVisibility(t.ctx, &sqs.ChangeMessageVisibilityInput{
+ _, visibilityChangeErr := t.sqsClient.ChangeMessageVisibility(ctx, &sqs.ChangeMessageVisibilityInput{
Member

Shouldn't we do this with a context not derived from the input, so we do this even if the input context is canceled?

DomBlack (Contributor, Author)

No, I deliberately wanted both of these API calls to be released when the fetch context is cancelled, as we immediately go round the loop to fetch again.

I think basing them on t.ctx rather than the fetch context could have been an issue: we never cancel t.ctx, but fetchCtx is cancelled when we want to exit the WorkConcurrently code.
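
A hypothetical sketch of the distinction (the type stub, method name, and fields here are assumed): nacking with the fetch context means the visibility change is released as soon as WorkConcurrently cancels fetchCtx, whereas t.ctx is never cancelled during normal operation:

```go
package awssqs

import (
	"context"
	"time"

	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

// topic stands in for the real subscription type; only the fields used here.
type topic struct {
	sqsClient *sqs.Client
	queueURL  *string
}

// nackMessage delays redelivery of a failed message. Passing fetchCtx (rather
// than the long-lived t.ctx) means this call unblocks as soon as the
// fetch/worker loop is told to shut down.
func (t *topic) nackMessage(fetchCtx context.Context, receiptHandle *string, delay time.Duration) error {
	_, err := t.sqsClient.ChangeMessageVisibility(fetchCtx, &sqs.ChangeMessageVisibilityInput{
		QueueUrl:          t.queueURL,
		ReceiptHandle:     receiptHandle,
		VisibilityTimeout: int32(delay / time.Second),
	})
	return err
}
```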

- _, err = t.sqsClient.DeleteMessage(t.ctx, &sqs.DeleteMessageInput{
} else {
	// If the message was processed successfully, delete it from the queue
+ _, err = t.sqsClient.DeleteMessage(ctx, &sqs.DeleteMessageInput{
Member

Same here

fetchWithPanicHandling := func(ctx context.Context, maxToFetch int) (work []Work, err error) {
defer func() {
if r := recover(); r != nil {
err = errs.B().Msgf("panic: %v", r).Err()
Member

Can we include the stack like we do elsewhere?

DomBlack (Contributor, Author)

errs.B() will build in a stack, no?
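
For reference, a generic version of the same wrapper using only the standard library (runtime/debug.Stack rather than Encore's errs builder), showing what explicitly attaching the stack would look like; this is a sketch, not the code in this PR:

```go
package workers

import (
	"context"
	"fmt"
	"runtime/debug"
)

// Work stands in for the real work item type.
type Work struct{}

// wrapFetch converts panics from the wrapped fetch function into errors,
// attaching the stack trace captured at the point of recovery.
func wrapFetch(fetch func(ctx context.Context, maxToFetch int) ([]Work, error)) func(context.Context, int) ([]Work, error) {
	return func(ctx context.Context, maxToFetch int) (work []Work, err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("panic: %v\n%s", r, debug.Stack())
			}
		}()
		return fetch(ctx, maxToFetch)
	}
}
```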

runtime/pubsub/internal/utils/workers.go (conversation resolved)
DomBlack merged commit 82235d7 into main on Jul 19, 2023
2 of 3 checks passed
DomBlack deleted the possible-deadlock-removal branch on July 19, 2023 at 18:19