Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

142 Stop Heartbeat StopAllConsuming() #162

Merged
merged 1 commit into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,9 @@ use case where you actually need that sort of flexibility, please let us know.
Currently for each queue you are only supposed to call `StartConsuming()` and
`StopConsuming()` at most once.

Also note that `StopAllConsuming()` will stop the heartbeat for this connection.
It's also advised not to publish to any queue opened by this connection afterwards.

### Return Rejected Deliveries

Even if you don't have a push queue setup there are cases where you need to
Expand Down
54 changes: 38 additions & 16 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@

redisClient RedisClient
errChan chan<- error
heartbeatStop chan chan struct{}
heartbeatStop chan chan struct{} // used to stop heartbeat() in stopHeartbeat(), nil once stopped

lock sync.Mutex
stopped bool
Expand Down Expand Up @@ -112,7 +112,7 @@
rejectedTemplate: getTemplate(queueRejectedBaseTemplate, useRedisHashTags),
redisClient: redisClient,
errChan: errChan,
heartbeatStop: make(chan chan struct{}, 1),
heartbeatStop: make(chan chan struct{}, 1), // mark heartbeat as active, can be stopped
}

if err := connection.updateHeartbeat(); err != nil { // checks the connection
Expand Down Expand Up @@ -144,9 +144,9 @@
select {
case <-ticker.C:
// continue below
case c := <-connection.heartbeatStop:
close(c)
return
case c := <-connection.heartbeatStop: // stopHeartbeat() has been called
close(c) // confirm to stopHeartbeat() that the heartbeat is stopped
return // stop updating the heartbeat
}

err := connection.updateHeartbeat()
Expand All @@ -160,7 +160,13 @@

if errorCount >= HeartbeatErrorLimit {
// reached error limit

// To avoid using this connection while we're not able to maintain its heartbeat we stop all
// consumers. This in turn will call stopHeartbeat(), and it is the responsibility of heartbeat()
// to confirm that the heartbeat is stopped, so we do that here too.

Check notice on line 166 in connection.go

View workflow job for this annotation

GitHub Actions / test (1.19.x)

4 statement(s) on lines 161:175 are not covered by tests.
connection.StopAllConsuming()
close(<-connection.heartbeatStop) // wait for stopHeartbeat() and confirm heartbeat is stopped

Check notice on line 169 in connection.go

View workflow job for this annotation

GitHub Actions / test (1.19.x)

4 statement(s) on lines 161:175 are not covered by tests.
// Clients reading from errChan need to see this error
// This allows them to shut themselves down
// Therefore we block adding it to errChan to ensure delivery
Expand Down Expand Up @@ -223,8 +229,15 @@

finishedChan := make(chan struct{})

// If we are already stopped or there are no open queues, then there is nothing to do
if connection.stopped || len(connection.openQueues) == 0 {
// If we are already stopped then there is nothing to do
if connection.stopped {
close(finishedChan)
return finishedChan
}

// If there are no open queues we still want to stop the heartbeat
if len(connection.openQueues) == 0 {
connection.stopHeartbeat()
close(finishedChan)
return finishedChan
}
Expand All @@ -239,8 +252,11 @@
for _, c := range chans {
<-c
}
close(finishedChan)
// log.Printf("rmq connection stopped consuming %s", queue)

// All consuming has been stopped. Now we can stop the heartbeat to avoid a goroutine leak.
connection.stopHeartbeat()

close(finishedChan) // signal all done
}()

return finishedChan
Expand Down Expand Up @@ -317,23 +333,29 @@
)
}

// stopHeartbeat stops the heartbeat of the connection
// it does not remove it from the list of connections so it can later be found by the cleaner
// stopHeartbeat stops the heartbeat of the connection.
// It does not remove it from the list of connections so it can later be found by the cleaner.
// Returns ErrorNotFound if the heartbeat was already stopped.
// Note that this function itself is not thread-safe; it's important to not call it multiple times
// at the same time. Currently it's only called in StopAllConsuming() where it's linearized by
// connection.lock.
func (connection *redisConnection) stopHeartbeat() error {
if connection.heartbeatStop == nil {
if connection.heartbeatStop == nil { // already stopped
return ErrorNotFound
}

heartbeatStopped := make(chan struct{})
connection.heartbeatStop <- heartbeatStopped
<-heartbeatStopped
connection.heartbeatStop = nil // avoid stopping twice
<-heartbeatStopped // wait for heartbeat() to confirm it's stopped
connection.heartbeatStop = nil // mark heartbeat as stopped

// Delete heartbeat key to immediately make the connection appear inactive to the cleaner,
// instead of waiting for the heartbeat key to run into its TTL.
count, err := connection.redisClient.Del(connection.heartbeatKey)
if err != nil {
if err != nil { // redis error
return err
}
if count == 0 {
if count == 0 { // heartbeat key didn't exist
return ErrorNotFound
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

var (
ErrorNotFound = errors.New("entity not found") // entitify being connection/queue/delivery
ErrorNotFound = errors.New("entity not found") // entity being connection/queue/delivery/heartbeat
ErrorAlreadyConsuming = errors.New("must not call StartConsuming() multiple times")
ErrorNotConsuming = errors.New("must call StartConsuming() before adding consumers")
ErrorConsumingStopped = errors.New("consuming stopped")
Expand Down
32 changes: 32 additions & 0 deletions queue_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,29 @@ func TestClusterStopConsuming_BatchConsumer(t *testing.T) {
assert.NoError(t, connection.stopHeartbeat())
}

// TestClusterConnection_StopAllConsuming_CalledTwice verifies that
// StopAllConsuming() stops the connection's heartbeat on a connection without
// any open queues, and that calling it a second time is safe and leaves the
// heartbeat stopped.
func TestClusterConnection_StopAllConsuming_CalledTwice(t *testing.T) {
	redisOptions, closer := testClusterRedis(t)
	defer closer()

	connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil)
	// Abort immediately on failure: every following step needs a usable connection.
	require.NoError(t, err)

	finishedChan := connection.StopAllConsuming()
	require.NotNil(t, finishedChan)
	<-finishedChan // wait for stopping to finish

	// check that heartbeat has been stopped (expected value first, then actual)
	assert.Equal(t, ErrorNotFound, connection.checkHeartbeat())

	// it's safe to call StopAllConsuming again
	finishedChan = connection.StopAllConsuming()
	require.NotNil(t, finishedChan)
	<-finishedChan // wait for stopping to finish

	// heartbeat is still stopped of course
	assert.Equal(t, ErrorNotFound, connection.checkHeartbeat())
}

func TestClusterConnection_StopAllConsuming_CantOpenQueue(t *testing.T) {
redisOptions, closer := testClusterRedis(t)
defer closer()
Expand All @@ -657,6 +680,9 @@ func TestClusterConnection_StopAllConsuming_CantOpenQueue(t *testing.T) {
require.NotNil(t, finishedChan)
<-finishedChan // wait for stopping to finish

// check that heartbeat has been stopped
assert.Equal(t, connection.checkHeartbeat(), ErrorNotFound)

queue, err := connection.OpenQueue("consume-q")
require.Nil(t, queue)
require.Equal(t, ErrorConsumingStopped, err)
Expand All @@ -677,6 +703,9 @@ func TestClusterConnection_StopAllConsuming_CantStartConsuming(t *testing.T) {
require.NotNil(t, finishedChan)
<-finishedChan // wait for stopping to finish

// check that heartbeat has been stopped
assert.Equal(t, connection.checkHeartbeat(), ErrorNotFound)

err = queue.StartConsuming(20, time.Millisecond)
require.Equal(t, ErrorConsumingStopped, err)
}
Expand Down Expand Up @@ -717,6 +746,9 @@ func TestClusterConnection_StopAllConsuming_CantAddConsumer(t *testing.T) {
require.NotNil(t, finishedChan)
<-finishedChan // wait for stopping to finish

// check that heartbeat has been stopped
assert.Equal(t, connection.checkHeartbeat(), ErrorNotFound)

_, err = queue.AddConsumer("late-consume", NewTestConsumer("late-consumer"))
require.Equal(t, ErrorConsumingStopped, err)
}
Expand Down
Loading