From 458619d9f051ed6be41ff342d66def84bc77b3c9 Mon Sep 17 00:00:00 2001 From: biningo Date: Wed, 4 Sep 2024 10:52:14 +0800 Subject: [PATCH] Fix memory leaks caused by time.After --- subscriber.go | 7 +++++-- syncer.go | 5 ++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/subscriber.go b/subscriber.go index 8fc4eac9..88335fc3 100644 --- a/subscriber.go +++ b/subscriber.go @@ -8,9 +8,9 @@ import ( "sync" "time" - "github.com/redis/go-redis/v9" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/log" + "github.com/redis/go-redis/v9" ) type subscriber struct { @@ -58,12 +58,15 @@ func (s *subscriber) start(wg *sync.WaitGroup) { err error ) // Try until successfully connect to Redis. + timer := time.NewTimer(s.retryTimeout) + defer timer.Stop() for { pubsub, err = s.broker.CancelationPubSub() if err != nil { s.logger.Errorf("cannot subscribe to cancelation channel: %v", err) select { - case <-time.After(s.retryTimeout): + case <-timer.C: + timer.Reset(s.retryTimeout) continue case <-s.done: s.logger.Debug("Subscriber done") diff --git a/syncer.go b/syncer.go index f1be1933..c5572479 100644 --- a/syncer.go +++ b/syncer.go @@ -57,6 +57,8 @@ func (s *syncer) start(wg *sync.WaitGroup) { go func() { defer wg.Done() var requests []*syncRequest + timer := time.NewTimer(s.interval) + defer timer.Stop() for { select { case <-s.done: @@ -70,7 +72,7 @@ func (s *syncer) start(wg *sync.WaitGroup) { return case req := <-s.requestsCh: requests = append(requests, req) - case <-time.After(s.interval): + case <-timer.C: var temp []*syncRequest for _, req := range requests { if req.deadline.Before(time.Now()) { @@ -81,6 +83,7 @@ func (s *syncer) start(wg *sync.WaitGroup) { } } requests = temp + timer.Reset(s.interval) } } }()