From 63ebc06099d8550e109513049785d710430f0cb1 Mon Sep 17 00:00:00 2001 From: David vonThenen <12752197+dvonthenen@users.noreply.github.com> Date: Fri, 31 May 2024 07:48:52 -0700 Subject: [PATCH] Fix Reconnect Logic --- pkg/client/live/client.go | 30 +++- pkg/client/live/constants.go | 6 +- tests/edge_cases/reconnect_client/main.go | 206 ++++++++++++++++++++++ 3 files changed, 233 insertions(+), 9 deletions(-) create mode 100644 tests/edge_cases/reconnect_client/main.go diff --git a/pkg/client/live/client.go b/pkg/client/live/client.go index dc7a1690..6b2c76f1 100644 --- a/pkg/client/live/client.go +++ b/pkg/client/live/client.go @@ -114,23 +114,34 @@ func NewWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey str return &conn, nil } -// Connect performs a websocket connection with "defaultConnectRetry" number of retries. +// Connect performs a websocket connection with "DefaultConnectRetry" number of retries. func (c *Client) Connect() *websocket.Conn { - return c.ConnectWithRetry(int(defaultConnectRetry)) + return c.ConnectWithRetry(c.ctx, c.ctxCancel, int(DefaultConnectRetry)) } // AttemptReconnect performs a reconnect after failing retries -func (c *Client) AttemptReconnect(retries int64) *websocket.Conn { +func (c *Client) AttemptReconnect(ctx context.Context, retries int64) *websocket.Conn { c.retry = true - return c.ConnectWithRetry(int(retries)) + c.ctx, c.ctxCancel = context.WithCancel(ctx) + return c.ConnectWithRetry(c.ctx, c.ctxCancel, int(retries)) +} + +// AttemptReconnectWithCancel performs a reconnect after failing retries, using the caller-supplied context and cancel function +func (c *Client) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) *websocket.Conn { + c.retry = true + return c.ConnectWithRetry(ctx, ctxCancel, int(retries)) } // ConnectWithRetry allows for connecting with specified retry attempts // //nolint:funlen // this is a complex function. 
keep as is -func (c *Client) ConnectWithRetry(retryCnt int) *websocket.Conn { +func (c *Client) ConnectWithRetry(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) *websocket.Conn { klog.V(7).Infof("live.Connect() ENTER\n") + // set the context + c.ctx = ctx + c.ctxCancel = ctxCancel + // we explicitly stopped and should not attempt to reconnect if !c.retry { klog.V(7).Infof("This connection has been terminated. Please either call with AttemptReconnect or create a new Client object using NewWebSocketClient.") @@ -140,7 +151,7 @@ func (c *Client) ConnectWithRetry(retryCnt int) *websocket.Conn { // set the retry count if retryCnt <= 0 { - c.retryCnt = defaultConnectRetry + c.retryCnt = DefaultConnectRetry } else { c.retryCnt = int64(retryCnt) } @@ -222,6 +233,7 @@ func (c *Client) ConnectWithRetry(retryCnt int) *websocket.Conn { } if err != nil { klog.V(1).Infof("Cannot connect to websocket: %s\n", c.cOptions.Host) + klog.V(1).Infof("Dialer failed. Err: %v\n", err) c.mu.Unlock() continue } @@ -268,7 +280,7 @@ func (c *Client) listen() { for { select { case <-c.ctx.Done(): - c.Stop() + c.closeWs(false) klog.V(6).Infof("live.listen() Done\n") klog.V(6).Infof("live.listen() LEAVE\n") return @@ -513,8 +525,10 @@ func (c *Client) Finalize() error { // Stop will send close message and shutdown websocket connection func (c *Client) Stop() { - klog.V(3).Infof("Stop Stopping...\n") + klog.V(3).Infof("Stopping...\n") c.retry = false + + // exit gracefully c.ctxCancel() c.closeWs(false) } diff --git a/pkg/client/live/constants.go b/pkg/client/live/constants.go index 2d89d57a..978f98b7 100644 --- a/pkg/client/live/constants.go +++ b/pkg/client/live/constants.go @@ -14,8 +14,12 @@ import ( const ( pingPeriod = 5 * time.Second - defaultConnectRetry int64 = 3 defaultDelayBetweenRetry int64 = 2 +) + +// external constants +const ( + DefaultConnectRetry int64 = 3 ChunkSize = 1024 * 2 TerminationSleep = 100 * time.Millisecond diff --git 
a/tests/edge_cases/reconnect_client/main.go b/tests/edge_cases/reconnect_client/main.go
new file mode 100644
index 00000000..0da5399d
--- /dev/null
+++ b/tests/edge_cases/reconnect_client/main.go
@@ -0,0 +1,220 @@
+// Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved.
+// Use of this source code is governed by a MIT license that can be found in the LICENSE file.
+// SPDX-License-Identifier: MIT
+
+package main
+
+// streaming
+import (
+	"context"
+	"fmt"
+	"os"
+	"strings"
+	"time"
+
+	api "github.com/deepgram/deepgram-go-sdk/pkg/api/live/v1/interfaces"
+	microphone "github.com/deepgram/deepgram-go-sdk/pkg/audio/microphone"
+	interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces"
+	client "github.com/deepgram/deepgram-go-sdk/pkg/client/live"
+)
+
+// MyCallback is a sample callback implementation; implement your own along these lines.
+// sb accumulates final transcript fragments until they are printed and reset.
+type MyCallback struct {
+	sb *strings.Builder
+}
+
+// Message prints interim transcripts immediately; final transcripts are buffered in sb and printed once SpeechFinal is set.
+func (c MyCallback) Message(mr *api.MessageResponse) error {
+	// handle the message
+	// check the length before indexing: a response without alternatives would otherwise panic
+	if len(mr.Channel.Alternatives) == 0 {
+		return nil
+	}
+
+	sentence := strings.TrimSpace(mr.Channel.Alternatives[0].Transcript)
+	if sentence == "" {
+		return nil
+	}
+
+	if mr.IsFinal {
+		c.sb.WriteString(sentence)
+		c.sb.WriteString(" ")
+
+		if mr.SpeechFinal {
+			fmt.Printf("[------- Is Final]: %s\n", c.sb.String())
+			c.sb.Reset()
+		}
+	} else {
+		fmt.Printf("[Interim Result]: %s\n", sentence)
+	}
+
+	return nil
+}
+
+// Open reports that an open event was received.
+func (c MyCallback) Open(ocr *api.OpenResponse) error {
+	// handle the open
+	fmt.Printf("\n[Open] Received\n")
+	return nil
+}
+
+// Metadata prints the request ID, channel count and creation time carried by the metadata event.
+func (c MyCallback) Metadata(md *api.MetadataResponse) error {
+	// handle the metadata
+	fmt.Printf("\n[Metadata] Received\n")
+	fmt.Printf("Metadata.RequestID: %s\n", strings.TrimSpace(md.RequestID))
+	fmt.Printf("Metadata.Channels: %d\n", md.Channels)
+	fmt.Printf("Metadata.Created: %s\n\n", strings.TrimSpace(md.Created))
+	return nil
+}
+
+// SpeechStarted reports that a speech-started event was received.
+func (c MyCallback) SpeechStarted(ssr *api.SpeechStartedResponse) error {
+	fmt.Printf("\n[SpeechStarted] Received\n")
+	return nil
+}
+
+// UtteranceEnd prints and clears any transcript still buffered in sb, or just reports the event when the buffer holds no text.
+func (c MyCallback) UtteranceEnd(ur *api.UtteranceEndResponse) error {
+	utterance := strings.TrimSpace(c.sb.String())
+	if len(utterance) > 0 {
+		fmt.Printf("[------- UtteranceEnd]: %s\n", utterance)
+		c.sb.Reset()
+	} else {
+		fmt.Printf("\n[UtteranceEnd] Received\n")
+	}
+
+	return nil
+}
+
+// Close reports that a close event was received.
+func (c MyCallback) Close(ocr *api.CloseResponse) error {
+	// handle the close
+	fmt.Printf("\n[Close] Received\n")
+	return nil
+}
+
+// Error prints the type, message and description of an error event.
+func (c MyCallback) Error(er *api.ErrorResponse) error {
+	// handle the error
+	fmt.Printf("\n[Error] Received\n")
+	fmt.Printf("Error.Type: %s\n", er.Type)
+	fmt.Printf("Error.Message: %s\n", er.Message)
+	fmt.Printf("Error.Description: %s\n\n", er.Description)
+	return nil
+}
+
+// UnhandledEvent prints the raw payload of an event that has no dedicated handler.
+func (c MyCallback) UnhandledEvent(byData []byte) error {
+	// handle the unhandled event
+	fmt.Printf("\n[UnhandledEvent] Received\n")
+	fmt.Printf("UnhandledEvent: %s\n\n", string(byData))
+	return nil
+}
+
+// main reconnects, streams microphone audio for 10 seconds and stops, ten times over, to exercise the reconnect logic.
+func main() {
+	// init library
+	microphone.Initialize()
+
+	/*
+		DG Streaming API
+	*/
+	// init library
+	client.Init(client.InitLib{
+		LogLevel: client.LogLevelTrace, // LogLevelDefault, LogLevelFull, LogLevelDebug, LogLevelTrace
+	})
+
+	// Go context
+	ctx := context.Background()
+
+	// client options
+	cOptions := &interfaces.ClientOptions{
+		EnableKeepAlive: true,
+	}
+
+	// set the Transcription options
+	tOptions := &interfaces.LiveTranscriptionOptions{
+		Model:       "nova-2",
+		Language:    "en-US",
+		Punctuate:   true,
+		Encoding:    "linear16",
+		Channels:    1,
+		SampleRate:  16000,
+		SmartFormat: true,
+		VadEvents:   true,
+		// To get UtteranceEnd, the following must be set:
+		InterimResults: true,
+		UtteranceEndMs: "1000",
+		// End of UtteranceEnd settings
+	}
+
+	// implement your own callback
+	callback := MyCallback{
+		sb: &strings.Builder{},
+	}
+
+	// create a Deepgram client
+	dgClient, err := client.New(ctx, "", cOptions, tOptions, callback)
+	if err != nil {
+		fmt.Println("ERROR creating LiveTranscription connection:", err)
+		return
+	}
+
+	for i := 0; i < 10; i++ {
+		if i > 0 {
+			time.Sleep(5 * time.Second)
+		}
+
+		// connect the websocket to Deepgram
+		wsconn := dgClient.AttemptReconnect(context.Background(), 3)
+		if wsconn == nil {
+			fmt.Println("Client.AttemptReconnect failed")
+			os.Exit(1)
+		}
+
+		/*
+			Microphone package
+		*/
+		// mic stuff
+		mic, err := microphone.New(microphone.AudioConfig{
+			InputChannels: 1,
+			SamplingRate:  16000,
+		})
+		if err != nil {
+			fmt.Printf("Initialize failed. Err: %v\n", err)
+			os.Exit(1)
+		}
+
+		// start the mic
+		err = mic.Start()
+		if err != nil {
+			fmt.Printf("mic.Start failed. Err: %v\n", err)
+			os.Exit(1)
+		}
+
+		go func() {
+			// feed the microphone stream to the Deepgram client (this is a blocking call)
+			err := mic.Stream(dgClient)
+			if err != nil {
+				fmt.Printf("mic.Stream non-fatal error. Err: %v\n", err)
+			}
+		}()
+
+		// sleep for 10 seconds
+		time.Sleep(10 * time.Second)
+
+		// close and repeat
+		err = mic.Stop()
+		if err != nil {
+			fmt.Printf("mic.Stop non-fatal error. Err: %v\n", err)
+		}
+		dgClient.Stop()
+	}
+
+	// teardown library
+	microphone.Teardown()
+
+	fmt.Printf("\n\nProgram exiting...\n")
+}