Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix KeepAlive Interval and Expose Error Callback #134

Merged
merged 1 commit into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions examples/streaming/microphone/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ func (c MyCallback) Metadata(md *api.MetadataResponse) error {
log.Printf("Metadata.Created: %s\n\n", strings.TrimSpace(md.Created))
return nil
}
func (c MyCallback) Error(er *api.ErrorResponse) error {
// handle the error
log.Printf("\nError.Type: %s\n", er.Type)
log.Printf("Error.Message: %s\n", er.Message)
log.Printf("Error.Description: %s\n\n", er.Description)
return nil
}

func main() {
// init library
Expand Down
41 changes: 39 additions & 2 deletions pkg/api/live/v1/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@ import (
interfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/live/v1/interfaces"
)

// DefaultCallbackHandler is a default callback handler for live transcription
// Simply prints the transcript to stdout
type DefaultCallbackHandler struct {
sb strings.Builder
}

// NewDefaultCallbackHandler creates a new DefaultCallbackHandler
func NewDefaultCallbackHandler() DefaultCallbackHandler {
return DefaultCallbackHandler{}
}

// Message is the callback for a message
func (dch DefaultCallbackHandler) Message(mr *interfaces.MessageResponse) error {
var debugStr string
if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" {
Expand Down Expand Up @@ -56,6 +64,7 @@ func (dch DefaultCallbackHandler) Message(mr *interfaces.MessageResponse) error
return nil
}

// Metadata is the callback for a metadata
func (dch DefaultCallbackHandler) Metadata(md *interfaces.MetadataResponse) error {
var debugStr string
if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" {
Expand Down Expand Up @@ -88,6 +97,34 @@ func (dch DefaultCallbackHandler) Metadata(md *interfaces.MetadataResponse) erro
return nil
}

func NewDefaultCallbackHandler() DefaultCallbackHandler {
return DefaultCallbackHandler{}
func (dch DefaultCallbackHandler) Error(er *interfaces.ErrorResponse) error {
var debugStr string
if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" {
klog.V(4).Infof("DEEPGRAM_DEBUG found")
debugStr = v
}

if strings.Compare(strings.ToLower(debugStr), "true") == 0 {
data, err := json.Marshal(er)
if err != nil {
klog.V(1).Infof("Error json.Marshal failed. Err: %v\n", err)
return err
}

prettyJson, err := prettyjson.Format(data)
if err != nil {
klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err)
return err
}
klog.V(2).Infof("\n\nError Object:\n%s\n\n", prettyJson)

return nil
}

// handle the message
fmt.Printf("\nError.Type: %s\n", er.Type)
fmt.Printf("Error.Message: %s\n", er.Message)
fmt.Printf("Error.Description: %s\n\n", er.Description)

return nil
}
1 change: 1 addition & 0 deletions pkg/api/live/v1/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ package interfaces
type LiveMessageCallback interface {
Message(mr *MessageResponse) error
Metadata(md *MetadataResponse) error
Error(er *ErrorResponse) error
// TODO: implement other conversation insights
}
19 changes: 15 additions & 4 deletions pkg/api/live/v1/interfaces/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,35 @@ package interfaces
/*
Shared defintions for the Deepgram API
*/
type Words struct {
// Word is a single word in a transcript
type Word struct {
Confidence float64 `json:"confidence,omitempty"`
End float64 `json:"end,omitempty"`
PunctuatedWord string `json:"punctuated_word,omitempty"`
Start float64 `json:"start,omitempty"`
Word string `json:"word,omitempty"`
}
type Alternatives struct {

// Alternative is a single alternative in a transcript
type Alternative struct {
Confidence float64 `json:"confidence,omitempty"`
Transcript string `json:"transcript,omitempty"`
Words []Words `json:"words,omitempty"`
Words []Word `json:"words,omitempty"`
}

// Channel is a single channel in a transcript
type Channel struct {
Alternatives []Alternatives `json:"alternatives,omitempty"`
Alternatives []Alternative `json:"alternatives,omitempty"`
}

// ModelInfo is the model information for a transcript
type ModelInfo struct {
Arch string `json:"arch,omitempty"`
Name string `json:"name,omitempty"`
Version string `json:"version,omitempty"`
}

// Metadata is the metadata for a transcript
type Metadata struct {
ModelInfo ModelInfo `json:"model_info,omitempty"`
ModelUUID string `json:"model_uuid,omitempty"`
Expand All @@ -37,6 +45,7 @@ type Metadata struct {
/*
Results from Live Transcription
*/
// MessageResponse is the response from a live transcription
type MessageResponse struct {
Channel Channel `json:"channel,omitempty"`
ChannelIndex []int `json:"channel_index,omitempty"`
Expand All @@ -48,6 +57,7 @@ type MessageResponse struct {
Type string `json:"type,omitempty"`
}

// MetadataResponse is the response from a live transcription
type MetadataResponse struct {
Channels int `json:"channels,omitempty"`
Created string `json:"created,omitempty"`
Expand All @@ -60,6 +70,7 @@ type MetadataResponse struct {
Type string `json:"type,omitempty"`
}

// ErrorResponse is the response from a live transcription
type ErrorResponse struct {
Description string `json:"description"`
Message string `json:"message"`
Expand Down
33 changes: 18 additions & 15 deletions pkg/api/live/v1/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package live

import (
"encoding/json"
"errors"

prettyjson "github.com/hokaccha/go-prettyjson"
klog "k8s.io/klog/v2"
Expand Down Expand Up @@ -48,7 +47,7 @@ func (r *MessageRouter) Message(byMsg []byte) error {

switch mt.Type {
case interfaces.TypeErrorResponse:
return r.HandleError(byMsg)
return r.ErrorResponse(byMsg)
case interfaces.TypeMessageResponse:
return r.MessageResponse(byMsg)
case interfaces.TypeMetadataResponse:
Expand Down Expand Up @@ -121,31 +120,35 @@ func (r *MessageRouter) MetadataResponse(byMsg []byte) error {
return nil
}

// HandleError handles error messages
func (r *MessageRouter) HandleError(byMsg []byte) error {
klog.V(6).Infof("router.HandleError ENTER\n")
func (r *MessageRouter) ErrorResponse(byMsg []byte) error {
klog.V(6).Infof("router.ErrorResponse ENTER\n")

// trace debugging
r.printDebugMessages(1, "HandleError", byMsg)
r.printDebugMessages(5, "ErrorResponse", byMsg)

var er interfaces.ErrorResponse
err := json.Unmarshal(byMsg, &er)
if err != nil {
klog.V(1).Infof("HandleError json.Unmarshal failed. Err: %v\n", err)
klog.V(6).Infof("router.HandleError LEAVE\n")
klog.V(1).Infof("ErrorResponse json.Unmarshal failed. Err: %v\n", err)
klog.V(6).Infof("router.ErrorResponse LEAVE\n")
return err
}

b, err := json.MarshalIndent(er, "", " ")
if err != nil {
klog.V(1).Infof("HandleError MarshalIndent failed. Err: %v\n", err)
klog.V(6).Infof("router.HandleError LEAVE\n")
if r.callback != nil {
err := r.callback.Error(&er)
if err != nil {
klog.V(1).Infof("callback.ErrorResponse failed. Err: %v\n", err)
} else {
klog.V(5).Infof("callback.ErrorResponse succeeded\n")
}
klog.V(6).Infof("router.ErrorResponse LEAVE\n")
return err
}

klog.V(1).Infof("\n\nError: %s\n\n", string(b))
klog.V(6).Infof("router.HandleError LEAVE\n")
return errors.New(string(b))
klog.V(1).Infof("User callback is undefined\n")
klog.V(6).Infof("router.ErrorResponse ENTER\n")

return nil
}

// UnhandledMessage handles the UnhandledMessage message
Expand Down
12 changes: 6 additions & 6 deletions pkg/api/manage/v1/interfaces/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ type Request struct {
Callback interface{} `json:"callback,omitempty"`
}

// Models provides a list of models
type Models struct {
// Model provides a list of models
type Model struct {
Name string `json:"name,omitempty"`
Language string `json:"language,omitempty"`
Version string `json:"version,omitempty"`
Expand All @@ -143,8 +143,8 @@ type Resolution struct {
Amount int `json:"amount,omitempty"`
}

// Results provides a list of results
type Results struct {
// Result provides a list of results
type Result struct {
Start string `json:"start,omitempty"`
End string `json:"end,omitempty"`
Hours float64 `json:"hours,omitempty"`
Expand All @@ -162,7 +162,7 @@ type RequestList struct {
// UsageField provides a usage field
type UsageField struct {
Tags []any `json:"tags,omitempty"`
Models []Models `json:"models,omitempty"`
Models []Model `json:"models,omitempty"`
ProcessingMethods []string `json:"processing_methods,omitempty"`
Features []string `json:"features,omitempty"`
}
Expand All @@ -172,7 +172,7 @@ type Usage struct {
Start string `json:"start,omitempty"`
End string `json:"end,omitempty"`
Resolution Resolution `json:"resolution,omitempty"`
Results []Results `json:"results,omitempty"`
Results []Result `json:"results,omitempty"`
}

/***********************************/
Expand Down
37 changes: 25 additions & 12 deletions pkg/client/live/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,9 @@ func (c *Client) closeWs() {
func (c *Client) ping() {
klog.V(6).Infof("live.ping() ENTER\n")

var counter uint64
counter = 0

ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()
for {
Expand All @@ -424,7 +427,8 @@ func (c *Client) ping() {
klog.V(6).Infof("live.ping() LEAVE\n")
return
case <-ticker.C:
klog.V(4).Infof("Starting ping...")
klog.V(5).Infof("Starting ping...")
counter++

ws := c.Connect()
if ws == nil {
Expand All @@ -434,26 +438,35 @@ func (c *Client) ping() {

// doing a write, need to lock
c.mu.Lock()
klog.V(4).Infof("Sending ping... need reply in %d\n", (pingPeriod / 2))

// deepgram keepalive message
errDg := ws.WriteMessage(websocket.BinaryMessage, []byte("{ \"type\": \"KeepAlive\" }"))
if errDg != nil {
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errDg)
klog.V(5).Infof("Sending ping... need reply in %d\n", (pingPeriod / 2))

var errDg error
if c.cOptions.EnableKeepAlive {
klog.V(5).Infof("Sending Deepgram KeepAlive message...\n")
// deepgram keepalive message
errDg = ws.WriteMessage(websocket.BinaryMessage, []byte("{ \"type\": \"KeepAlive\" }"))
if errDg != nil {
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errDg)
}
}

// websocket protocol ping/pong
errProto := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(pingPeriod/2))
if errProto != nil {
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errProto)
// websocket protocol ping/pong... this loop is every 5 seconds, so ping every 20 seconds
var errProto error
errProto = nil
if counter%4 == 0 {
klog.V(5).Infof("Sending Protocol KeepAlive message...\n")
errProto = ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(pingPeriod/2))
if errProto != nil {
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errProto)
}
}
c.mu.Unlock()

if errDg != nil || errProto != nil {
klog.V(1).Infof("WebSocketClient::ping failed\n")
c.closeWs()
} else {
klog.V(2).Infof("Ping sent!")
klog.V(5).Infof("Ping sent!")
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/live/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

const (
pingPeriod = 30 * time.Second
pingPeriod = 5 * time.Second

connectionRetryInfinite int64 = 0
defaultConnectRetry int64 = 3
Expand Down
1 change: 1 addition & 0 deletions pkg/client/live/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type ClientOptions struct {
Path string // override for the endpoint path usually <version/listen>
RedirectService bool // allows HTTP redirects to be followed
SkipServerAuth bool // keeps the client from authenticating with the server
EnableKeepAlive bool // enables the keep alive feature
}

// Client is a struct representing the websocket client connection
Expand Down