From ba6f8461b7935cc051b8b543e74f340f6ab601f4 Mon Sep 17 00:00:00 2001 From: David vonThenen <12752197+dvonthenen@users.noreply.github.com> Date: Fri, 21 Jun 2024 09:53:20 -0700 Subject: [PATCH] Port WebSocket Threading Fixes Live -> Speak, Port Fixes From Speak -> Live --- examples/prerecorded/file/main.go | 1 + examples/speak/stream/interactive/main.go | 21 +- examples/streaming/microphone/main.go | 2 +- pkg/api/live/v1/default.go | 4 +- pkg/api/live/v1/interfaces/types.go | 14 +- pkg/api/speak-stream/v1/interfaces/types.go | 9 - pkg/client/interfaces/utils.go | 1 - pkg/client/live/client.go | 29 +- pkg/client/live/constants.go | 10 +- pkg/client/speak/client_stream.go | 413 ++++++++++++-------- pkg/client/speak/constants.go | 13 + tests/edge_cases/reconnect_client/main.go | 2 +- 12 files changed, 290 insertions(+), 229 deletions(-) diff --git a/examples/prerecorded/file/main.go b/examples/prerecorded/file/main.go index 13d7f044..7b446d3a 100644 --- a/examples/prerecorded/file/main.go +++ b/examples/prerecorded/file/main.go @@ -37,6 +37,7 @@ func main() { Paragraphs: true, SmartFormat: true, Language: "en-US", + Utterances: true, } // create a Deepgram client diff --git a/examples/speak/stream/interactive/main.go b/examples/speak/stream/interactive/main.go index 614f394a..fd1c6eea 100644 --- a/examples/speak/stream/interactive/main.go +++ b/examples/speak/stream/interactive/main.go @@ -18,7 +18,6 @@ import ( const ( API_KEY = "" TTS_TEXT = "Hello, this is a text to speech example using Deepgram." - AUDIO_DIR = "tts_outputs" AUDIO_FILE = "output.mp3" ) @@ -31,26 +30,8 @@ func (c MyCallback) Metadata(md *msginterfaces.MetadataResponse) error { return nil } -func (c MyCallback) SpeakStream(ss *msginterfaces.SpeakStreamResponse) error { - fmt.Printf("\n[SpeakStream] Received\n") - fmt.Printf("SpeakStream.ContentType: %s\n", strings.TrimSpace(ss.ContentType)) - fmt.Printf("SpeakStream.Date: %s\n", strings.TrimSpace(ss.Date)) - fmt.Printf("SpeakStream.ModelName: %s\n", strings.TrimSpace(ss.ModelName)) - fmt.Printf("SpeakStream.ModelUUID: %s\n", strings.TrimSpace(ss.ModelUUID)) - fmt.Printf("SpeakStream.RequestID: %s\n", strings.TrimSpace(ss.RequestID)) - return nil -} - func (c MyCallback) Binary(byMsg []byte) error { - if _, err := os.Stat(AUDIO_DIR); os.IsNotExist(err) { - err := os.Mkdir(AUDIO_DIR, 0755) - if err != nil { - fmt.Printf("Error creating directory %s: %v\n", AUDIO_DIR, err) - return err - } - } - - filePath := fmt.Sprintf("%s/%s", AUDIO_DIR, AUDIO_FILE) + filePath := fmt.Sprintf("%s", AUDIO_FILE) file, err := os.Create(filePath) if err != nil { fmt.Printf("Error creating file %s: %v\n", filePath, err) diff --git a/examples/streaming/microphone/main.go b/examples/streaming/microphone/main.go index 3b9c6db2..f169cbea 100644 --- a/examples/streaming/microphone/main.go +++ b/examples/streaming/microphone/main.go @@ -88,7 +88,7 @@ func (c MyCallback) Error(er *api.ErrorResponse) error { // handle the error fmt.Printf("\n[Error] Received\n") fmt.Printf("Error.Type: %s\n", er.Type) - fmt.Printf("Error.Message: %s\n", er.Message) + fmt.Printf("Error.ErrCode: %s\n", er.ErrCode) fmt.Printf("Error.Description: %s\n\n", er.Description) return nil } diff --git a/pkg/api/live/v1/default.go b/pkg/api/live/v1/default.go index e02d037e..830eadb7 100644 --- a/pkg/api/live/v1/default.go +++ b/pkg/api/live/v1/default.go @@ -249,8 +249,8 @@ func (dch DefaultCallbackHandler) Error(er *interfaces.ErrorResponse) error { // handle the message fmt.Printf("\n[ErrorResponse]\n") - fmt.Printf("\nError.Type: %s\n", er.Type) - fmt.Printf("Error.Message: %s\n", er.Message) + fmt.Printf("\nError.Type: %s\n", er.ErrCode) + fmt.Printf("Error.Message: %s\n", er.ErrMsg) fmt.Printf("Error.Description: %s\n\n", er.Description) fmt.Printf("Error.Variant: %s\n\n", er.Variant) diff --git a/pkg/api/live/v1/interfaces/types.go b/pkg/api/live/v1/interfaces/types.go index f73a4ab7..c9a99dca 100644 --- a/pkg/api/live/v1/interfaces/types.go +++ b/pkg/api/live/v1/interfaces/types.go @@ -8,6 +8,11 @@ import ( interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces" ) +/***********************************/ +// Request/Input structs +/***********************************/ +type LiveOptions interfaces.LiveTranscriptionOptions + /***********************************/ // MessageType is the header to bootstrap you way unmarshalling other messages /***********************************/ @@ -123,10 +128,5 @@ type CloseResponse struct { Type string `json:"type,omitempty"` } -// ErrorResponse is the response from a live transcription -type ErrorResponse struct { - Description string `json:"description"` - Message string `json:"message"` - Type string `json:"type"` - Variant string `json:"variant"` -} +// ErrorResponse is the Deepgram specific response error +type ErrorResponse interfaces.DeepgramError diff --git a/pkg/api/speak-stream/v1/interfaces/types.go b/pkg/api/speak-stream/v1/interfaces/types.go index af322b8c..4135278c 100644 --- a/pkg/api/speak-stream/v1/interfaces/types.go +++ b/pkg/api/speak-stream/v1/interfaces/types.go @@ -29,15 +29,6 @@ type MessageType struct { Type string `json:"type"` } -// SpeakStreamResponse is the response from the text-to-speech request -type SpeakStreamResponse struct { - ContentType string `json:"content_type,omitempty"` - RequestID string `json:"request_id,omitempty"` - ModelUUID string `json:"model_uuid,omitempty"` - ModelName string `json:"model_name,omitempty"` - Date string `json:"date,omitempty"` -} - // MetadataResponse is the response from the text-to-speech request which contains metadata about the request type MetadataResponse struct { Type string `json:"type,omitempty"` diff --git a/pkg/client/interfaces/utils.go b/pkg/client/interfaces/utils.go index 20c74831..f44eb440 100644 --- a/pkg/client/interfaces/utils.go +++ b/pkg/client/interfaces/utils.go @@ -75,7 +75,6 @@ type DeepgramError struct { ErrCode string `json:"err_code,omitempty"` ErrMsg string `json:"err_msg,omitempty"` Description string `json:"description,omitempty"` - RequestID string `json:"request_id,omitempty"` Variant string `json:"variant,omitempty"` } diff --git a/pkg/client/live/client.go b/pkg/client/live/client.go index b5fc831d..d54b045b 100644 --- a/pkg/client/live/client.go +++ b/pkg/client/live/client.go @@ -211,7 +211,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex if headers, ok := c.ctx.Value(interfaces.HeadersContext{}).(http.Header); ok { for k, v := range headers { for _, v := range v { - klog.V(3).Infof("Connect() RESTORE Header: %s = %s\n", k, v) + klog.V(3).Infof("internalConnectWithCancel RESTORE Header: %s = %s\n", k, v) myHeader.Add(k, v) } } @@ -326,7 +326,7 @@ func (c *Client) listen() { // release the lock c.muConn.Unlock() - // read the message + // msgType can be binary or text msgType, byMsg, err := ws.ReadMessage() if err != nil { @@ -414,7 +414,7 @@ func (c *Client) listen() { } if len(byMsg) == 0 { - klog.V(7).Infof("listen: message empty") + klog.V(7).Infof("listen(): message empty") continue } @@ -428,12 +428,17 @@ func (c *Client) listen() { // callback! if c.callback != nil { - err := c.router.Message(byMsg) - if err != nil { - klog.V(1).Infof("listen: router.Message failed. Err: %v\n", err) + if msgType == websocket.TextMessage { + err := c.router.Message(byMsg) + if err != nil { + klog.V(1).Infof("live.listen(): router.Message failed. Err: %v\n", err) + } + } else { + // this shouldn't happen, but let's log it + klog.V(7).Infof("live.listen(): msg recv: type %d, len: %d\n", msgType, len(byMsg)) } } else { - klog.V(7).Infof("listen: msg recv (type %d): %s\n", msgType, string(byMsg)) + klog.V(7).Infof("callback is nil: msg recv: type %d, len: %d\n", msgType, len(byMsg)) } } } @@ -535,7 +540,7 @@ func (c *Client) WriteJSON(payload interface{}) error { byData, err := json.Marshal(payload) if err != nil { - klog.V(1).Infof("WriteJSON json.Marshal failed. Err: %v\n", err) + klog.V(1).Infof("WriteJSON: Error marshaling JSON. Data: %v, Err: %v\n", payload, err) klog.V(7).Infof("live.WriteJSON() LEAVE\n") return err } @@ -592,9 +597,9 @@ func (c *Client) Write(p []byte) (int, error) { func (c *Client) KeepAlive() error { klog.V(7).Infof("live.KeepAlive() ENTER\n") - err := c.WriteJSON(controlMessage{Type: "KeepAlive"}) + err := c.WriteJSON(controlMessage{Type: MessageTypeKeepAlive}) if err != nil { - klog.V(1).Infof("Finalize failed. Err: %v\n", err) + klog.V(1).Infof("KeepAlive failed. Err: %v\n", err) klog.V(7).Infof("live.KeepAlive() LEAVE\n") return err @@ -609,7 +614,7 @@ func (c *Client) KeepAlive() error { func (c *Client) Finalize() error { klog.V(7).Infof("live.KeepAlive() ENTER\n") - err := c.WriteJSON(controlMessage{Type: "Finalize"}) + err := c.WriteJSON(controlMessage{Type: MessageTypeFinalize}) if err != nil { klog.V(1).Infof("Finalize failed. Err: %v\n", err) klog.V(7).Infof("live.Finalize() LEAVE\n") @@ -842,7 +847,7 @@ func (c *Client) errorToResponse(err error) *msginterfaces.ErrorResponse { response := &msginterfaces.ErrorResponse{ Type: msginterfaces.TypeErrorResponse, - Message: strings.TrimSpace(fmt.Sprintf("%s %s", errorCode, errorNum)), + ErrMsg: strings.TrimSpace(fmt.Sprintf("%s %s", errorCode, errorNum)), Description: strings.TrimSpace(errorDesc), Variant: errorNum, } diff --git a/pkg/client/live/constants.go b/pkg/client/live/constants.go index d1d13ed0..a3633ee1 100644 --- a/pkg/client/live/constants.go +++ b/pkg/client/live/constants.go @@ -7,7 +7,6 @@ package live import ( "errors" "time" - // gabs "github.com/Jeffail/gabs/v2" ) // internal constants for retry, waits, back-off, etc. @@ -37,15 +36,18 @@ const ( ) const ( + // MessageTypeKeepAlive keep the connection alive + MessageTypeKeepAlive string = "KeepAlive" + // MessageTypeFinalize flushes the transcription from the server MessageTypeFinalize string = "Finalize" - - // MessageTypeCloseStream closes the stream - MessageTypeCloseStream string = "CloseStream" ) // errors var ( + // ErrInvalidInput required input was not found + ErrInvalidInput = errors.New("required input was not found") + // ErrInvalidConnection connection is not valid ErrInvalidConnection = errors.New("connection is not valid") ) diff --git a/pkg/client/speak/client_stream.go b/pkg/client/speak/client_stream.go index 9b44d342..4daf0d3a 100644 --- a/pkg/client/speak/client_stream.go +++ b/pkg/client/speak/client_stream.go @@ -27,6 +27,10 @@ import ( interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces" ) +type controlMessage struct { + Type string `json:"type"` +} + /* NewStreamForDemo creates a new websocket connection with all default options @@ -120,35 +124,35 @@ func (c *StreamClient) Connect() bool { if c.retryCnt == 0 { c.retryCnt = DefaultConnectRetry } - return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt)) != nil + return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt), true) != nil } // ConnectWithCancel performs a websocket connection with specified number of retries and providing a // cancel function to stop the connection func (c *StreamClient) ConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) bool { - return c.internalConnectWithCancel(ctx, ctxCancel, retryCnt) != nil + return c.internalConnectWithCancel(ctx, ctxCancel, retryCnt, true) != nil } // AttemptReconnect performs a reconnect after failing retries func (c *StreamClient) AttemptReconnect(ctx context.Context, retries int64) bool { c.retry = true c.ctx, c.ctxCancel = context.WithCancel(ctx) - return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(retries)) != nil + return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(retries), true) != nil } // AttemptReconnect performs a reconnect after failing retries and providing a cancel function func (c *StreamClient) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) bool { c.retry = true - return c.internalConnectWithCancel(ctx, ctxCancel, int(retries)) != nil + return c.internalConnectWithCancel(ctx, ctxCancel, int(retries), true) != nil } func (c *StreamClient) internalConnect() *websocket.Conn { - return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt)) + return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt), false) } //nolint:funlen // this is a complex function. keep as is -func (c *StreamClient) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) *websocket.Conn { - klog.V(7).Infof("StreamClient.Connect() ENTER\n") +func (c *StreamClient) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int, lock bool) *websocket.Conn { + klog.V(7).Infof("StreamClient.internalConnectWithCancel() ENTER\n") // set the context c.ctx = ctx @@ -158,22 +162,38 @@ func (c *StreamClient) internalConnectWithCancel(ctx context.Context, ctxCancel // we explicitly stopped and should not attempt to reconnect if !c.retry { klog.V(7).Infof("This connection has been terminated. Please either call with AttemptReconnect or create a new Client object using NewWebSocketClient.") - klog.V(7).Infof("StreamClient.Connect() LEAVE\n") + klog.V(7).Infof("StreamClient.internalConnectWithCancel() LEAVE\n") return nil } + // lock conn access + if lock { + klog.V(3).Infof("Locking connection mutex\n") + c.muConn.Lock() + defer c.muConn.Unlock() + } + // if the connection is good, return it otherwise, attempt reconnect if c.wsconn != nil { select { case <-c.ctx.Done(): klog.V(1).Infof("Connection is not valid\n") - klog.V(7).Infof("StreamClient.ConnectWithRetry() LEAVE\n") + klog.V(7).Infof("StreamClient.internalConnectWithCancel() LEAVE\n") return nil default: klog.V(7).Infof("Connection is good. Return object.") - klog.V(7).Infof("StreamClient.ConnectWithRetry() LEAVE\n") + klog.V(7).Infof("StreamClient.internalConnectWithCancel() LEAVE\n") return c.wsconn } + } else { + select { + case <-c.ctx.Done(): + klog.V(1).Infof("Context is not valid. Has been canceled.\n") + klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n") + return nil + default: + klog.V(3).Infof("Context is still valid. Retry...\n") + } } dialer := websocket.Dialer{ @@ -191,7 +211,7 @@ func (c *StreamClient) internalConnectWithCancel(ctx context.Context, ctxCancel if headers, ok := c.ctx.Value(interfaces.HeadersContext{}).(http.Header); ok { for k, v := range headers { for _, v := range v { - klog.V(3).Infof("Connect() RESTORE Header: %s = %s\n", k, v) + klog.V(3).Infof("internalConnectWithCancel RESTORE Header: %s = %s\n", k, v) myHeader.Add(k, v) } } @@ -223,15 +243,11 @@ func (c *StreamClient) internalConnectWithCancel(ctx context.Context, ctxCancel url, err := version.GetSpeakStreamAPI(c.ctx, c.cOptions.Host, c.cOptions.APIVersion, c.cOptions.Path, c.sOptions) if err != nil { klog.V(1).Infof("version.GetSpeakAPI failed. Err: %v\n", err) - klog.V(7).Infof("StreamClient.ConnectWithRetry() LEAVE\n") + klog.V(7).Infof("StreamClient.internalConnectWithCancel() LEAVE\n") return nil // no point in retrying because this is going to fail on every retry } klog.V(5).Infof("Connecting to %s\n", url) - // a single connection attempt - // Note: not using defer here because we aren't leaving the scope of the function - c.muConn.Lock() - // perform the websocket connection ws, res, err := dialer.DialContext(c.ctx, url, myHeader) if res != nil { @@ -241,7 +257,6 @@ func (c *StreamClient) internalConnectWithCancel(ctx context.Context, ctxCancel if err != nil { klog.V(1).Infof("Cannot connect to websocket: %s\n", c.cOptions.Host) klog.V(1).Infof("Dialer failed. Err: %v\n", err) - c.muConn.Unlock() continue } @@ -249,9 +264,6 @@ func (c *StreamClient) internalConnectWithCancel(ctx context.Context, ctxCancel c.wsconn = ws c.retry = true - // unlock the connection - c.muConn.Unlock() - // kick off threads to listen for messages go c.listen() @@ -264,7 +276,7 @@ func (c *StreamClient) internalConnectWithCancel(ctx context.Context, ctxCancel } klog.V(3).Infof("WebSocket Connection Successful!") - klog.V(7).Infof("StreamClient.ConnectWithRetry() LEAVE\n") + klog.V(7).Infof("StreamClient.internalConnectWithCancel() LEAVE\n") return c.wsconn } @@ -276,10 +288,11 @@ func (c *StreamClient) internalConnectWithCancel(ctx context.Context, ctxCancel return nil } +//nolint:funlen,gocyclo // this is a complex function. keep as is func (c *StreamClient) listen() { klog.V(6).Infof("StreamClient.listen() ENTER\n") - ticker := time.NewTicker(500 * time.Millisecond) + ticker := time.NewTicker(250 * time.Millisecond) defer ticker.Stop() for { select { @@ -289,115 +302,144 @@ func (c *StreamClient) listen() { klog.V(6).Infof("StreamClient.listen() LEAVE\n") return case <-ticker.C: - ws := c.internalConnect() - if ws == nil { - klog.V(3).Infof("StreamClient.listen(): Connection is not valid\n") - klog.V(6).Infof("StreamClient.listen() LEAVE\n") - return - } - - // msgType can be binary or text - msgType, byMsg, err := ws.ReadMessage() - if err != nil { - errStr := err.Error() - switch { - case strings.Contains(errStr, SuccessfulSocketErr): - klog.V(3).Infof("Graceful websocket close\n") - - // graceful close - c.closeWs(false) - - klog.V(6).Infof("StreamClient.listen() LEAVE\n") - return - case strings.Contains(errStr, FatalReadSocketErr): - klog.V(1).Infof("Fatal socket error: %v\n", err) - - // send error on callback - sendErr := c.sendError(err) - if sendErr != nil { - klog.V(1).Infof("StreamClient.listen(): Fatal socket error. Err: %v\n", sendErr) - } - - // fatal close - c.closeWs(true) - - klog.V(6).Infof("StreamClient.listen() LEAVE\n") - return - case strings.Contains(errStr, "Deepgram"): - klog.V(1).Infof("StreamClient.listen(): Deepgram error. Err: %v\n", err) - - // send error on callback - sendErr := c.sendError(err) - if sendErr != nil { - klog.V(1).Infof("StreamClient.listen(): Deepgram ErrorMsg. Err: %v\n", sendErr) - } - - // close the connection - c.closeWs(false) - - klog.V(6).Infof("StreamClient.listen() LEAVE\n") - return - case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry: - klog.V(3).Infof("StreamClient object EOF\n") - - // send error on callback - sendErr := c.sendError(err) - if sendErr != nil { - klog.V(1).Infof("StreamClient.listen(): EOF error. Err: %v\n", sendErr) - } - - // close the connection - c.closeWs(true) - - klog.V(6).Infof("StreamClient.listen() LEAVE\n") + for { + // doing a read, need to lock + c.muConn.Lock() + + // get the connection + ws := c.internalConnect() + if ws == nil { + // release + c.muConn.Unlock() + + klog.V(3).Infof("listen: Connection is not valid\n") + klog.V(6).Infof("live.listen() LEAVE\n") return - default: - klog.V(1).Infof("StreamClient.listen(): Cannot read websocket message. Err: %v\n", err) + } - // send error on callback - sendErr := c.sendError(err) - if sendErr != nil { - klog.V(1).Infof("StreamClient.listen(): EOF error. Err: %v\n", sendErr) + // release the lock + c.muConn.Unlock() + + // msgType can be binary or text + msgType, byMsg, err := ws.ReadMessage() + + if err != nil { + errStr := err.Error() + switch { + case strings.Contains(errStr, SuccessfulSocketErr): + klog.V(3).Infof("Graceful websocket close\n") + + // graceful close + c.closeWs(false) + + klog.V(6).Infof("StreamClient.listen() LEAVE\n") + return + case strings.Contains(errStr, UseOfClosedSocket): + klog.V(3).Infof("Probable graceful websocket close: %v\n", err) + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr) + } + + // fatal close + c.closeWs(false) + + klog.V(6).Infof("StreamClient.listen() LEAVE\n") + return + case strings.Contains(errStr, FatalReadSocketErr): + klog.V(1).Infof("Fatal socket error: %v\n", err) + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("StreamClient.listen(): Fatal socket error. Err: %v\n", sendErr) + } + + // fatal close + c.closeWs(true) + + klog.V(6).Infof("StreamClient.listen() LEAVE\n") + return + case strings.Contains(errStr, "Deepgram"): + klog.V(1).Infof("StreamClient.listen(): Deepgram error. Err: %v\n", err) + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("StreamClient.listen(): Deepgram ErrorMsg. Err: %v\n", sendErr) + } + + // close the connection + c.closeWs(false) + + klog.V(6).Infof("StreamClient.listen() LEAVE\n") + return + case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry: + klog.V(3).Infof("StreamClient object EOF\n") + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("StreamClient.listen(): EOF error. Err: %v\n", sendErr) + } + + // close the connection + c.closeWs(true) + + klog.V(6).Infof("StreamClient.listen() LEAVE\n") + return + default: + klog.V(1).Infof("StreamClient.listen(): Cannot read websocket message. Err: %v\n", err) + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("StreamClient.listen(): EOF error. Err: %v\n", sendErr) + } + + // close the connection + c.closeWs(true) + + klog.V(6).Infof("StreamClient.listen() LEAVE\n") + return } - - // close the connection - c.closeWs(true) - - klog.V(6).Infof("StreamClient.listen() LEAVE\n") - return } - } - - if len(byMsg) == 0 { - klog.V(7).Infof("StreamClient.listen(): message empty") - continue - } - // inspect the message - // if c.cOptions.InspectMessage() { - // err := c.inspect(byMsg) - // if err != nil { - // klog.V(1).Infof("StreamClient.listen(): inspect failed. Err: %v\n", err) - // } - // } - - if c.callback != nil { - if msgType == websocket.TextMessage { - err := c.router.Message(byMsg) - if err != nil { - klog.V(1).Infof("StreamClient.listen(): router.Message failed. Err: %v\n", err) - } + if len(byMsg) == 0 { + klog.V(7).Infof("listen(): message empty") + continue + } - } else if msgType == websocket.BinaryMessage { - err := c.router.Binary(byMsg) - if err != nil { - klog.V(1).Infof("StreamClient.listen(): router.Message failed. Err: %v\n", err) + // inspect the message + // if c.cOptions.InspectMessage() { + // err := c.inspect(byMsg) + // if err != nil { + // klog.V(1).Infof("StreamClient.listen(): inspect failed. Err: %v\n", err) + // } + // } + + if c.callback != nil { + if msgType == websocket.TextMessage { + switch msgType { + case websocket.TextMessage: + err := c.router.Message(byMsg) + if err != nil { + klog.V(1).Infof("StreamClient.listen(): router.Message failed. Err: %v\n", err) + } + case websocket.BinaryMessage: + err := c.router.Binary(byMsg) + if err != nil { + klog.V(1).Infof("StreamClient.listen(): router.Message failed. Err: %v\n", err) + } + default: + klog.V(7).Infof("StreamClient.listen(): msg recv: type %d, len: %d\n", msgType, len(byMsg)) + } } } else { - klog.V(7).Infof("StreamClient.listen(): msg recv (type %d): %s\n", msgType, string(byMsg)) + klog.V(7).Infof("callback is nil: msg recv: type %d, len: %d\n", msgType, len(byMsg)) } - } else { - klog.V(7).Infof("StreamClient.listen(): msg recv (type %d): %s\n", msgType, string(byMsg)) } } } @@ -408,6 +450,10 @@ func (c *StreamClient) WriteBinary(byData []byte) error { klog.V(7).Infof("StreamClient.WriteBinary() ENTER\n") // doing a write, need to lock + c.muConn.Lock() + defer c.muConn.Unlock() + + // get the connection ws := c.internalConnect() if ws == nil { err := ErrInvalidConnection @@ -417,10 +463,6 @@ func (c *StreamClient) WriteBinary(byData []byte) error { return err } - // doing a write, need to lock - c.muConn.Lock() - defer c.muConn.Unlock() - if err := ws.WriteMessage( websocket.BinaryMessage, byData, @@ -444,16 +486,6 @@ managing the text-to-speech session on the Deepgram server. func (c *StreamClient) WriteJSON(payload interface{}) error { klog.V(7).Infof("StreamClient.WriteJSON() ENTER\n") - // doing a write, need to lock - ws := c.internalConnect() - if ws == nil { - err := ErrInvalidConnection - klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err) - klog.V(7).Infof("StreamClient.WriteJSON() LEAVE\n") - - return err - } - byData, err := json.Marshal(payload) if err != nil { klog.V(1).Infof("WriteJSON: Error marshaling JSON. Data: %v, Err: %v\n", payload, err) @@ -465,6 +497,15 @@ func (c *StreamClient) WriteJSON(payload interface{}) error { c.muConn.Lock() defer c.muConn.Unlock() + // doing a write, need to lock + ws := c.internalConnect() + if ws == nil { + err := ErrInvalidConnection + klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err) + klog.V(7).Infof("StreamClient.WriteJSON() LEAVE\n") + + return err + } if err := ws.WriteMessage( websocket.TextMessage, byData, @@ -503,22 +544,14 @@ func (c *StreamClient) Write(p []byte) (int, error) { func (c *StreamClient) Flush() error { klog.V(7).Infof("StreamClient.Flush() ENTER\n") - // doing a write, need to lock - ws := c.internalConnect() - if ws == nil { - err := ErrInvalidConnection - klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err) + err := c.WriteJSON(controlMessage{Type: MessageTypeFlush}) + if err != nil { + klog.V(1).Infof("Flush failed. Err: %v\n", err) klog.V(7).Infof("StreamClient.Flush() LEAVE\n") return err } - // doing a write, need to lock - c.muConn.Lock() - defer c.muConn.Unlock() - - err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"Flush\" }")) - klog.V(4).Infof("Flush Succeeded\n") klog.V(7).Infof("StreamClient.Flush() LEAVE\n") @@ -529,27 +562,73 @@ func (c *StreamClient) Flush() error { func (c *StreamClient) Reset() error { klog.V(6).Infof("StreamClient.Reset() ENTER\n") - resetMessage := map[string]string{ - "type": "Reset", + err := c.WriteJSON(controlMessage{Type: MessageTypeReset}) + if err != nil { + klog.V(1).Infof("Reset failed. Err: %v\n", err) + klog.V(7).Infof("StreamClient.Reset() LEAVE\n") + + return err } - msg, err := json.Marshal(resetMessage) + klog.V(4).Infof("Reset Succeeded\n") + klog.V(6).Infof("StreamClient.Reset() LEAVE\n") + return nil +} + +func (c *StreamClient) closeStream(lock bool) error { + klog.V(7).Infof("StreamClient.closeStream() ENTER\n") + + // doing a write, need to lock + if lock { + c.muConn.Lock() + defer c.muConn.Unlock() + } + + err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"Close\" }")) if err != nil { - klog.V(1).Infof("json.Marshal failed. Err: %v\n", err) - klog.V(6).Infof("StreamClient.Reset() LEAVE\n") + klog.V(1).Infof("WriteMessage failed. Err: %v\n", err) + klog.V(7).Infof("StreamClient.closeStream() LEAVE\n") + return err } - err = c.WriteJSON(msg) - if err != nil { - klog.V(1).Infof("WriteJSON failed. Err: %v\n", err) - klog.V(6).Infof("StreamClient.Reset() LEAVE\n") + klog.V(4).Infof("closeStream Succeeded\n") + klog.V(7).Infof("StreamClient.closeStream() LEAVE\n") + + return err +} + +func (c *StreamClient) normalClosure(lock bool) error { + klog.V(7).Infof("StreamClient.normalClosure() ENTER\n") + + // doing a write, need to lock + if lock { + c.muConn.Lock() + defer c.muConn.Unlock() + } + + ws := c.internalConnect() + if ws == nil { + err := ErrInvalidConnection + klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err) + klog.V(7).Infof("StreamClient.normalClosure() LEAVE\n") + return err } - klog.V(4).Infof("Reset Succeeded\n") - klog.V(6).Infof("StreamClient.Reset() LEAVE\n") - return nil + err := c.wsconn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + switch err { + case websocket.ErrCloseSent: + klog.V(3).Infof("ErrCloseSent was sent. Err: %v\n", err) + case nil: + klog.V(4).Infof("normalClosure Succeeded\n") + default: + klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", err) + } + + klog.V(7).Infof("StreamClient.normalClosure() LEAVE\n") + + return err } // Stop will send close message and shutdown websocket connection @@ -571,21 +650,11 @@ func (c *StreamClient) closeWs(fatal bool) { if c.wsconn != nil && !fatal { // deepgram requires a close message to be sent - errDg := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"Close\" }")) - if errDg == websocket.ErrCloseSent { - klog.V(3).Infof("Failed to send CloseNormalClosure. Err: %v\n", errDg) - } else if errDg != nil { - klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errDg) - } + _ = c.closeStream(false) time.Sleep(TerminationSleep) // allow time for server to register closure // websocket protocol message - errProto := c.wsconn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) - if errProto == websocket.ErrCloseSent { - klog.V(3).Infof("Failed to send CloseNormalClosure. Err: %v\n", errProto) - } else if errProto != nil { - klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errProto) - } + _ = c.normalClosure(false) time.Sleep(TerminationSleep) // allow time for server to register closure } @@ -636,7 +705,7 @@ func (c *StreamClient) errorToResponse(err error) *msginterfaces.ErrorResponse { } else { errorCode = UnknownDeepgramErr errorNum = UnknownDeepgramErr - errorDesc = UnknownDeepgramErr + errorDesc = err.Error() } response := &msginterfaces.ErrorResponse{ diff --git a/pkg/client/speak/constants.go b/pkg/client/speak/constants.go index 998c1a54..506508e7 100644 --- a/pkg/client/speak/constants.go +++ b/pkg/client/speak/constants.go @@ -24,16 +24,29 @@ const ( // socket errors FatalReadSocketErr string = "read: can't assign requested address" FatalWriteSocketErr string = "write: broken pipe" + UseOfClosedSocket string = "use of closed network connection" UnknownDeepgramErr string = "unknown deepgram error" // socket successful close error SuccessfulSocketErr string = "close 1000" ) +const ( + // MessageTypeFlush flushes the audio from the server + MessageTypeFlush string = "Flush" + + // MessageTypeReset resets the text buffer + MessageTypeReset string = "Reset" + + // MessageTypeClose closes the stream + MessageTypeClose string = "Close" +) + // errors var ( // ErrInvalidInput required input was not found ErrInvalidInput = errors.New("required input was not found") + // ErrInvalidConnection connection is not valid ErrInvalidConnection = errors.New("connection is not valid") ) diff --git a/tests/edge_cases/reconnect_client/main.go b/tests/edge_cases/reconnect_client/main.go index 3e31bff4..a046b35f 100644 --- a/tests/edge_cases/reconnect_client/main.go +++ b/tests/edge_cases/reconnect_client/main.go @@ -88,7 +88,7 @@ func (c MyCallback) Error(er *api.ErrorResponse) error { // handle the error fmt.Printf("\n[Error] Received\n") fmt.Printf("Error.Type: %s\n", er.Type) - fmt.Printf("Error.Message: %s\n", er.Message) + fmt.Printf("Error.ErrCode: %s\n", er.ErrCode) fmt.Printf("Error.Description: %s\n\n", er.Description) return nil }