Skip to content

Commit

Permalink
Add rate limiting for async delivery by using a goroutine with consum…
Browse files Browse the repository at this point in the history
…er channel
  • Loading branch information
DariaKunoichi committed Jul 30, 2024
1 parent 245bce0 commit bb817e2
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 10 deletions.
3 changes: 3 additions & 0 deletions v2/bugsnag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ func (tp *testPublisher) publishReport(p *payload) error {
return nil
}

func (tp *testPublisher) setMainProgramContext(context.Context) {
}

func TestNotifySyncThenAsync(t *testing.T) {
ts, _ := setup()
defer ts.Close()
Expand Down
7 changes: 7 additions & 0 deletions v2/configuration.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bugsnag

import (
"context"
"log"
"net/http"
"os"
Expand Down Expand Up @@ -101,6 +102,12 @@ type Configuration struct {
// Whether bugsnag should notify synchronously. This defaults to false which
// causes bugsnag-go to spawn a new goroutine for each notification.
Synchronous bool

// Context created in the main program
// Used in event delivery - after this context is marked Done
// the event sending goroutine will switch to a graceful shutdown
// and will try to send any remaining events.
MainContext context.Context
// Whether the notifier should send all sessions recorded so far to Bugsnag
// when repanicking to ensure that no session information is lost in a
// fatal crash.
Expand Down
6 changes: 5 additions & 1 deletion v2/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/bugsnag/bugsnag-go/v2/errors"
)

var publisher reportPublisher = new(defaultReportPublisher)
var publisher reportPublisher = NewPublisher()

// Notifier sends errors to Bugsnag.
type Notifier struct {
Expand All @@ -24,6 +24,10 @@ func New(rawData ...interface{}) *Notifier {
}
}

if config.MainContext != nil {
publisher.setMainProgramContext(config.MainContext)
}

return &Notifier{
Config: config,
RawData: rawData,
Expand Down
97 changes: 88 additions & 9 deletions v2/report_publisher.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,106 @@
package bugsnag

import "fmt"
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
)

type reportPublisher interface {
publishReport(*payload) error
setMainProgramContext(context.Context)
}

type defaultReportPublisher struct{}
var mainProgramDone = false

func (*defaultReportPublisher) publishReport(p *payload) error {
func (defPub *defaultReportPublisher) delivery() {
signalsCh := make(chan os.Signal, 1)
signal.Notify(signalsCh, syscall.SIGINT, syscall.SIGTERM)
closeDelivery := false

waitForEnd:
for {
select {
case <-signalsCh:
mainProgramDone = true
break waitForEnd
case <-defPub.mainProgramCtx.Done():
mainProgramDone = true
break waitForEnd
case p, ok := <-defPub.eventsChan:
if ok {
if err := p.deliver(); err != nil {
// Ensure that any errors are logged if they occur in a goroutine.
p.logf("bugsnag/defaultReportPublisher.publishReport: %v", err)
}
} else {
p.logf("Event channel closed")
return
}
}
}

// Send remaining elements from the queue
for !closeDelivery {
select {
case p, ok := <-defPub.eventsChan:
if ok {
if err := p.deliver(); err != nil {
// Ensure that any errors are logged if they occur in a goroutine.
p.logf("bugsnag/defaultReportPublisher.publishReport: %v", err)
}
} else {
p.logf("Event channel closed")
return
}
default:
if mainProgramDone && len(defPub.eventsChan) == 0 {
fmt.Println("Main program done and event channel empty")
closeDelivery = true
}
}
}
}

type defaultReportPublisher struct{
eventsChan chan *payload
mainProgramCtx context.Context
}

func NewPublisher() *defaultReportPublisher {
defPub := defaultReportPublisher{}
defPub.eventsChan = make(chan *payload, 100)
defPub.mainProgramCtx = context.TODO()

go defPub.delivery()

return &defPub
}

func (defPub *defaultReportPublisher) setMainProgramContext(ctx context.Context) {
defPub.mainProgramCtx = ctx
}

func (defPub *defaultReportPublisher) publishReport(p *payload) error {
p.logf("notifying bugsnag: %s", p.Message)
if !p.notifyInReleaseStage() {
return fmt.Errorf("not notifying in %s", p.ReleaseStage)
}
if p.Synchronous {
return p.deliver()
}
} else {
if (mainProgramDone) {
return fmt.Errorf("main program is stopping, new events won't be sent")
}

go func(p *payload) {
if err := p.deliver(); err != nil {
// Ensure that any errors are logged if they occur in a goroutine.
p.logf("bugsnag/defaultReportPublisher.publishReport: %v", err)
select {
case defPub.eventsChan <- p:
default:
p.logf("Events channel full. Discarding value")
}
}(p)
}

return nil
}

0 comments on commit bb817e2

Please sign in to comment.