Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port WebSocket Threading Fixes STT Live -> TTS Speak, Port Fixes From TTS Speak -> STT Live #246

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/prerecorded/file/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func main() {
Paragraphs: true,
SmartFormat: true,
Language: "en-US",
Utterances: true,
}

// create a Deepgram client
Expand Down
21 changes: 1 addition & 20 deletions examples/speak/stream/interactive/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
const (
API_KEY = ""
TTS_TEXT = "Hello, this is a text to speech example using Deepgram."
AUDIO_DIR = "tts_outputs"
AUDIO_FILE = "output.mp3"
)

Expand All @@ -31,26 +30,8 @@ func (c MyCallback) Metadata(md *msginterfaces.MetadataResponse) error {
return nil
}

func (c MyCallback) SpeakStream(ss *msginterfaces.SpeakStreamResponse) error {
fmt.Printf("\n[SpeakStream] Received\n")
fmt.Printf("SpeakStream.ContentType: %s\n", strings.TrimSpace(ss.ContentType))
fmt.Printf("SpeakStream.Date: %s\n", strings.TrimSpace(ss.Date))
fmt.Printf("SpeakStream.ModelName: %s\n", strings.TrimSpace(ss.ModelName))
fmt.Printf("SpeakStream.ModelUUID: %s\n", strings.TrimSpace(ss.ModelUUID))
fmt.Printf("SpeakStream.RequestID: %s\n", strings.TrimSpace(ss.RequestID))
return nil
}

func (c MyCallback) Binary(byMsg []byte) error {
if _, err := os.Stat(AUDIO_DIR); os.IsNotExist(err) {
err := os.Mkdir(AUDIO_DIR, 0755)
if err != nil {
fmt.Printf("Error creating directory %s: %v\n", AUDIO_DIR, err)
return err
}
}

filePath := fmt.Sprintf("%s/%s", AUDIO_DIR, AUDIO_FILE)
filePath := fmt.Sprintf("%s", AUDIO_FILE)
file, err := os.Create(filePath)
if err != nil {
fmt.Printf("Error creating file %s: %v\n", filePath, err)
Expand Down
2 changes: 1 addition & 1 deletion examples/streaming/microphone/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (c MyCallback) Error(er *api.ErrorResponse) error {
// handle the error
fmt.Printf("\n[Error] Received\n")
fmt.Printf("Error.Type: %s\n", er.Type)
fmt.Printf("Error.Message: %s\n", er.Message)
fmt.Printf("Error.ErrCode: %s\n", er.ErrCode)
fmt.Printf("Error.Description: %s\n\n", er.Description)
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/live/v1/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ func (dch DefaultCallbackHandler) Error(er *interfaces.ErrorResponse) error {

// handle the message
fmt.Printf("\n[ErrorResponse]\n")
fmt.Printf("\nError.Type: %s\n", er.Type)
fmt.Printf("Error.Message: %s\n", er.Message)
fmt.Printf("\nError.Type: %s\n", er.ErrCode)
fmt.Printf("Error.Message: %s\n", er.ErrMsg)
fmt.Printf("Error.Description: %s\n\n", er.Description)
fmt.Printf("Error.Variant: %s\n\n", er.Variant)

Expand Down
14 changes: 7 additions & 7 deletions pkg/api/live/v1/interfaces/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces"
)

/***********************************/
// Request/Input structs
/***********************************/
type LiveOptions interfaces.LiveTranscriptionOptions

/***********************************/
// MessageType is the header to bootstrap you way unmarshalling other messages
/***********************************/
Expand Down Expand Up @@ -123,10 +128,5 @@ type CloseResponse struct {
Type string `json:"type,omitempty"`
}

// ErrorResponse is the response from a live transcription
type ErrorResponse struct {
Description string `json:"description"`
Message string `json:"message"`
Type string `json:"type"`
Variant string `json:"variant"`
}
// ErrorResponse is the Deepgram specific response error
type ErrorResponse interfaces.DeepgramError
9 changes: 0 additions & 9 deletions pkg/api/speak-stream/v1/interfaces/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ type MessageType struct {
Type string `json:"type"`
}

// SpeakStreamResponse is the response from the text-to-speech request
type SpeakStreamResponse struct {
ContentType string `json:"content_type,omitempty"`
RequestID string `json:"request_id,omitempty"`
ModelUUID string `json:"model_uuid,omitempty"`
ModelName string `json:"model_name,omitempty"`
Date string `json:"date,omitempty"`
}

// MetadataResponse is the response from the text-to-speech request which contains metadata about the request
type MetadataResponse struct {
Type string `json:"type,omitempty"`
Expand Down
1 change: 0 additions & 1 deletion pkg/client/interfaces/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ type DeepgramError struct {
ErrCode string `json:"err_code,omitempty"`
ErrMsg string `json:"err_msg,omitempty"`
Description string `json:"description,omitempty"`
RequestID string `json:"request_id,omitempty"`
Variant string `json:"variant,omitempty"`
}

Expand Down
29 changes: 17 additions & 12 deletions pkg/client/live/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
if headers, ok := c.ctx.Value(interfaces.HeadersContext{}).(http.Header); ok {
for k, v := range headers {
for _, v := range v {
klog.V(3).Infof("Connect() RESTORE Header: %s = %s\n", k, v)
klog.V(3).Infof("internalConnectWithCancel RESTORE Header: %s = %s\n", k, v)
myHeader.Add(k, v)
}
}
Expand Down Expand Up @@ -326,7 +326,7 @@ func (c *Client) listen() {
// release the lock
c.muConn.Unlock()

// read the message
// msgType can be binary or text
msgType, byMsg, err := ws.ReadMessage()

if err != nil {
Expand Down Expand Up @@ -414,7 +414,7 @@ func (c *Client) listen() {
}

if len(byMsg) == 0 {
klog.V(7).Infof("listen: message empty")
klog.V(7).Infof("listen(): message empty")
continue
}

Expand All @@ -428,12 +428,17 @@ func (c *Client) listen() {

// callback!
if c.callback != nil {
err := c.router.Message(byMsg)
if err != nil {
klog.V(1).Infof("listen: router.Message failed. Err: %v\n", err)
if msgType == websocket.TextMessage {
err := c.router.Message(byMsg)
if err != nil {
klog.V(1).Infof("live.listen(): router.Message failed. Err: %v\n", err)
}
} else {
// this shouldn't happen, but let's log it
klog.V(7).Infof("live.listen(): msg recv: type %d, len: %d\n", msgType, len(byMsg))
}
} else {
klog.V(7).Infof("listen: msg recv (type %d): %s\n", msgType, string(byMsg))
klog.V(7).Infof("callback is nil: msg recv: type %d, len: %d\n", msgType, len(byMsg))
}
}
}
Expand Down Expand Up @@ -535,7 +540,7 @@ func (c *Client) WriteJSON(payload interface{}) error {

byData, err := json.Marshal(payload)
if err != nil {
klog.V(1).Infof("WriteJSON json.Marshal failed. Err: %v\n", err)
klog.V(1).Infof("WriteJSON: Error marshaling JSON. Data: %v, Err: %v\n", payload, err)
klog.V(7).Infof("live.WriteJSON() LEAVE\n")
return err
}
Expand Down Expand Up @@ -592,9 +597,9 @@ func (c *Client) Write(p []byte) (int, error) {
func (c *Client) KeepAlive() error {
klog.V(7).Infof("live.KeepAlive() ENTER\n")

err := c.WriteJSON(controlMessage{Type: "KeepAlive"})
err := c.WriteJSON(controlMessage{Type: MessageTypeKeepAlive})
if err != nil {
klog.V(1).Infof("Finalize failed. Err: %v\n", err)
klog.V(1).Infof("KeepAlive failed. Err: %v\n", err)
klog.V(7).Infof("live.KeepAlive() LEAVE\n")

return err
Expand All @@ -609,7 +614,7 @@ func (c *Client) KeepAlive() error {
func (c *Client) Finalize() error {
klog.V(7).Infof("live.KeepAlive() ENTER\n")

err := c.WriteJSON(controlMessage{Type: "Finalize"})
err := c.WriteJSON(controlMessage{Type: MessageTypeFinalize})
if err != nil {
klog.V(1).Infof("Finalize failed. Err: %v\n", err)
klog.V(7).Infof("live.Finalize() LEAVE\n")
Expand Down Expand Up @@ -842,7 +847,7 @@ func (c *Client) errorToResponse(err error) *msginterfaces.ErrorResponse {

response := &msginterfaces.ErrorResponse{
Type: msginterfaces.TypeErrorResponse,
Message: strings.TrimSpace(fmt.Sprintf("%s %s", errorCode, errorNum)),
ErrMsg: strings.TrimSpace(fmt.Sprintf("%s %s", errorCode, errorNum)),
Description: strings.TrimSpace(errorDesc),
Variant: errorNum,
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/client/live/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package live
import (
"errors"
"time"
// gabs "github.com/Jeffail/gabs/v2"
)

// internal constants for retry, waits, back-off, etc.
Expand Down Expand Up @@ -37,15 +36,18 @@ const (
)

const (
// MessageTypeKeepAlive keep the connection alive
MessageTypeKeepAlive string = "KeepAlive"

// MessageTypeFinalize flushes the transcription from the server
MessageTypeFinalize string = "Finalize"

// MessageTypeCloseStream closes the stream
MessageTypeCloseStream string = "CloseStream"
)

// errors
var (
// ErrInvalidInput required input was not found
ErrInvalidInput = errors.New("required input was not found")

// ErrInvalidConnection connection is not valid
ErrInvalidConnection = errors.New("connection is not valid")
)
Loading
Loading