Skip to content

Commit

Permalink
Merge pull request #233 from dvonthenen/fix-open-close-event-pair
Browse files Browse the repository at this point in the history
Fix Open/Close Event Pairing After Fatal Reconnect Refactor
  • Loading branch information
dvonthenen authored May 31, 2024
2 parents b35c0bf + 868ad2c commit 8fdd098
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 57 deletions.
148 changes: 92 additions & 56 deletions pkg/client/live/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,70 +281,89 @@ func (c *Client) listen() {
select {
case <-c.ctx.Done():
c.closeWs(false)
klog.V(6).Infof("live.listen() Done\n")
klog.V(6).Infof("live.listen() Signal Exit\n")
klog.V(6).Infof("live.listen() LEAVE\n")
return
case <-ticker.C:
for {
ws := c.Connect()
if ws == nil {
klog.V(3).Infof("listen: Connection is not valid\n")
ws := c.Connect()
if ws == nil {
klog.V(3).Infof("listen: Connection is not valid\n")
klog.V(6).Infof("live.listen() LEAVE\n")
return
}

msgType, byMsg, err := ws.ReadMessage()
if err != nil {
errStr := err.Error()
switch {
case strings.Contains(errStr, SuccessfulSocketErr):
klog.V(3).Infof("Graceful websocket close\n")

// graceful close
c.closeWs(false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
}
case strings.Contains(errStr, FatalReadSocketErr):
klog.V(1).Infof("Fatal socket error: %v\n", err)

msgType, byMsg, err := ws.ReadMessage()
if err != nil {
errStr := err.Error()
switch {
case strings.Contains(errStr, SuccessfulSocketErr):
klog.V(3).Infof("Graceful websocket close\n")
klog.V(6).Infof("live.listen() LEAVE\n")
return
case strings.Contains(errStr, FatalReadSocketErr):
klog.V(1).Infof("Fatal socket error: %v\n", err)
c.closeWs(true)
return
case strings.Contains(errStr, "Deepgram"):
klog.V(1).Infof("listen: Deepgram error. Err: %v\n", err)

// extract DG error
response := c.ErrorToResponse(err)
if c.callback != nil {
err := c.router.ErrorHelper(response)
if err != nil {
klog.V(1).Infof("listen: router.Error failed. Err: %v\n", err)
}
} else {
klog.V(7).Infof("listen: Deepgram Error: %v\n", err)
}

// reset connection
c.closeWs(true)
return
case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry:
klog.V(3).Infof("stream object EOF\n")
klog.V(6).Infof("live.listen() LEAVE\n")
return
default:
klog.V(1).Infof("listen: Cannot read websocket message. Err: %v\n", err)
// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr)
}
continue
}

if len(byMsg) == 0 {
klog.V(7).Infof("listen: message empty")
continue
}
// fatal close
c.closeWs(true)

if c.callback != nil {
err := c.router.Message(byMsg)
if err != nil {
klog.V(1).Infof("listen: router.Message failed. Err: %v\n", err)
klog.V(6).Infof("live.listen() LEAVE\n")
return
case strings.Contains(errStr, "Deepgram"):
klog.V(1).Infof("listen: Deepgram error. Err: %v\n", err)

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: Deepgram ErrorMsg. Err: %v\n", sendErr)
}
} else {
klog.V(7).Infof("listen: msg recv (type %d): %s\n", msgType, string(byMsg))

// close the connection
c.closeWs(false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry:
klog.V(3).Infof("stream object EOF\n")

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: EOF error. Err: %v\n", sendErr)
}

// close the connection
c.closeWs(true)

klog.V(6).Infof("live.listen() LEAVE\n")
return
default:
klog.V(1).Infof("listen: Cannot read websocket message. Err: %v\n", err)
}
continue
}

if len(byMsg) == 0 {
klog.V(7).Infof("listen: message empty")
continue
}

if c.callback != nil {
err := c.router.Message(byMsg)
if err != nil {
klog.V(1).Infof("listen: router.Message failed. Err: %v\n", err)
}
} else {
klog.V(7).Infof("listen: msg recv (type %d): %s\n", msgType, string(byMsg))
}
}
}
Expand Down Expand Up @@ -387,6 +406,7 @@ func (c *Client) Stream(r io.Reader) error {
}

if bytesRead == 0 {
klog.V(7).Infof("Skipping. bytesRead == 0\n")
continue
}

Expand Down Expand Up @@ -590,6 +610,10 @@ func (c *Client) ping() {
select {
case <-c.ctx.Done():
klog.V(3).Infof("live.ping() Exiting\n")

// exit gracefully
c.closeWs(false)

klog.V(6).Infof("live.ping() LEAVE\n")
return
case <-ticker.C:
Expand Down Expand Up @@ -621,7 +645,19 @@ func (c *Client) ping() {
}
}

func (c *Client) ErrorToResponse(err error) *msginterfaces.ErrorResponse {
// sendError sends an error message to the callback handler
func (c *Client) sendError(err error) error {
response := c.errorToResponse(err)
sendErr := c.router.ErrorHelper(response)
if err != nil {
klog.V(1).Infof("listen: router.Error failed. Err: %v\n", sendErr)
}

return err
}

// errorToResponse converts an error into a Deepgram error response
func (c *Client) errorToResponse(err error) *msginterfaces.ErrorResponse {
r := regexp.MustCompile(`websocket: ([a-z]+) (\d+) .+: (.+)`)

var errorCode string
Expand All @@ -641,8 +677,8 @@ func (c *Client) ErrorToResponse(err error) *msginterfaces.ErrorResponse {

response := &msginterfaces.ErrorResponse{
Type: msginterfaces.TypeErrorResponse,
Message: fmt.Sprintf("%s %s", errorCode, errorNum),
Description: errorDesc,
Message: strings.TrimSpace(fmt.Sprintf("%s %s", errorCode, errorNum)),
Description: strings.TrimSpace(errorDesc),
Variant: errorNum,
}
return response
Expand Down
4 changes: 3 additions & 1 deletion tests/edge_cases/cancel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (

func main() {
// init library
client.InitWithDefault()
client.Init(client.InitLib{
LogLevel: client.LogLevelTrace, // LogLevelDefault, LogLevelFull, LogLevelDebug, LogLevelTrace
})

// Go context
ctx := context.Background()
Expand Down

0 comments on commit 8fdd098

Please sign in to comment.