Skip to content

Commit

Permalink
Fix Repeated/Excessive Read Failure on WS
Browse files Browse the repository at this point in the history
  • Loading branch information
dvonthenen committed Jun 5, 2024
1 parent 8fdd098 commit e92ce9c
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 45 deletions.
4 changes: 2 additions & 2 deletions examples/streaming/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func main() {
fmt.Printf("Stream is up and running %s\n", reflect.TypeOf(res))

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
fmt.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions examples/streaming/microphone/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ func main() {
}

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
fmt.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions examples/streaming/replay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func main() {
}

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
log.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions examples/streaming/test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func main() {
fmt.Print("\n\nPress ENTER to exit!\n\n")

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
fmt.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down
71 changes: 44 additions & 27 deletions pkg/client/live/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,32 +115,45 @@ func NewWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey str
}

// Connect performs a websocket connection with "DefaultConnectRetry" number of retries.
func (c *Client) Connect() *websocket.Conn {
return c.ConnectWithRetry(c.ctx, c.ctxCancel, int(DefaultConnectRetry))
func (c *Client) Connect() bool {
// set the retry count
if c.retryCnt == 0 {
c.retryCnt = DefaultConnectRetry
}
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt)) != nil
}

// ConnectWithCancel performs a websocket connection with specified number of retries and providing a
// cancel function to stop the connection
func (c *Client) ConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) bool {
return c.internalConnectWithCancel(ctx, ctxCancel, retryCnt) != nil
}

// AttemptReconnect performs a reconnect after failing retries
func (c *Client) AttemptReconnect(ctx context.Context, retries int64) *websocket.Conn {
func (c *Client) AttemptReconnect(ctx context.Context, retries int64) bool {
c.retry = true
c.ctx, c.ctxCancel = context.WithCancel(ctx)
return c.ConnectWithRetry(c.ctx, c.ctxCancel, int(retries))
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(retries)) != nil
}

// AttemptReconnect performs a reconnect after failing retries
func (c *Client) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) *websocket.Conn {
// AttemptReconnect performs a reconnect after failing retries and providing a cancel function
func (c *Client) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) bool {
c.retry = true
return c.ConnectWithRetry(ctx, ctxCancel, int(retries))
return c.internalConnectWithCancel(ctx, ctxCancel, int(retries)) != nil
}

func (c *Client) internalConnect() *websocket.Conn {
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt))
}

// ConnectWithRetry allows for connecting with specified retry attempts
//
//nolint:funlen // this is a complex function. keep as is
func (c *Client) ConnectWithRetry(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) *websocket.Conn {
func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) *websocket.Conn {
klog.V(7).Infof("live.Connect() ENTER\n")

// set the context
c.ctx = ctx
c.ctxCancel = ctxCancel
c.retryCnt = int64(retryCnt)

// we explicitly stopped and should not attempt to reconnect
if !c.retry {
Expand All @@ -149,13 +162,6 @@ func (c *Client) ConnectWithRetry(ctx context.Context, ctxCancel context.CancelF
return nil
}

// set the retry count
if retryCnt <= 0 {
c.retryCnt = DefaultConnectRetry
} else {
c.retryCnt = int64(retryCnt)
}

// if the connection is good, return it otherwise, attempt reconnect
if c.wsconn != nil {
select {
Expand Down Expand Up @@ -285,7 +291,7 @@ func (c *Client) listen() {
klog.V(6).Infof("live.listen() LEAVE\n")
return
case <-ticker.C:
ws := c.Connect()
ws := c.internalConnect()
if ws == nil {
klog.V(3).Infof("listen: Connection is not valid\n")
klog.V(6).Infof("live.listen() LEAVE\n")
Expand Down Expand Up @@ -348,8 +354,19 @@ func (c *Client) listen() {
return
default:
klog.V(1).Infof("listen: Cannot read websocket message. Err: %v\n", err)

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: EOF error. Err: %v\n", sendErr)
}

// close the connection
c.closeWs(true)

klog.V(6).Infof("live.listen() LEAVE\n")
return
}
continue
}

if len(byMsg) == 0 {
Expand Down Expand Up @@ -393,13 +410,13 @@ func (c *Client) Stream(r io.Reader) error {
case strings.Contains(errStr, FatalReadSocketErr):
klog.V(1).Infof("Fatal socket error: %v\n", err)
klog.V(6).Infof("live.Stream() LEAVE\n")
return nil
return err
case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry:
klog.V(3).Infof("stream object EOF\n")
klog.V(6).Infof("live.Stream() LEAVE\n")
return nil
case err != nil:
klog.V(1).Infof("r.Read encountered EOF. Err: %v\n", err)
return err
default:
klog.V(1).Infof("r.Read error. Err: %v\n", err)
klog.V(6).Infof("live.Stream() LEAVE\n")
return err
}
Expand All @@ -426,7 +443,7 @@ func (c *Client) WriteBinary(byData []byte) error {
klog.V(7).Infof("live.WriteBinary() ENTER\n")

// doing a write, need to lock
ws := c.Connect()
ws := c.internalConnect()
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
Expand Down Expand Up @@ -463,7 +480,7 @@ func (c *Client) WriteJSON(payload interface{}) error {
klog.V(7).Infof("live.WriteJSON() ENTER\n")

// doing a write, need to lock
ws := c.Connect()
ws := c.internalConnect()
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
Expand Down Expand Up @@ -522,7 +539,7 @@ func (c *Client) Finalize() error {
klog.V(7).Infof("live.Finalize() ENTER\n")

// doing a write, need to lock
ws := c.Connect()
ws := c.internalConnect()
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
Expand Down Expand Up @@ -620,7 +637,7 @@ func (c *Client) ping() {
klog.V(5).Infof("Starting ping...")
counter++

ws := c.Connect()
ws := c.internalConnect()
if ws == nil {
klog.V(1).Infof("ping Connection is not valid\n")
klog.V(6).Infof("live.ping() LEAVE\n")
Expand Down
4 changes: 2 additions & 2 deletions tests/edge_cases/cancel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func main() {
}

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
fmt.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/edge_cases/failed_retry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func main() {
}

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
fmt.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/edge_cases/keepalive/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func main() {
}

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
fmt.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/edge_cases/reconnect_client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ func main() {
}

// connect the websocket to Deepgram
wsconn := dgClient.AttemptReconnect(context.Background(), 3)
if wsconn == nil {
bConnected := dgClient.AttemptReconnect(context.Background(), 3)
if !bConnected {
fmt.Println("Client.AttemptReconnect failed")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/edge_cases/timeout/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func main() {
fmt.Printf("\n\nYou will first see an OpenResponse message followed by CloseResponse in 12 seconds.\n")

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
fmt.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down

0 comments on commit e92ce9c

Please sign in to comment.