Skip to content

Commit

Permalink
Add rate limiting for async delivery by using a goroutine with consum…
Browse files Browse the repository at this point in the history
…er channel
  • Loading branch information
DariaKunoichi committed Jul 31, 2024
1 parent 245bce0 commit 87f5d16
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 28 deletions.
15 changes: 9 additions & 6 deletions features/fixtures/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
bugsnag "github.com/bugsnag/bugsnag-go/v2"
)

func configureBasicBugsnag(testcase string) {
func configureBasicBugsnag(testcase string, ctx context.Context) {
config := bugsnag.Configuration{
APIKey: os.Getenv("API_KEY"),
AppVersion: os.Getenv("APP_VERSION"),
AppType: os.Getenv("APP_TYPE"),
Hostname: os.Getenv("HOSTNAME"),
APIKey: os.Getenv("API_KEY"),
AppVersion: os.Getenv("APP_VERSION"),
AppType: os.Getenv("APP_TYPE"),
Hostname: os.Getenv("HOSTNAME"),
MainContext: ctx,
}

if notifyReleaseStages := os.Getenv("NOTIFY_RELEASE_STAGES"); notifyReleaseStages != "" {
Expand Down Expand Up @@ -63,11 +64,13 @@ func configureBasicBugsnag(testcase string) {
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

test := flag.String("test", "handled", "what the app should send, either handled, unhandled, session, autonotify")
flag.Parse()

configureBasicBugsnag(*test)
configureBasicBugsnag(*test, ctx)
time.Sleep(100 * time.Millisecond) // Ensure tests are less flaky by ensuring the start-up session gets sent

switch *test {
Expand Down
13 changes: 8 additions & 5 deletions features/fixtures/net_http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
)

func main() {
configureBasicBugsnag()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
configureBasicBugsnag(ctx)

http.HandleFunc("/handled", handledError)
http.HandleFunc("/autonotify-then-recover", unhandledCrash)
Expand All @@ -40,16 +42,17 @@ func recoverWrap(h http.Handler) http.Handler {
})
}

func configureBasicBugsnag() {
func configureBasicBugsnag(ctx context.Context) {
config := bugsnag.Configuration{
APIKey: os.Getenv("API_KEY"),
Endpoints: bugsnag.Endpoints{
Notify: os.Getenv("BUGSNAG_ENDPOINT"),
Sessions: os.Getenv("BUGSNAG_ENDPOINT"),
},
AppVersion: os.Getenv("APP_VERSION"),
AppType: os.Getenv("APP_TYPE"),
Hostname: os.Getenv("HOSTNAME"),
AppVersion: os.Getenv("APP_VERSION"),
AppType: os.Getenv("APP_TYPE"),
Hostname: os.Getenv("HOSTNAME"),
MainContext: ctx,
}

if notifyReleaseStages := os.Getenv("NOTIFY_RELEASE_STAGES"); notifyReleaseStages != "" {
Expand Down
14 changes: 11 additions & 3 deletions v2/bugsnag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (
"github.com/bugsnag/bugsnag-go/v2/sessions"
)

// ATTENTION - tests in this file are changing global state variables
// like default config or default report publisher
// TAKE CARE to reset them to default after testcase!

// The line numbers of this method are used in tests.
// If you move this function you'll have to change tests
func crashyHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -132,7 +136,7 @@ func TestNotify(t *testing.T) {
}

exception := getIndex(event, "exceptions", 0)
verifyExistsInStackTrace(t, exception, &StackFrame{File: "bugsnag_test.go", Method: "TestNotify", LineNumber: 98, InProject: true})
verifyExistsInStackTrace(t, exception, &StackFrame{File: "bugsnag_test.go", Method: "TestNotify", LineNumber: 102, InProject: true})
}

type testPublisher struct {
Expand All @@ -144,6 +148,9 @@ func (tp *testPublisher) publishReport(p *payload) error {
return nil
}

func (tp *testPublisher) setMainProgramContext(context.Context) {
}

func TestNotifySyncThenAsync(t *testing.T) {
ts, _ := setup()
defer ts.Close()
Expand All @@ -152,7 +159,7 @@ func TestNotifySyncThenAsync(t *testing.T) {

pub := new(testPublisher)
publisher = pub
defer func() { publisher = new(defaultReportPublisher) }()
defer func() { publisher = newPublisher() }()

Notify(fmt.Errorf("oopsie"))
if pub.sync {
Expand All @@ -175,6 +182,7 @@ func TestHandlerFunc(t *testing.T) {
defer eventserver.Close()
Configure(generateSampleConfig(eventserver.URL))

// NOTE - this testcase will print a panic in verbose mode
t.Run("unhandled", func(st *testing.T) {
sessionTracker = nil
startSessionTracking()
Expand Down Expand Up @@ -315,7 +323,7 @@ func TestHandler(t *testing.T) {
}

exception := getIndex(event, "exceptions", 0)
verifyExistsInStackTrace(t, exception, &StackFrame{File: "bugsnag_test.go", Method: "crashyHandler", InProject: true, LineNumber: 24})
verifyExistsInStackTrace(t, exception, &StackFrame{File: "bugsnag_test.go", Method: "crashyHandler", InProject: true, LineNumber: 28})
}

func TestAutoNotify(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions v2/configuration.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bugsnag

import (
"context"
"log"
"net/http"
"os"
Expand Down Expand Up @@ -101,6 +102,12 @@ type Configuration struct {
// Whether bugsnag should notify synchronously. This defaults to false which
// causes bugsnag-go to spawn a new goroutine for each notification.
Synchronous bool

// Context created in the main program
// Used in event delivery - after this context is marked Done
// the event sending goroutine will switch to a graceful shutdown
// and will try to send any remaining events.
MainContext context.Context
// Whether the notifier should send all sessions recorded so far to Bugsnag
// when repanicking to ensure that no session information is lost in a
// fatal crash.
Expand Down Expand Up @@ -160,6 +167,10 @@ func (config *Configuration) update(other *Configuration) *Configuration {
if other.Synchronous {
config.Synchronous = true
}
if other.MainContext != nil {
config.MainContext = other.MainContext
publisher.setMainProgramContext(other.MainContext)
}

if other.AutoCaptureSessions != nil {
config.AutoCaptureSessions = other.AutoCaptureSessions
Expand Down
11 changes: 6 additions & 5 deletions v2/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/bugsnag/bugsnag-go/v2/errors"
)

var publisher reportPublisher = new(defaultReportPublisher)
var publisher reportPublisher = newPublisher()

// Notifier sends errors to Bugsnag.
type Notifier struct {
Expand Down Expand Up @@ -84,10 +84,11 @@ func (notifier *Notifier) NotifySync(err error, sync bool, rawData ...interface{
// AutoNotify notifies Bugsnag of any panics, then repanics.
// It sends along any rawData that gets passed in.
// Usage:
// go func() {
// defer AutoNotify()
// // (possibly crashy code)
// }()
//
// go func() {
// defer AutoNotify()
// // (possibly crashy code)
// }()
func (notifier *Notifier) AutoNotify(rawData ...interface{}) {
if err := recover(); err != nil {
severity := notifier.getDefaultSeverity(rawData, SeverityError)
Expand Down
95 changes: 86 additions & 9 deletions v2/report_publisher.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,87 @@
package bugsnag

import "fmt"
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
)

type reportPublisher interface {
publishReport(*payload) error
setMainProgramContext(context.Context)
}

type defaultReportPublisher struct{}
func (defPub *defaultReportPublisher) delivery() {
signalsCh := make(chan os.Signal, 1)
signal.Notify(signalsCh, syscall.SIGINT, syscall.SIGTERM)
closeDelivery := false

func (*defaultReportPublisher) publishReport(p *payload) error {
waitForEnd:
for {
select {
case <-signalsCh:
defPub.isClosing = true
break waitForEnd
case <-defPub.mainProgramCtx.Done():
defPub.isClosing = true
break waitForEnd
case p, ok := <-defPub.eventsChan:
if ok {
if err := p.deliver(); err != nil {
// Ensure that any errors are logged if they occur in a goroutine.
p.logf("bugsnag/defaultReportPublisher.publishReport: %v", err)
}
} else {
p.logf("Event channel closed")
return
}
}
}

// Send remaining elements from the queue
for !closeDelivery {
select {
case p, ok := <-defPub.eventsChan:
if ok {
if err := p.deliver(); err != nil {
// Ensure that any errors are logged if they occur in a goroutine.
p.logf("bugsnag/defaultReportPublisher.publishReport: %v", err)
}
} else {
p.logf("Event channel closed")
return
}
default:
if defPub.isClosing && len(defPub.eventsChan) == 0 {
fmt.Println("Main program done and event channel empty")
closeDelivery = true
}
}
}
}

type defaultReportPublisher struct {
eventsChan chan *payload
mainProgramCtx context.Context
isClosing bool
}

func newPublisher() reportPublisher {
defPub := defaultReportPublisher{isClosing: false, mainProgramCtx: context.TODO()}
defPub.eventsChan = make(chan *payload, 100)

go defPub.delivery()

return &defPub
}

func (defPub *defaultReportPublisher) setMainProgramContext(ctx context.Context) {
defPub.mainProgramCtx = ctx
}

func (defPub *defaultReportPublisher) publishReport(p *payload) error {
p.logf("notifying bugsnag: %s", p.Message)
if !p.notifyInReleaseStage() {
return fmt.Errorf("not notifying in %s", p.ReleaseStage)
Expand All @@ -17,11 +90,15 @@ func (*defaultReportPublisher) publishReport(p *payload) error {
return p.deliver()
}

go func(p *payload) {
if err := p.deliver(); err != nil {
// Ensure that any errors are logged if they occur in a goroutine.
p.logf("bugsnag/defaultReportPublisher.publishReport: %v", err)
}
}(p)
if defPub.isClosing {
return fmt.Errorf("main program is stopping, new events won't be sent")
}

select {
case defPub.eventsChan <- p:
default:
p.logf("Events channel full. Discarding value")
}

return nil
}

0 comments on commit 87f5d16

Please sign in to comment.