Skip to content

Commit

Permalink
feature/graceful transport worker termination
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikhail Zvonov committed Oct 18, 2024
1 parent 6bd150f commit dfebbda
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 24 deletions.
4 changes: 4 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ type ClientOptions struct {
MaxErrorDepth int
// Default event tags. These are overridden by tags set on a scope.
Tags map[string]string

// Optional chan receiving from caller signal about termination.
// Useful to prevent goroutines leak in case of multiple Transport instances initiated.
Done <-chan struct{}
}

// Client is the underlying processor that is used by the main API and Hub
Expand Down
64 changes: 40 additions & 24 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ type HTTPTransport struct {

mu sync.RWMutex
limits ratelimit.Map

// receiving struct means caller terminates.
done <-chan struct{}
}

// NewHTTPTransport returns a new pre-configured instance of HTTPTransport.
Expand Down Expand Up @@ -398,6 +401,10 @@ func (t *HTTPTransport) Configure(options ClientOptions) {
}
}

if options.Done != nil {
t.done = options.Done
}

t.start.Do(func() {
go t.worker()
})
Expand Down Expand Up @@ -532,35 +539,44 @@ func (t *HTTPTransport) worker() {
t.buffer <- b

// Process all batch items.
for item := range b.items {
if t.disabled(item.category) {
continue
}
loop:
for {
select {
case <-t.done:
return
case item, open := <-b.items:
if !open {
break loop
}
if t.disabled(item.category) {
continue
}

response, err := t.client.Do(item.request)
if err != nil {
Logger.Printf("There was an issue with sending an event: %v", err)
continue
}
if response.StatusCode >= 400 && response.StatusCode <= 599 {
b, err := io.ReadAll(response.Body)
response, err := t.client.Do(item.request)
if err != nil {
Logger.Printf("Error while reading response code: %v", err)
Logger.Printf("There was an issue with sending an event: %v", err)
continue
}
if response.StatusCode >= 400 && response.StatusCode <= 599 {
b, err := io.ReadAll(response.Body)
if err != nil {
Logger.Printf("Error while reading response code: %v", err)
}
Logger.Printf("Sending %s failed with the following error: %s", eventType, string(b))
}
Logger.Printf("Sending %s failed with the following error: %s", eventType, string(b))
}

t.mu.Lock()
if t.limits == nil {
t.limits = make(ratelimit.Map)
}
t.limits.Merge(ratelimit.FromResponse(response))
t.mu.Unlock()
t.mu.Lock()
if t.limits == nil {
t.limits = make(ratelimit.Map)
}
t.limits.Merge(ratelimit.FromResponse(response))
t.mu.Unlock()

// Drain body up to a limit and close it, allowing the
// transport to reuse TCP connections.
_, _ = io.CopyN(io.Discard, response.Body, maxDrainResponseBytes)
response.Body.Close()
// Drain body up to a limit and close it, allowing the
// transport to reuse TCP connections.
_, _ = io.CopyN(io.Discard, response.Body, maxDrainResponseBytes)
response.Body.Close()
}
}

// Signal that processing of the batch is done.
Expand Down

0 comments on commit dfebbda

Please sign in to comment.