Skip to content

Commit

Permalink
Add rate limiting for async delivery by using a goroutine with consum…
Browse files Browse the repository at this point in the history
…er channel
  • Loading branch information
DariaKunoichi committed Jul 31, 2024
1 parent 245bce0 commit 3125968
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 13 deletions.
14 changes: 11 additions & 3 deletions v2/bugsnag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (
"github.com/bugsnag/bugsnag-go/v2/sessions"
)

// ATTENTION - tests in this file are changing global state variables
// like default config or default report publisher
// TAKE CARE to reset them to default after testcase!

// The line numbers of this method are used in tests.
// If you move this function you'll have to change tests
func crashyHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -132,7 +136,7 @@ func TestNotify(t *testing.T) {
}

exception := getIndex(event, "exceptions", 0)
verifyExistsInStackTrace(t, exception, &StackFrame{File: "bugsnag_test.go", Method: "TestNotify", LineNumber: 98, InProject: true})
verifyExistsInStackTrace(t, exception, &StackFrame{File: "bugsnag_test.go", Method: "TestNotify", LineNumber: 102, InProject: true})
}

type testPublisher struct {
Expand All @@ -144,6 +148,9 @@ func (tp *testPublisher) publishReport(p *payload) error {
return nil
}

func (tp *testPublisher) setMainProgramContext(context.Context) {
}

func TestNotifySyncThenAsync(t *testing.T) {
ts, _ := setup()
defer ts.Close()
Expand All @@ -152,7 +159,7 @@ func TestNotifySyncThenAsync(t *testing.T) {

pub := new(testPublisher)
publisher = pub
defer func() { publisher = new(defaultReportPublisher) }()
defer func() { publisher = NewPublisher() }()

Notify(fmt.Errorf("oopsie"))
if pub.sync {
Expand All @@ -175,6 +182,7 @@ func TestHandlerFunc(t *testing.T) {
defer eventserver.Close()
Configure(generateSampleConfig(eventserver.URL))

// NOTE - this testcase will print a panic in verbose mode
t.Run("unhandled", func(st *testing.T) {
sessionTracker = nil
startSessionTracking()
Expand Down Expand Up @@ -315,7 +323,7 @@ func TestHandler(t *testing.T) {
}

exception := getIndex(event, "exceptions", 0)
verifyExistsInStackTrace(t, exception, &StackFrame{File: "bugsnag_test.go", Method: "crashyHandler", InProject: true, LineNumber: 24})
verifyExistsInStackTrace(t, exception, &StackFrame{File: "bugsnag_test.go", Method: "crashyHandler", InProject: true, LineNumber: 28})
}

func TestAutoNotify(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions v2/configuration.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bugsnag

import (
"context"
"log"
"net/http"
"os"
Expand Down Expand Up @@ -101,6 +102,12 @@ type Configuration struct {
// Whether bugsnag should notify synchronously. This defaults to false which
// causes bugsnag-go to spawn a new goroutine for each notification.
Synchronous bool

// Context created in the main program
// Used in event delivery - after this context is marked Done
// the event sending goroutine will switch to a graceful shutdown
// and will try to send any remaining events.
MainContext context.Context
// Whether the notifier should send all sessions recorded so far to Bugsnag
// when repanicking to ensure that no session information is lost in a
// fatal crash.
Expand Down
6 changes: 5 additions & 1 deletion v2/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/bugsnag/bugsnag-go/v2/errors"
)

var publisher reportPublisher = new(defaultReportPublisher)
var publisher reportPublisher = NewPublisher()

// Notifier sends errors to Bugsnag.
type Notifier struct {
Expand All @@ -24,6 +24,10 @@ func New(rawData ...interface{}) *Notifier {
}
}

if config.MainContext != nil {
publisher.setMainProgramContext(config.MainContext)
}

return &Notifier{
Config: config,
RawData: rawData,
Expand Down
97 changes: 88 additions & 9 deletions v2/report_publisher.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,106 @@
package bugsnag

import "fmt"
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
)

type reportPublisher interface {
publishReport(*payload) error
setMainProgramContext(context.Context)
}

type defaultReportPublisher struct{}
var mainProgramDone = false

func (*defaultReportPublisher) publishReport(p *payload) error {
func (defPub *defaultReportPublisher) delivery() {
signalsCh := make(chan os.Signal, 1)
signal.Notify(signalsCh, syscall.SIGINT, syscall.SIGTERM)
closeDelivery := false

waitForEnd:
for {
select {
case <-signalsCh:
mainProgramDone = true
break waitForEnd
case <-defPub.mainProgramCtx.Done():
mainProgramDone = true
break waitForEnd
case p, ok := <-defPub.eventsChan:
if ok {
if err := p.deliver(); err != nil {
// Ensure that any errors are logged if they occur in a goroutine.
p.logf("bugsnag/defaultReportPublisher.publishReport: %v", err)
}
} else {
p.logf("Event channel closed")
return
}
}
}

// Send remaining elements from the queue
for !closeDelivery {
select {
case p, ok := <-defPub.eventsChan:
if ok {
if err := p.deliver(); err != nil {
// Ensure that any errors are logged if they occur in a goroutine.
p.logf("bugsnag/defaultReportPublisher.publishReport: %v", err)
}
} else {
p.logf("Event channel closed")
return
}
default:
if mainProgramDone && len(defPub.eventsChan) == 0 {
fmt.Println("Main program done and event channel empty")
closeDelivery = true
}
}
}
}

type defaultReportPublisher struct{
eventsChan chan *payload
mainProgramCtx context.Context
}

func NewPublisher() *defaultReportPublisher {
defPub := defaultReportPublisher{}
defPub.eventsChan = make(chan *payload, 100)
defPub.mainProgramCtx = context.TODO()

go defPub.delivery()

return &defPub
}

func (defPub *defaultReportPublisher) setMainProgramContext(ctx context.Context) {
defPub.mainProgramCtx = ctx
}

func (defPub *defaultReportPublisher) publishReport(p *payload) error {
p.logf("notifying bugsnag: %s", p.Message)
if !p.notifyInReleaseStage() {
return fmt.Errorf("not notifying in %s", p.ReleaseStage)
}
if p.Synchronous {
return p.deliver()
}
} else {
if (mainProgramDone) {
return fmt.Errorf("main program is stopping, new events won't be sent")
}

go func(p *payload) {
if err := p.deliver(); err != nil {
// Ensure that any errors are logged if they occur in a goroutine.
p.logf("bugsnag/defaultReportPublisher.publishReport: %v", err)
select {
case defPub.eventsChan <- p:
default:
p.logf("Events channel full. Discarding value")
}
}(p)
}

return nil
}

0 comments on commit 3125968

Please sign in to comment.