Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Reconnect Logic #232

Merged
merged 1 commit into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions pkg/client/live/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,23 +114,34 @@ func NewWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey str
return &conn, nil
}

// Connect performs a websocket connection with "defaultConnectRetry" number of retries.
// Connect performs a websocket connection with "DefaultConnectRetry" number of retries.
func (c *Client) Connect() *websocket.Conn {
return c.ConnectWithRetry(int(defaultConnectRetry))
return c.ConnectWithRetry(c.ctx, c.ctxCancel, int(DefaultConnectRetry))
}

// AttemptReconnect performs a reconnect after failing retries
func (c *Client) AttemptReconnect(retries int64) *websocket.Conn {
func (c *Client) AttemptReconnect(ctx context.Context, retries int64) *websocket.Conn {
c.retry = true
return c.ConnectWithRetry(int(retries))
c.ctx, c.ctxCancel = context.WithCancel(ctx)
return c.ConnectWithRetry(c.ctx, c.ctxCancel, int(retries))
}

// AttemptReconnect performs a reconnect after failing retries
func (c *Client) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) *websocket.Conn {
c.retry = true
return c.ConnectWithRetry(ctx, ctxCancel, int(retries))
}

// ConnectWithRetry allows for connecting with specified retry attempts
//
//nolint:funlen // this is a complex function. keep as is
func (c *Client) ConnectWithRetry(retryCnt int) *websocket.Conn {
func (c *Client) ConnectWithRetry(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) *websocket.Conn {
klog.V(7).Infof("live.Connect() ENTER\n")

// set the context
c.ctx = ctx
c.ctxCancel = ctxCancel

// we explicitly stopped and should not attempt to reconnect
if !c.retry {
klog.V(7).Infof("This connection has been terminated. Please either call with AttemptReconnect or create a new Client object using NewWebSocketClient.")
Expand All @@ -140,7 +151,7 @@ func (c *Client) ConnectWithRetry(retryCnt int) *websocket.Conn {

// set the retry count
if retryCnt <= 0 {
c.retryCnt = defaultConnectRetry
c.retryCnt = DefaultConnectRetry
} else {
c.retryCnt = int64(retryCnt)
}
Expand Down Expand Up @@ -222,6 +233,7 @@ func (c *Client) ConnectWithRetry(retryCnt int) *websocket.Conn {
}
if err != nil {
klog.V(1).Infof("Cannot connect to websocket: %s\n", c.cOptions.Host)
klog.V(1).Infof("Dialer failed. Err: %v\n", err)
c.mu.Unlock()
continue
}
Expand Down Expand Up @@ -268,7 +280,7 @@ func (c *Client) listen() {
for {
select {
case <-c.ctx.Done():
c.Stop()
c.closeWs(false)
klog.V(6).Infof("live.listen() Done\n")
klog.V(6).Infof("live.listen() LEAVE\n")
return
Expand Down Expand Up @@ -513,8 +525,10 @@ func (c *Client) Finalize() error {

// Stop will send close message and shutdown websocket connection
func (c *Client) Stop() {
klog.V(3).Infof("Stop Stopping...\n")
klog.V(3).Infof("Stopping...\n")
c.retry = false

// exit gracefully
c.ctxCancel()
c.closeWs(false)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/client/live/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ import (
const (
pingPeriod = 5 * time.Second

defaultConnectRetry int64 = 3
defaultDelayBetweenRetry int64 = 2
)

// external constants
const (
DefaultConnectRetry int64 = 3

ChunkSize = 1024 * 2
TerminationSleep = 100 * time.Millisecond
Expand Down
206 changes: 206 additions & 0 deletions tests/edge_cases/reconnect_client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved.
// Use of this source code is governed by a MIT license that can be found in the LICENSE file.
// SPDX-License-Identifier: MIT

package main

// streaming
import (
"context"
"fmt"
"os"
"strings"
"time"

api "github.com/deepgram/deepgram-go-sdk/pkg/api/live/v1/interfaces"
microphone "github.com/deepgram/deepgram-go-sdk/pkg/audio/microphone"
interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces"
client "github.com/deepgram/deepgram-go-sdk/pkg/client/live"
)

// Implement your own callback
type MyCallback struct {
sb *strings.Builder
}

func (c MyCallback) Message(mr *api.MessageResponse) error {
// handle the message
sentence := strings.TrimSpace(mr.Channel.Alternatives[0].Transcript)

if len(mr.Channel.Alternatives) == 0 || sentence == "" {
return nil
}

if mr.IsFinal {
c.sb.WriteString(sentence)
c.sb.WriteString(" ")

if mr.SpeechFinal {
fmt.Printf("[------- Is Final]: %s\n", c.sb.String())
c.sb.Reset()
}
} else {
fmt.Printf("[Interim Result]: %s\n", sentence)
}

return nil
}

func (c MyCallback) Open(ocr *api.OpenResponse) error {
// handle the open
fmt.Printf("\n[Open] Received\n")
return nil
}

func (c MyCallback) Metadata(md *api.MetadataResponse) error {
// handle the metadata
fmt.Printf("\n[Metadata] Received\n")
fmt.Printf("Metadata.RequestID: %s\n", strings.TrimSpace(md.RequestID))
fmt.Printf("Metadata.Channels: %d\n", md.Channels)
fmt.Printf("Metadata.Created: %s\n\n", strings.TrimSpace(md.Created))
return nil
}

func (c MyCallback) SpeechStarted(ssr *api.SpeechStartedResponse) error {
fmt.Printf("\n[SpeechStarted] Received\n")
return nil
}

func (c MyCallback) UtteranceEnd(ur *api.UtteranceEndResponse) error {
utterance := strings.TrimSpace(c.sb.String())
if len(utterance) > 0 {
fmt.Printf("[------- UtteranceEnd]: %s\n", utterance)
c.sb.Reset()
} else {
fmt.Printf("\n[UtteranceEnd] Received\n")
}

return nil
}

func (c MyCallback) Close(ocr *api.CloseResponse) error {
// handle the close
fmt.Printf("\n[Close] Received\n")
return nil
}

func (c MyCallback) Error(er *api.ErrorResponse) error {
// handle the error
fmt.Printf("\n[Error] Received\n")
fmt.Printf("Error.Type: %s\n", er.Type)
fmt.Printf("Error.Message: %s\n", er.Message)
fmt.Printf("Error.Description: %s\n\n", er.Description)
return nil
}

func (c MyCallback) UnhandledEvent(byData []byte) error {
// handle the unhandled event
fmt.Printf("\n[UnhandledEvent] Received\n")
fmt.Printf("UnhandledEvent: %s\n\n", string(byData))
return nil
}

func main() {
// init library
microphone.Initialize()

/*
DG Streaming API
*/
// init library
client.Init(client.InitLib{
LogLevel: client.LogLevelTrace, // LogLevelDefault, LogLevelFull, LogLevelDebug, LogLevelTrace
})

// Go context
ctx := context.Background()

// client options
cOptions := &interfaces.ClientOptions{
EnableKeepAlive: true,
}

// set the Transcription options
tOptions := &interfaces.LiveTranscriptionOptions{
Model: "nova-2",
Language: "en-US",
Punctuate: true,
Encoding: "linear16",
Channels: 1,
SampleRate: 16000,
SmartFormat: true,
VadEvents: true,
// To get UtteranceEnd, the following must be set:
InterimResults: true,
UtteranceEndMs: "1000",
// End of UtteranceEnd settings
}

// implement your own callback
callback := MyCallback{
sb: &strings.Builder{},
}

// create a Deepgram client
dgClient, err := client.New(ctx, "", cOptions, tOptions, callback)
if err != nil {
fmt.Println("ERROR creating LiveTranscription connection:", err)
return
}

for i := 0; i < 10; i++ {
if i > 0 {
time.Sleep(5 * time.Second)
}

// connect the websocket to Deepgram
wsconn := dgClient.AttemptReconnect(context.Background(), 3)
if wsconn == nil {
fmt.Println("Client.AttemptReconnect failed")
os.Exit(1)
}

/*
Microphone package
*/
// mic stuf
mic, err := microphone.New(microphone.AudioConfig{
InputChannels: 1,
SamplingRate: 16000,
})
if err != nil {
fmt.Printf("Initialize failed. Err: %v\n", err)
os.Exit(1)
}

// start the mic
err = mic.Start()
if err != nil {
fmt.Printf("mic.Start failed. Err: %v\n", err)
os.Exit(1)
}

go func() {
// feed the microphone stream to the Deepgram client (this is a blocking call)
err := mic.Stream(dgClient)
if err != nil {
fmt.Printf("mic.Stream non-fatal error. Err: %v\n", err)
}
}()

// sleep for 10 seconds
time.Sleep(10 * time.Second)

// close and repeat
err = mic.Stop()
if err != nil {
fmt.Printf("mic.Stop non-fatal error. Err: %v\n", err)
}
dgClient.Stop()
}

// teardown library
microphone.Teardown()

fmt.Printf("\n\nProgram exiting...\n")
}
Loading