Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Repeated/Excessive Read Failure on WS #234

Merged
merged 1 commit into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/streaming/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func main() {
fmt.Printf("Stream is up and running %s\n", reflect.TypeOf(res))

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
fmt.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions examples/streaming/microphone/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ func main() {
}

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
fmt.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions examples/streaming/replay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func main() {
}

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
log.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions examples/streaming/test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func main() {
fmt.Print("\n\nPress ENTER to exit!\n\n")

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
fmt.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down
71 changes: 44 additions & 27 deletions pkg/client/live/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,32 +115,45 @@ func NewWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey str
}

// Connect performs a websocket connection with "DefaultConnectRetry" number of retries.
func (c *Client) Connect() *websocket.Conn {
return c.ConnectWithRetry(c.ctx, c.ctxCancel, int(DefaultConnectRetry))
func (c *Client) Connect() bool {
// set the retry count
if c.retryCnt == 0 {
c.retryCnt = DefaultConnectRetry
}
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt)) != nil
}

// ConnectWithCancel performs a websocket connection with specified number of retries and providing a
// cancel function to stop the connection
func (c *Client) ConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) bool {
return c.internalConnectWithCancel(ctx, ctxCancel, retryCnt) != nil
}

// AttemptReconnect performs a reconnect after failing retries
func (c *Client) AttemptReconnect(ctx context.Context, retries int64) *websocket.Conn {
func (c *Client) AttemptReconnect(ctx context.Context, retries int64) bool {
c.retry = true
c.ctx, c.ctxCancel = context.WithCancel(ctx)
return c.ConnectWithRetry(c.ctx, c.ctxCancel, int(retries))
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(retries)) != nil
}

// AttemptReconnect performs a reconnect after failing retries
func (c *Client) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) *websocket.Conn {
// AttemptReconnect performs a reconnect after failing retries and providing a cancel function
func (c *Client) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) bool {
c.retry = true
return c.ConnectWithRetry(ctx, ctxCancel, int(retries))
return c.internalConnectWithCancel(ctx, ctxCancel, int(retries)) != nil
}

func (c *Client) internalConnect() *websocket.Conn {
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt))
}

// ConnectWithRetry allows for connecting with specified retry attempts
//
//nolint:funlen // this is a complex function. keep as is
func (c *Client) ConnectWithRetry(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) *websocket.Conn {
func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) *websocket.Conn {
klog.V(7).Infof("live.Connect() ENTER\n")

// set the context
c.ctx = ctx
c.ctxCancel = ctxCancel
c.retryCnt = int64(retryCnt)

// we explicitly stopped and should not attempt to reconnect
if !c.retry {
Expand All @@ -149,13 +162,6 @@ func (c *Client) ConnectWithRetry(ctx context.Context, ctxCancel context.CancelF
return nil
}

// set the retry count
if retryCnt <= 0 {
c.retryCnt = DefaultConnectRetry
} else {
c.retryCnt = int64(retryCnt)
}

// if the connection is good, return it otherwise, attempt reconnect
if c.wsconn != nil {
select {
Expand Down Expand Up @@ -285,7 +291,7 @@ func (c *Client) listen() {
klog.V(6).Infof("live.listen() LEAVE\n")
return
case <-ticker.C:
ws := c.Connect()
ws := c.internalConnect()
if ws == nil {
klog.V(3).Infof("listen: Connection is not valid\n")
klog.V(6).Infof("live.listen() LEAVE\n")
Expand Down Expand Up @@ -348,8 +354,19 @@ func (c *Client) listen() {
return
default:
klog.V(1).Infof("listen: Cannot read websocket message. Err: %v\n", err)

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: EOF error. Err: %v\n", sendErr)
}

// close the connection
c.closeWs(true)

klog.V(6).Infof("live.listen() LEAVE\n")
return
}
continue
}

if len(byMsg) == 0 {
Expand Down Expand Up @@ -393,13 +410,13 @@ func (c *Client) Stream(r io.Reader) error {
case strings.Contains(errStr, FatalReadSocketErr):
klog.V(1).Infof("Fatal socket error: %v\n", err)
klog.V(6).Infof("live.Stream() LEAVE\n")
return nil
return err
case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry:
klog.V(3).Infof("stream object EOF\n")
klog.V(6).Infof("live.Stream() LEAVE\n")
return nil
case err != nil:
klog.V(1).Infof("r.Read encountered EOF. Err: %v\n", err)
return err
default:
klog.V(1).Infof("r.Read error. Err: %v\n", err)
klog.V(6).Infof("live.Stream() LEAVE\n")
return err
}
Expand All @@ -426,7 +443,7 @@ func (c *Client) WriteBinary(byData []byte) error {
klog.V(7).Infof("live.WriteBinary() ENTER\n")

// doing a write, need to lock
ws := c.Connect()
ws := c.internalConnect()
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
Expand Down Expand Up @@ -463,7 +480,7 @@ func (c *Client) WriteJSON(payload interface{}) error {
klog.V(7).Infof("live.WriteJSON() ENTER\n")

// doing a write, need to lock
ws := c.Connect()
ws := c.internalConnect()
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
Expand Down Expand Up @@ -522,7 +539,7 @@ func (c *Client) Finalize() error {
klog.V(7).Infof("live.Finalize() ENTER\n")

// doing a write, need to lock
ws := c.Connect()
ws := c.internalConnect()
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
Expand Down Expand Up @@ -620,7 +637,7 @@ func (c *Client) ping() {
klog.V(5).Infof("Starting ping...")
counter++

ws := c.Connect()
ws := c.internalConnect()
if ws == nil {
klog.V(1).Infof("ping Connection is not valid\n")
klog.V(6).Infof("live.ping() LEAVE\n")
Expand Down
4 changes: 2 additions & 2 deletions tests/edge_cases/cancel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func main() {
}

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
fmt.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/edge_cases/failed_retry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func main() {
}

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
fmt.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/edge_cases/keepalive/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func main() {
}

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
fmt.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/edge_cases/reconnect_client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ func main() {
}

// connect the websocket to Deepgram
wsconn := dgClient.AttemptReconnect(context.Background(), 3)
if wsconn == nil {
bConnected := dgClient.AttemptReconnect(context.Background(), 3)
if !bConnected {
fmt.Println("Client.AttemptReconnect failed")
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/edge_cases/timeout/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func main() {
fmt.Printf("\n\nYou will first see an OpenResponse message followed by CloseResponse in 12 seconds.\n")

// connect the websocket to Deepgram
wsconn := dgClient.Connect()
if wsconn == nil {
bConnected := dgClient.Connect()
if !bConnected {
fmt.Println("Client.Connect failed")
os.Exit(1)
}
Expand Down
Loading