diff --git a/pkg/client/live/client.go b/pkg/client/live/client.go index 6b2c76f1..51530116 100644 --- a/pkg/client/live/client.go +++ b/pkg/client/live/client.go @@ -281,70 +281,89 @@ func (c *Client) listen() { select { case <-c.ctx.Done(): c.closeWs(false) - klog.V(6).Infof("live.listen() Done\n") + klog.V(6).Infof("live.listen() Signal Exit\n") klog.V(6).Infof("live.listen() LEAVE\n") return case <-ticker.C: - for { - ws := c.Connect() - if ws == nil { - klog.V(3).Infof("listen: Connection is not valid\n") + ws := c.Connect() + if ws == nil { + klog.V(3).Infof("listen: Connection is not valid\n") + klog.V(6).Infof("live.listen() LEAVE\n") + return + } + + msgType, byMsg, err := ws.ReadMessage() + if err != nil { + errStr := err.Error() + switch { + case strings.Contains(errStr, SuccessfulSocketErr): + klog.V(3).Infof("Graceful websocket close\n") + + // graceful close + c.closeWs(false) + klog.V(6).Infof("live.listen() LEAVE\n") return - } + case strings.Contains(errStr, FatalReadSocketErr): + klog.V(1).Infof("Fatal socket error: %v\n", err) - msgType, byMsg, err := ws.ReadMessage() - if err != nil { - errStr := err.Error() - switch { - case strings.Contains(errStr, SuccessfulSocketErr): - klog.V(3).Infof("Graceful websocket close\n") - klog.V(6).Infof("live.listen() LEAVE\n") - return - case strings.Contains(errStr, FatalReadSocketErr): - klog.V(1).Infof("Fatal socket error: %v\n", err) - c.closeWs(true) - return - case strings.Contains(errStr, "Deepgram"): - klog.V(1).Infof("listen: Deepgram error. Err: %v\n", err) - - // extract DG error - response := c.ErrorToResponse(err) - if c.callback != nil { - err := c.router.ErrorHelper(response) - if err != nil { - klog.V(1).Infof("listen: router.Error failed. Err: %v\n", err) - } - } else { - klog.V(7).Infof("listen: Deepgram Error: %v\n", err) - } - - // reset connection - c.closeWs(true) - return - case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry: - klog.V(3).Infof("stream object EOF\n") - klog.V(6).Infof("live.listen() LEAVE\n") - return - default: - klog.V(1).Infof("listen: Cannot read websocket message. Err: %v\n", err) + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr) } - continue - } - if len(byMsg) == 0 { - klog.V(7).Infof("listen: message empty") - continue - } + // fatal close + c.closeWs(true) - if c.callback != nil { - err := c.router.Message(byMsg) - if err != nil { - klog.V(1).Infof("listen: router.Message failed. Err: %v\n", err) + klog.V(6).Infof("live.listen() LEAVE\n") + return + case strings.Contains(errStr, "Deepgram"): + klog.V(1).Infof("listen: Deepgram error. Err: %v\n", err) + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("listen: Deepgram ErrorMsg. Err: %v\n", sendErr) } - } else { - klog.V(7).Infof("listen: msg recv (type %d): %s\n", msgType, string(byMsg)) + + // close the connection + c.closeWs(false) + + klog.V(6).Infof("live.listen() LEAVE\n") + return + case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry: + klog.V(3).Infof("stream object EOF\n") + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("listen: EOF error. Err: %v\n", sendErr) + } + + // close the connection + c.closeWs(true) + + klog.V(6).Infof("live.listen() LEAVE\n") + return + default: + klog.V(1).Infof("listen: Cannot read websocket message. Err: %v\n", err) } + continue + } + + if len(byMsg) == 0 { + klog.V(7).Infof("listen: message empty") + continue + } + + if c.callback != nil { + err := c.router.Message(byMsg) + if err != nil { + klog.V(1).Infof("listen: router.Message failed. Err: %v\n", err) + } + } else { + klog.V(7).Infof("listen: msg recv (type %d): %s\n", msgType, string(byMsg)) } } } @@ -387,6 +406,7 @@ func (c *Client) Stream(r io.Reader) error { } if bytesRead == 0 { + klog.V(7).Infof("Skipping. bytesRead == 0\n") continue } @@ -590,6 +610,10 @@ func (c *Client) ping() { select { case <-c.ctx.Done(): klog.V(3).Infof("live.ping() Exiting\n") + + // exit gracefully + c.closeWs(false) + klog.V(6).Infof("live.ping() LEAVE\n") return case <-ticker.C: @@ -621,7 +645,19 @@ func (c *Client) ping() { } } -func (c *Client) ErrorToResponse(err error) *msginterfaces.ErrorResponse { +// sendError sends an error message to the callback handler +func (c *Client) sendError(err error) error { + response := c.errorToResponse(err) + sendErr := c.router.ErrorHelper(response) + if err != nil { + klog.V(1).Infof("listen: router.Error failed. Err: %v\n", sendErr) + } + + return err +} + +// errorToResponse converts an error into a Deepgram error response +func (c *Client) errorToResponse(err error) *msginterfaces.ErrorResponse { r := regexp.MustCompile(`websocket: ([a-z]+) (\d+) .+: (.+)`) var errorCode string @@ -641,8 +677,8 @@ func (c *Client) ErrorToResponse(err error) *msginterfaces.ErrorResponse { response := &msginterfaces.ErrorResponse{ Type: msginterfaces.TypeErrorResponse, - Message: fmt.Sprintf("%s %s", errorCode, errorNum), - Description: errorDesc, + Message: strings.TrimSpace(fmt.Sprintf("%s %s", errorCode, errorNum)), + Description: strings.TrimSpace(errorDesc), Variant: errorNum, } return response diff --git a/tests/edge_cases/cancel/main.go b/tests/edge_cases/cancel/main.go index df16c917..39781066 100644 --- a/tests/edge_cases/cancel/main.go +++ b/tests/edge_cases/cancel/main.go @@ -16,7 +16,9 @@ import ( func main() { // init library - client.InitWithDefault() + client.Init(client.InitLib{ + LogLevel: client.LogLevelTrace, // LogLevelDefault, LogLevelFull, LogLevelDebug, LogLevelTrace + }) // Go context ctx := context.Background()