Skip to content

Commit

Permalink
Fix Reconnect Logic
Browse files Browse the repository at this point in the history
  • Loading branch information
dvonthenen committed May 31, 2024
1 parent 2479aa6 commit 63ebc06
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 9 deletions.
30 changes: 22 additions & 8 deletions pkg/client/live/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,23 +114,34 @@ func NewWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey str
return &conn, nil
}

// Connect performs a websocket connection with "defaultConnectRetry" number of retries.
// Connect performs a websocket connection with "DefaultConnectRetry" number of retries.
func (c *Client) Connect() *websocket.Conn {
return c.ConnectWithRetry(int(defaultConnectRetry))
return c.ConnectWithRetry(c.ctx, c.ctxCancel, int(DefaultConnectRetry))
}

// AttemptReconnect performs a reconnect after failing retries
func (c *Client) AttemptReconnect(retries int64) *websocket.Conn {
func (c *Client) AttemptReconnect(ctx context.Context, retries int64) *websocket.Conn {
c.retry = true
return c.ConnectWithRetry(int(retries))
c.ctx, c.ctxCancel = context.WithCancel(ctx)
return c.ConnectWithRetry(c.ctx, c.ctxCancel, int(retries))
}

// AttemptReconnect performs a reconnect after failing retries
func (c *Client) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) *websocket.Conn {
c.retry = true
return c.ConnectWithRetry(ctx, ctxCancel, int(retries))
}

// ConnectWithRetry allows for connecting with specified retry attempts
//
//nolint:funlen // this is a complex function. keep as is
func (c *Client) ConnectWithRetry(retryCnt int) *websocket.Conn {
func (c *Client) ConnectWithRetry(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) *websocket.Conn {
klog.V(7).Infof("live.Connect() ENTER\n")

// set the context
c.ctx = ctx
c.ctxCancel = ctxCancel

// we explicitly stopped and should not attempt to reconnect
if !c.retry {
klog.V(7).Infof("This connection has been terminated. Please either call with AttemptReconnect or create a new Client object using NewWebSocketClient.")
Expand All @@ -140,7 +151,7 @@ func (c *Client) ConnectWithRetry(retryCnt int) *websocket.Conn {

// set the retry count
if retryCnt <= 0 {
c.retryCnt = defaultConnectRetry
c.retryCnt = DefaultConnectRetry
} else {
c.retryCnt = int64(retryCnt)
}
Expand Down Expand Up @@ -222,6 +233,7 @@ func (c *Client) ConnectWithRetry(retryCnt int) *websocket.Conn {
}
if err != nil {
klog.V(1).Infof("Cannot connect to websocket: %s\n", c.cOptions.Host)
klog.V(1).Infof("Dialer failed. Err: %v\n", err)
c.mu.Unlock()
continue
}
Expand Down Expand Up @@ -268,7 +280,7 @@ func (c *Client) listen() {
for {
select {
case <-c.ctx.Done():
c.Stop()
c.closeWs(false)
klog.V(6).Infof("live.listen() Done\n")
klog.V(6).Infof("live.listen() LEAVE\n")
return
Expand Down Expand Up @@ -513,8 +525,10 @@ func (c *Client) Finalize() error {

// Stop will send close message and shutdown websocket connection
func (c *Client) Stop() {
klog.V(3).Infof("Stop Stopping...\n")
klog.V(3).Infof("Stopping...\n")
c.retry = false

// exit gracefully
c.ctxCancel()
c.closeWs(false)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/client/live/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ import (
const (
pingPeriod = 5 * time.Second

defaultConnectRetry int64 = 3
defaultDelayBetweenRetry int64 = 2
)

// external constants
const (
DefaultConnectRetry int64 = 3

ChunkSize = 1024 * 2
TerminationSleep = 100 * time.Millisecond
Expand Down
206 changes: 206 additions & 0 deletions tests/edge_cases/reconnect_client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved.
// Use of this source code is governed by a MIT license that can be found in the LICENSE file.
// SPDX-License-Identifier: MIT

package main

// streaming
import (
"context"
"fmt"
"os"
"strings"
"time"

api "github.com/deepgram/deepgram-go-sdk/pkg/api/live/v1/interfaces"
microphone "github.com/deepgram/deepgram-go-sdk/pkg/audio/microphone"
interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces"
client "github.com/deepgram/deepgram-go-sdk/pkg/client/live"
)

// Implement your own callback
type MyCallback struct {
sb *strings.Builder
}

func (c MyCallback) Message(mr *api.MessageResponse) error {
// handle the message
sentence := strings.TrimSpace(mr.Channel.Alternatives[0].Transcript)

if len(mr.Channel.Alternatives) == 0 || sentence == "" {
return nil
}

if mr.IsFinal {
c.sb.WriteString(sentence)
c.sb.WriteString(" ")

if mr.SpeechFinal {
fmt.Printf("[------- Is Final]: %s\n", c.sb.String())
c.sb.Reset()
}
} else {
fmt.Printf("[Interim Result]: %s\n", sentence)
}

return nil
}

func (c MyCallback) Open(ocr *api.OpenResponse) error {
// handle the open
fmt.Printf("\n[Open] Received\n")
return nil
}

func (c MyCallback) Metadata(md *api.MetadataResponse) error {
// handle the metadata
fmt.Printf("\n[Metadata] Received\n")
fmt.Printf("Metadata.RequestID: %s\n", strings.TrimSpace(md.RequestID))
fmt.Printf("Metadata.Channels: %d\n", md.Channels)
fmt.Printf("Metadata.Created: %s\n\n", strings.TrimSpace(md.Created))
return nil
}

func (c MyCallback) SpeechStarted(ssr *api.SpeechStartedResponse) error {
fmt.Printf("\n[SpeechStarted] Received\n")
return nil
}

func (c MyCallback) UtteranceEnd(ur *api.UtteranceEndResponse) error {
utterance := strings.TrimSpace(c.sb.String())
if len(utterance) > 0 {
fmt.Printf("[------- UtteranceEnd]: %s\n", utterance)
c.sb.Reset()
} else {
fmt.Printf("\n[UtteranceEnd] Received\n")
}

return nil
}

func (c MyCallback) Close(ocr *api.CloseResponse) error {
// handle the close
fmt.Printf("\n[Close] Received\n")
return nil
}

func (c MyCallback) Error(er *api.ErrorResponse) error {
// handle the error
fmt.Printf("\n[Error] Received\n")
fmt.Printf("Error.Type: %s\n", er.Type)
fmt.Printf("Error.Message: %s\n", er.Message)
fmt.Printf("Error.Description: %s\n\n", er.Description)
return nil
}

func (c MyCallback) UnhandledEvent(byData []byte) error {
// handle the unhandled event
fmt.Printf("\n[UnhandledEvent] Received\n")
fmt.Printf("UnhandledEvent: %s\n\n", string(byData))
return nil
}

func main() {
// init library
microphone.Initialize()

/*
DG Streaming API
*/
// init library
client.Init(client.InitLib{
LogLevel: client.LogLevelTrace, // LogLevelDefault, LogLevelFull, LogLevelDebug, LogLevelTrace
})

// Go context
ctx := context.Background()

// client options
cOptions := &interfaces.ClientOptions{
EnableKeepAlive: true,
}

// set the Transcription options
tOptions := &interfaces.LiveTranscriptionOptions{
Model: "nova-2",
Language: "en-US",
Punctuate: true,
Encoding: "linear16",
Channels: 1,
SampleRate: 16000,
SmartFormat: true,
VadEvents: true,
// To get UtteranceEnd, the following must be set:
InterimResults: true,
UtteranceEndMs: "1000",
// End of UtteranceEnd settings
}

// implement your own callback
callback := MyCallback{
sb: &strings.Builder{},
}

// create a Deepgram client
dgClient, err := client.New(ctx, "", cOptions, tOptions, callback)
if err != nil {
fmt.Println("ERROR creating LiveTranscription connection:", err)
return
}

for i := 0; i < 10; i++ {
if i > 0 {
time.Sleep(5 * time.Second)
}

// connect the websocket to Deepgram
wsconn := dgClient.AttemptReconnect(context.Background(), 3)
if wsconn == nil {
fmt.Println("Client.AttemptReconnect failed")
os.Exit(1)
}

/*
Microphone package
*/
// mic stuf
mic, err := microphone.New(microphone.AudioConfig{
InputChannels: 1,
SamplingRate: 16000,
})
if err != nil {
fmt.Printf("Initialize failed. Err: %v\n", err)
os.Exit(1)
}

// start the mic
err = mic.Start()
if err != nil {
fmt.Printf("mic.Start failed. Err: %v\n", err)
os.Exit(1)
}

go func() {
// feed the microphone stream to the Deepgram client (this is a blocking call)
err := mic.Stream(dgClient)
if err != nil {
fmt.Printf("mic.Stream non-fatal error. Err: %v\n", err)
}
}()

// sleep for 10 seconds
time.Sleep(10 * time.Second)

// close and repeat
err = mic.Stop()
if err != nil {
fmt.Printf("mic.Stop non-fatal error. Err: %v\n", err)
}
dgClient.Stop()
}

// teardown library
microphone.Teardown()

fmt.Printf("\n\nProgram exiting...\n")
}

0 comments on commit 63ebc06

Please sign in to comment.