Skip to content

Commit

Permalink
[#98]: chore: check the connection state if the queue is empty
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Sep 13, 2023
2 parents 4adcdec + 7e16455 commit 5ed589f
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 51 deletions.
65 changes: 31 additions & 34 deletions amqpjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,24 @@ func (d *Driver) State(ctx context.Context) (*jobs.State, error) {
d.stateChan <- stateCh
}()

pipe := *d.pipeline.Load()

// if there is no queue, check the connection instead
if d.queue == "" {
// d.conn should be protected (redial)
d.mu.Lock()
defer d.mu.Unlock()

if !d.conn.IsClosed() {
return &jobs.State{
Priority: uint64(pipe.Priority()),
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
Delayed: atomic.LoadInt64(d.delayed),
Ready: ready(atomic.LoadUint32(&d.listeners)),
}, nil
}

return nil, errors.Str("empty queue name, add queue to the AMQP configuration")
}

Expand All @@ -454,8 +471,6 @@ func (d *Driver) State(ctx context.Context) (*jobs.State, error) {
return nil, errors.E(op, err)
}

pipe := *d.pipeline.Load()

return &jobs.State{
Priority: uint64(pipe.Priority()),
Pipeline: pipe.Name(),
Expand Down Expand Up @@ -498,17 +513,22 @@ func (d *Driver) Pause(ctx context.Context, p string) error {
d.mu.Lock()
defer d.mu.Unlock()

err := d.consumeChan.Cancel(d.consumeID, true)
err := d.consumeChan.Flow(false)
if err != nil {
d.log.Error("cancel publish channel, forcing close", zap.Error(err))
d.log.Error("flow (pause)", zap.Error(err))
errCl := d.consumeChan.Close()
if errCl != nil {
return stderr.Join(errCl, err)
}
return err
}

d.log.Debug("pipeline was paused", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Duration("elapsed", time.Since(start)))
d.log.Debug("pipeline was paused",
zap.String("driver", pipe.Driver()),
zap.String("pipeline", pipe.Name()),
zap.Time("start", start),
zap.Duration("elapsed", time.Since(start)),
)

return nil
}
Expand Down Expand Up @@ -538,39 +558,16 @@ func (d *Driver) Resume(ctx context.Context, p string) error {
return errors.Str("amqp listener is already in the active state")
}

var err error
err = d.declareQueue()
if err != nil {
return err
}

d.consumeChan, err = d.conn.Channel()
if err != nil {
return err
}

err = d.consumeChan.Qos(d.prefetch, 0, false)
if err != nil {
return err
}

// start reading messages from the channel
deliv, err := d.consumeChan.Consume(
d.queue,
d.consumeID,
false,
false,
false,
false,
nil,
)
err := d.consumeChan.Flow(true)
if err != nil {
d.log.Error("flow (pause)", zap.Error(err))
errCl := d.consumeChan.Close()
if errCl != nil {
return stderr.Join(errCl, err)
}
return err
}

// run listener
d.listener(deliv)

// increase number of listeners
atomic.AddUint32(&d.listeners, 1)
d.log.Debug("pipeline was resumed", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Duration("elapsed", time.Since(start)))
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ require (
github.com/roadrunner-server/endure/v2 v2.4.2
github.com/roadrunner-server/errors v1.3.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/contrib/propagators/jaeger v1.18.0
go.opentelemetry.io/otel v1.17.0
go.opentelemetry.io/otel/sdk v1.17.0
go.opentelemetry.io/otel/trace v1.17.0
go.opentelemetry.io/contrib/propagators/jaeger v1.19.0
go.opentelemetry.io/otel v1.18.0
go.opentelemetry.io/otel/sdk v1.18.0
go.opentelemetry.io/otel/trace v1.18.0
go.uber.org/zap v1.25.0
golang.org/x/sys v0.12.0
)
Expand All @@ -26,7 +26,7 @@ require (
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/metric v1.17.0 // indirect
go.opentelemetry.io/otel/metric v1.18.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
22 changes: 10 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA=
github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/roadrunner-server/api/v4 v4.7.1 h1:BYU2n92mcVyanV4w3fBSSztfP8ILUh5IyndRI1UD2/8=
github.com/roadrunner-server/api/v4 v4.7.1/go.mod h1:Ut9T2j3E22cnRJtipbU8N3WVhyV040iiDfddzojlKwY=
github.com/roadrunner-server/api/v4 v4.8.0 h1:wsANwein0dD3q9OQi8L6XGjgEkEn/23mAJ4ZphavfLM=
github.com/roadrunner-server/api/v4 v4.8.0/go.mod h1:FhCdSyHWBJfyZ0yuggVE72WYKcmAOsGuJbC3dvzaMOg=
github.com/roadrunner-server/endure/v2 v2.4.2 h1:aFnPc321l5HDzE2mN5wwfksJ40lgXwfU3RSqdS1LyUQ=
Expand All @@ -40,16 +38,16 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
go.opentelemetry.io/contrib/propagators/jaeger v1.18.0 h1:T457dcPEUr4+wimXmIs+2lI8vpSnRpxEhSsY2n7+UjU=
go.opentelemetry.io/contrib/propagators/jaeger v1.18.0/go.mod h1:FTAfGYSYWANl3fOqHpZYeC7AAAv4sdYgJ724NnE1msY=
go.opentelemetry.io/otel v1.17.0 h1:MW+phZ6WZ5/uk2nd93ANk/6yJ+dVrvNWUjGhnnFU5jM=
go.opentelemetry.io/otel v1.17.0/go.mod h1:I2vmBGtFaODIVMBSTPVDlJSzBDNf93k60E6Ft0nyjo0=
go.opentelemetry.io/otel/metric v1.17.0 h1:iG6LGVz5Gh+IuO0jmgvpTB6YVrCGngi8QGm+pMd8Pdc=
go.opentelemetry.io/otel/metric v1.17.0/go.mod h1:h4skoxdZI17AxwITdmdZjjYJQH5nzijUUjm+wtPph5o=
go.opentelemetry.io/otel/sdk v1.17.0 h1:FLN2X66Ke/k5Sg3V623Q7h7nt3cHXaW1FOvKKrW0IpE=
go.opentelemetry.io/otel/sdk v1.17.0/go.mod h1:U87sE0f5vQB7hwUoW98pW5Rz4ZDuCFBZFNUBlSgmDFQ=
go.opentelemetry.io/otel/trace v1.17.0 h1:/SWhSRHmDPOImIAetP1QAeMnZYiQXrTy4fMMYOdSKWQ=
go.opentelemetry.io/otel/trace v1.17.0/go.mod h1:I/4vKTgFclIsXRVucpH25X0mpFSczM7aHeaz0ZBLWjY=
go.opentelemetry.io/contrib/propagators/jaeger v1.19.0 h1:mGrx7XEAE+7ybCLM0T6iRl/jUTuHg6qKUJAtsAlknec=
go.opentelemetry.io/contrib/propagators/jaeger v1.19.0/go.mod h1:cHWVPhYWMZOanEf1qexqMIRhr4TKVjZWBKwZTL/tdR4=
go.opentelemetry.io/otel v1.18.0 h1:TgVozPGZ01nHyDZxK5WGPFB9QexeTMXEH7+tIClWfzs=
go.opentelemetry.io/otel v1.18.0/go.mod h1:9lWqYO0Db579XzVuCKFNPDl4s73Voa+zEck3wHaAYQI=
go.opentelemetry.io/otel/metric v1.18.0 h1:JwVzw94UYmbx3ej++CwLUQZxEODDj/pOuTCvzhtRrSQ=
go.opentelemetry.io/otel/metric v1.18.0/go.mod h1:nNSpsVDjWGfb7chbRLUNW+PBNdcSTHD4Uu5pfFMOI0k=
go.opentelemetry.io/otel/sdk v1.18.0 h1:e3bAB0wB3MljH38sHzpV/qWrOTCFrdZF2ct9F8rBkcY=
go.opentelemetry.io/otel/sdk v1.18.0/go.mod h1:1RCygWV7plY2KmdskZEDDBs4tJeHG92MdHZIluiYs/M=
go.opentelemetry.io/otel/trace v1.18.0 h1:NY+czwbHbmndxojTEKiSMHkG2ClNH2PwmcHrdo0JY10=
go.opentelemetry.io/otel/trace v1.18.0/go.mod h1:T2+SGJGuYZY3bjj5rgh/hN7KIrlpWC5nS8Mjvzckz+0=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
Expand Down

0 comments on commit 5ed589f

Please sign in to comment.