Skip to content

Commit

Permalink
Merge pull request #228 from dvonthenen/fix-multi-write
Browse files Browse the repository at this point in the history
Fix Write From Multi-Source (KeepAlive, etc)
  • Loading branch information
dvonthenen authored May 29, 2024
2 parents 9a160a6 + ec21f9e commit 2479aa6
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 25 deletions.
1 change: 0 additions & 1 deletion pkg/client/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func (c *Client) ProcessRequest(ctx context.Context, req *http.Request, result i
// HandleResponse processes the HTTP response for both streaming and URL-based API requests.
func (c *Client) HandleResponse(res *http.Response, keys []string, resBody interface{}) (map[string]string, error) {
klog.V(6).Infof("Handle HTTP response\n")
fmt.Printf("keys: %s\v", keys)
switch res.StatusCode {
case http.StatusOK, http.StatusCreated, http.StatusNoContent:
return decodeResponseBody(res, keys, resBody)
Expand Down
75 changes: 51 additions & 24 deletions pkg/client/live/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,21 +352,26 @@ func (c *Client) Stream(r io.Reader) error {
return nil
default:
bytesRead, err := r.Read(chunk)
switch {
case err == nil:
// do nothing
case strings.Contains(err.Error(), FatalReadSocketErr):
klog.V(1).Infof("Fatal socket error: %v\n", err)
klog.V(6).Infof("live.listen() LEAVE\n")
return nil
case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry:
klog.V(3).Infof("stream object EOF\n")
klog.V(6).Infof("live.listen() LEAVE\n")
return nil
case err != nil:
klog.V(1).Infof("r.Read encountered EOF. Err: %v\n", err)
klog.V(6).Infof("live.Stream() LEAVE\n")
return err
if err != nil {
errStr := err.Error()
switch {
case strings.Contains(errStr, SuccessfulSocketErr):
klog.V(3).Infof("Graceful websocket close\n")
klog.V(6).Infof("live.Stream() LEAVE\n")
return nil
case strings.Contains(errStr, FatalReadSocketErr):
klog.V(1).Infof("Fatal socket error: %v\n", err)
klog.V(6).Infof("live.Stream() LEAVE\n")
return nil
case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry:
klog.V(3).Infof("stream object EOF\n")
klog.V(6).Infof("live.Stream() LEAVE\n")
return nil
case err != nil:
klog.V(1).Infof("r.Read encountered EOF. Err: %v\n", err)
klog.V(6).Infof("live.Stream() LEAVE\n")
return err
}
}

if bytesRead == 0 {
Expand All @@ -391,11 +396,17 @@ func (c *Client) WriteBinary(byData []byte) error {
// doing a write, need to lock
ws := c.Connect()
if ws == nil {
klog.V(1).Infof("WriteBinary Connection is not valid\n")
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.WriteBinary() LEAVE\n")
return ErrInvalidConnection

return err
}

// doing a write, need to lock
c.mu.Lock()
defer c.mu.Unlock()

if err := ws.WriteMessage(
websocket.BinaryMessage,
byData,
Expand All @@ -422,9 +433,11 @@ func (c *Client) WriteJSON(payload interface{}) error {
// doing a write, need to lock
ws := c.Connect()
if ws == nil {
klog.V(1).Infof("WriteJSON Connection is not valid\n")
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.WriteJSON() LEAVE\n")
return ErrInvalidConnection

return err
}

byData, err := json.Marshal(payload)
Expand All @@ -434,6 +447,10 @@ func (c *Client) WriteJSON(payload interface{}) error {
return err
}

// doing a write, need to lock
c.mu.Lock()
defer c.mu.Unlock()

if err := ws.WriteMessage(
websocket.TextMessage,
byData,
Expand Down Expand Up @@ -472,15 +489,20 @@ func (c *Client) Write(p []byte) (int, error) {
func (c *Client) Finalize() error {
klog.V(7).Infof("live.Finalize() ENTER\n")

if c.wsconn == nil {
// doing a write, need to lock
ws := c.Connect()
if ws == nil {
err := ErrInvalidConnection

klog.V(4).Infof("Finalize Failed. Err: %v\n", err)
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.Finalize() LEAVE\n")

return err
}

// doing a write, need to lock
c.mu.Lock()
defer c.mu.Unlock()

err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"Finalize\" }"))

klog.V(4).Infof("Finalize Succeeded\n")
Expand Down Expand Up @@ -567,15 +589,20 @@ func (c *Client) ping() {
return
}

// doing a write, need to lock
c.mu.Lock()

// deepgram keepalive message
klog.V(5).Infof("Sending Deepgram KeepAlive message...\n")

err := c.WriteJSON(map[string]string{"type": "KeepAlive"})
err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"KeepAlive\" }"))
if err == nil {
klog.V(5).Infof("Ping sent!")
} else {
klog.V(1).Infof("Failed to send Deepgram KeepAlive. Err: %v\n", err)
}

// release
c.mu.Unlock()
}
}
}
Expand Down

0 comments on commit 2479aa6

Please sign in to comment.