Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Using ws:// With Self Hosted #269

Merged
merged 1 commit into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/api/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func getAPIURL(ctx context.Context, apiType, host, version, path string, options

// check if the host has a protocol
r := regexp.MustCompile(`^(https?)://(.+)$`)
if apiType == APITypeLive {
if apiType == APITypeLive || apiType == APITypeSpeakStream {
r = regexp.MustCompile(`^(wss?)://(.+)$`)
}

Expand Down Expand Up @@ -98,6 +98,7 @@ func getAPIURL(ctx context.Context, apiType, host, version, path string, options
// construct the full path and substitute the version and all query parameters
fullpath := fmt.Sprintf("%%s/%s", path)
completeFullpath := fmt.Sprintf(fullpath, append([]interface{}{version}, args...)...)
klog.V(3).Infof("completeFullpath: %s\n", completeFullpath)

// construct the URL
var u url.URL
Expand All @@ -106,6 +107,7 @@ func getAPIURL(ctx context.Context, apiType, host, version, path string, options
} else {
u = url.URL{Scheme: protocol, Host: host, Path: completeFullpath}
}
klog.V(3).Infof("URI final: %s\n", u.String())

return u.String(), nil
}
128 changes: 82 additions & 46 deletions pkg/client/common/v1/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ func (c *WSClient) internalConnect() *websocket.Conn {
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt), false)
}

//nolint:funlen // this is a complex function. keep as is
//nolint:funlen,gocyclo // this is a complex function. keep as is
func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int, lock bool) *websocket.Conn {
klog.V(7).Infof("live.internalConnectWithCancel() ENTER\n")
klog.V(7).Infof("common.internalConnectWithCancel() ENTER\n")

// set the context
c.ctx = ctx
Expand All @@ -94,13 +94,16 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont
if lock {
klog.V(3).Infof("Locking connection mutex\n")
c.muConn.Lock()
defer c.muConn.Unlock()
}

// we explicitly stopped and should not attempt to reconnect
if !c.retry {
klog.V(7).Infof("This connection has been terminated. Please either call with AttemptReconnect or create a new Client object using NewWebSocketClient.")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")
if lock {
klog.V(3).Infof("Unlocking connection mutex\n")
c.muConn.Unlock()
}
return nil
}

Expand All @@ -109,32 +112,36 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont
select {
case <-c.ctx.Done():
klog.V(1).Infof("Connection is not valid\n")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")
if lock {
klog.V(3).Infof("Unlocking connection mutex\n")
c.muConn.Unlock()
}
return nil
default:
klog.V(7).Infof("Connection is good. Return object.")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")
if lock {
klog.V(3).Infof("Unlocking connection mutex\n")
c.muConn.Unlock()
}
return c.wsconn
}
} else {
select {
case <-c.ctx.Done():
klog.V(1).Infof("Context is not valid. Has been canceled.\n")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")
if lock {
klog.V(3).Infof("Unlocking connection mutex\n")
c.muConn.Unlock()
}
return nil
default:
klog.V(3).Infof("Context is still valid. Retry...\n")
}
}

dialer := websocket.Dialer{
HandshakeTimeout: 45 * time.Second,
/* #nosec G402 */
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.cOptions.SkipServerAuth},
RedirectService: c.cOptions.RedirectService,
SkipServerAuth: c.cOptions.SkipServerAuth,
}

// set websocket headers
myHeader := http.Header{}

Expand Down Expand Up @@ -175,10 +182,30 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont
if err != nil {
klog.V(1).Infof("GetURL failed. Err: %v\n", err)
klog.V(7).Infof("internalConnectWithCancel() LEAVE\n")
if lock {
klog.V(3).Infof("Unlocking connection mutex\n")
c.muConn.Unlock()
}
return nil // no point in retrying because this is going to fail on every retry
}
klog.V(5).Infof("Connecting to %s\n", url)

// if host starts with "ws://", then disable TLS
var dialer websocket.Dialer
if url[:5] == "ws://" {
dialer = websocket.Dialer{
HandshakeTimeout: 15 * time.Second,
RedirectService: c.cOptions.RedirectService,
}
} else {
dialer = websocket.Dialer{
HandshakeTimeout: 15 * time.Second,
/* #nosec G402 */
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.cOptions.SkipServerAuth},
RedirectService: c.cOptions.RedirectService,
SkipServerAuth: c.cOptions.SkipServerAuth,
}
}
// perform the websocket connection
ws, res, err := dialer.DialContext(c.ctx, url, myHeader)
if res != nil {
Expand All @@ -197,6 +224,10 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont

// kick off threads to listen for messages and ping/keepalive
go c.listen()
if lock {
klog.V(3).Infof("Unlocking connection mutex\n")
c.muConn.Unlock()
}

// start WS specific items
(*c.processMessages).Start()
Expand All @@ -210,21 +241,26 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont
}

klog.V(3).Infof("WebSocket Connection Successful!")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")

return c.wsconn
}

// if we get here, we failed to connect
klog.V(1).Infof("Failed to connect to websocket: %s\n", c.cOptions.Host)
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")

if lock {
klog.V(3).Infof("Unlocking connection mutex\n")
c.muConn.Unlock()
}

return nil
}

//nolint:funlen // this is a complex function. keep as is
func (c *WSClient) listen() {
klog.V(6).Infof("live.listen() ENTER\n")
klog.V(6).Infof("common.listen() ENTER\n")

defer func() {
if r := recover(); r != nil {
Expand All @@ -240,7 +276,7 @@ func (c *WSClient) listen() {
// fatal close
c.closeWs(true, false)

klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
}
}()
Expand All @@ -256,7 +292,7 @@ func (c *WSClient) listen() {
c.muConn.Unlock()

klog.V(3).Infof("listen: Connection is not valid\n")
klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
}

Expand All @@ -275,15 +311,15 @@ func (c *WSClient) listen() {
// graceful close
c.closeWs(false, false)

klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
case strings.Contains(errStr, UseOfClosedSocket):
klog.V(3).Infof("Probable graceful websocket close: %v\n", err)

// fatal close
c.closeWs(false, false)

klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
case strings.Contains(errStr, FatalReadSocketErr):
klog.V(1).Infof("Fatal socket error: %v\n", err)
Expand All @@ -297,7 +333,7 @@ func (c *WSClient) listen() {
// fatal close
c.closeWs(true, false)

klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
case strings.Contains(errStr, "Deepgram"):
klog.V(1).Infof("listen: Deepgram error. Err: %v\n", err)
Expand All @@ -311,7 +347,7 @@ func (c *WSClient) listen() {
// close the connection
c.closeWs(false, false)

klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
case (err == io.EOF || err == io.ErrUnexpectedEOF):
klog.V(3).Infof("stream object EOF\n")
Expand All @@ -325,7 +361,7 @@ func (c *WSClient) listen() {
// close the connection
c.closeWs(true, false)

klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
default:
klog.V(1).Infof("listen: Cannot read websocket message. Err: %v\n", err)
Expand All @@ -339,7 +375,7 @@ func (c *WSClient) listen() {
// close the connection
c.closeWs(true, false)

klog.V(6).Infof("live.listen() LEAVE\n")
klog.V(6).Infof("common.listen() LEAVE\n")
return
}
}
Expand All @@ -359,7 +395,7 @@ func (c *WSClient) listen() {

// WriteBinary writes binary data to the websocket server
func (c *WSClient) WriteBinary(byData []byte) error {
klog.V(7).Infof("live.WriteBinary() ENTER\n")
klog.V(7).Infof("common.WriteBinary() ENTER\n")

// doing a write, need to lock
c.muConn.Lock()
Expand All @@ -370,7 +406,7 @@ func (c *WSClient) WriteBinary(byData []byte) error {
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.WriteBinary() LEAVE\n")
klog.V(7).Infof("common.WriteBinary() LEAVE\n")

return err
}
Expand All @@ -380,28 +416,28 @@ func (c *WSClient) WriteBinary(byData []byte) error {
byData,
); err != nil {
klog.V(1).Infof("WriteBinary WriteMessage failed. Err: %v\n", err)
klog.V(7).Infof("live.WriteBinary() LEAVE\n")
klog.V(7).Infof("common.WriteBinary() LEAVE\n")
return err
}

klog.V(7).Infof("WriteBinary Successful\n")
klog.V(7).Infof("payload: %x\n", byData)
klog.V(7).Infof("live.WriteBinary() LEAVE\n")
klog.V(7).Infof("common.WriteBinary() LEAVE\n")

return nil
}

/*
WriteJSON writes a JSON control payload to the websocket server. These are control messages for
managing the live transcription session on the Deepgram server.
managing the websocket connection.
*/
func (c *WSClient) WriteJSON(payload interface{}) error {
klog.V(6).Infof("live.WriteJSON() ENTER\n")
klog.V(6).Infof("common.WriteJSON() ENTER\n")

byData, err := json.Marshal(payload)
if err != nil {
klog.V(1).Infof("WriteJSON: Error marshaling JSON. Data: %v, Err: %v\n", payload, err)
klog.V(6).Infof("live.WriteJSON() LEAVE\n")
klog.V(6).Infof("common.WriteJSON() LEAVE\n")
return err
}

Expand All @@ -414,7 +450,7 @@ func (c *WSClient) WriteJSON(payload interface{}) error {
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err)
klog.V(6).Infof("live.WriteJSON() LEAVE\n")
klog.V(6).Infof("common.WriteJSON() LEAVE\n")

return err
}
Expand All @@ -424,20 +460,20 @@ func (c *WSClient) WriteJSON(payload interface{}) error {
byData,
); err != nil {
klog.V(1).Infof("WriteJSON WriteMessage failed. Err: %v\n", err)
klog.V(6).Infof("live.WriteJSON() LEAVE\n")
klog.V(6).Infof("common.WriteJSON() LEAVE\n")
return err
}

klog.V(4).Infof("live.WriteJSON() Succeeded\n")
klog.V(4).Infof("common.WriteJSON() Succeeded\n")
klog.V(6).Infof("payload: %s\n", string(byData))
klog.V(6).Infof("live.WriteJSON() LEAVE\n")
klog.V(6).Infof("common.WriteJSON() LEAVE\n")

return nil
}

// closeStream sends an application level message to Deepgram
func (c *WSClient) closeStream(lock bool) error {
klog.V(7).Infof("live.closeStream() ENTER\n")
klog.V(7).Infof("common.closeStream() ENTER\n")

// doing a write, need to lock
if lock {
Expand All @@ -456,20 +492,20 @@ func (c *WSClient) closeStream(lock bool) error {

if err != nil {
klog.V(1).Infof("WriteMessage failed. Err: %v\n", err)
klog.V(7).Infof("live.closeStream() LEAVE\n")
klog.V(7).Infof("common.closeStream() LEAVE\n")

return err
}

klog.V(4).Infof("closeStream Succeeded\n")
klog.V(7).Infof("live.closeStream() LEAVE\n")
klog.V(7).Infof("common.closeStream() LEAVE\n")

return err
}

// normalClosure sends a normal closure message to the server
func (c *WSClient) normalClosure(lock bool) error {
klog.V(7).Infof("live.normalClosure() ENTER\n")
klog.V(7).Infof("common.normalClosure() ENTER\n")

// doing a write, need to lock
if lock {
Expand All @@ -481,7 +517,7 @@ func (c *WSClient) normalClosure(lock bool) error {
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.normalClosure() LEAVE\n")
klog.V(7).Infof("common.normalClosure() LEAVE\n")

return err
}
Expand All @@ -496,7 +532,7 @@ func (c *WSClient) normalClosure(lock bool) error {
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", err)
}

klog.V(7).Infof("live.normalClosure() LEAVE\n")
klog.V(7).Infof("common.normalClosure() LEAVE\n")

return err
}
Expand All @@ -514,7 +550,7 @@ func (c *WSClient) Stop() {

// closeWs closes the websocket connection
func (c *WSClient) closeWs(fatal bool, perm bool) {
klog.V(6).Infof("live.closeWs() closing channels...\n")
klog.V(6).Infof("common.closeWs() closing channels...\n")

// doing a write, need to lock
c.muConn.Lock()
Expand Down Expand Up @@ -555,8 +591,8 @@ func (c *WSClient) closeWs(fatal bool, perm bool) {
c.wsconn = nil
}

klog.V(4).Infof("live.closeWs() Succeeded\n")
klog.V(6).Infof("live.closeWs() LEAVE\n")
klog.V(4).Infof("common.closeWs() Succeeded\n")
klog.V(6).Infof("common.closeWs() LEAVE\n")
}

// sendError sends an error message to the callback handler
Expand Down
Loading
Loading