Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
dvonthenen committed Aug 7, 2024
1 parent 03266a1 commit 033563d
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 165 deletions.
51 changes: 6 additions & 45 deletions deepgram/clients/listen/v1/websocket/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,25 +398,12 @@ async def _listening(self) -> None:
self._logger.debug("AsyncListenWebSocketClient._listening LEAVE")
return

self._logger.error(
# no need to call self._signal_exit() here because we are already closed
self._logger.notice(
"ConnectionClosed in AsyncListenWebSocketClient._listening with code %s: %s",
e.code,
e.reason,
)
cc_error: ErrorResponse = ErrorResponse(
"ConnectionClosed in AsyncListenWebSocketClient._listening",
f"{e}",
"ConnectionClosed",
)
await self._emit(
LiveTranscriptionEvents(LiveTranscriptionEvents.Error),
error=cc_error,
**dict(cast(Dict[Any, Any], self._kwargs)),
)

# signal exit and close
await self._signal_exit()

self._logger.debug("AsyncListenWebSocketClient._listening LEAVE")

if self._config.options.get("termination_exception") == "true":
Expand Down Expand Up @@ -511,25 +498,12 @@ async def _keep_alive(self) -> None:
self._logger.debug("AsyncListenWebSocketClient._keep_alive LEAVE")
return

self._logger.error(
# no need to call self._signal_exit() here because we are already closed
self._logger.notice(
"ConnectionClosed in AsyncListenWebSocketClient._keep_alive with code %s: %s",
e.code,
e.reason,
)
cc_error: ErrorResponse = ErrorResponse(
"ConnectionClosed in AsyncListenWebSocketClient._keep_alive",
f"{e}",
"ConnectionClosed",
)
await self._emit(
LiveTranscriptionEvents(LiveTranscriptionEvents.Error),
error=cc_error,
**dict(cast(Dict[Any, Any], self._kwargs)),
)

# signal exit and close
await self._signal_exit()

self._logger.debug("AsyncListenWebSocketClient._keep_alive LEAVE")

if self._config.options.get("termination_exception") == "true":
Expand Down Expand Up @@ -638,25 +612,12 @@ async def _flush(self) -> None:
self._logger.debug("AsyncListenWebSocketClient._flush LEAVE")
return

self._logger.error(
# no need to call self._signal_exit() here because we are already closed
self._logger.notice(
"ConnectionClosed in AsyncListenWebSocketClient._flush with code %s: %s",
e.code,
e.reason,
)
cc_error: ErrorResponse = ErrorResponse(
"ConnectionClosed in AsyncListenWebSocketClient._flush",
f"{e}",
"ConnectionClosed",
)
await self._emit(
LiveTranscriptionEvents(LiveTranscriptionEvents.Error),
error=cc_error,
**dict(cast(Dict[Any, Any], self._kwargs)),
)

# signal exit and close
await self._signal_exit()

self._logger.debug("AsyncListenWebSocketClient._flush LEAVE")

if self._config.options.get("termination_exception") == "true":
Expand Down
49 changes: 8 additions & 41 deletions deepgram/clients/listen/v1/websocket/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,23 +383,12 @@ def _listening(
self._logger.debug("ListenWebSocketClient._listening LEAVE")
return

self._logger.error(
# no need to call self._signal_exit() here because we are already closed
self._logger.notice(
"ConnectionClosed in ListenWebSocketClient._listening with code %s: %s",
e.code,
e.reason,
)
cc_error: ErrorResponse = ErrorResponse(
"ConnectionClosed in ListenWebSocketClient._listening",
f"{e}",
"ConnectionClosed",
)
self._emit(
LiveTranscriptionEvents(LiveTranscriptionEvents.Error), cc_error
)

# signal exit and close
self._signal_exit()

self._logger.debug("ListenWebSocketClient._listening LEAVE")

if self._config.options.get("termination_exception") == "true":
Expand Down Expand Up @@ -490,23 +479,12 @@ def _keep_alive(self) -> None:
self._logger.debug("ListenWebSocketClient._keep_alive LEAVE")
return

self._logger.error(
# no need to call self._signal_exit() here because we are already closed
self._logger.notice(
"ConnectionClosed in ListenWebSocketClient._keep_alive with code %s: %s",
e.code,
e.reason,
)
cc_error: ErrorResponse = ErrorResponse(
"ConnectionClosed in ListenWebSocketClient._keep_alive",
f"{e}",
"ConnectionClosed",
)
self._emit(
LiveTranscriptionEvents(LiveTranscriptionEvents.Error), cc_error
)

# signal exit and close
self._signal_exit()

self._logger.debug("ListenWebSocketClient._keep_alive LEAVE")

if self._config.options.get("termination_exception") == "true":
Expand Down Expand Up @@ -616,23 +594,12 @@ def _flush(self) -> None:
self._logger.debug("ListenWebSocketClient._flush LEAVE")
return

self._logger.error(
"ConnectionClosed in ListenWebSocketClient._flush with code %s: %s",
# no need to call self._signal_exit() here because we are already closed
self._logger.notice(
"ConnectionClosed in ListenWebSocketClient._listening with code %s: %s",
e.code,
e.reason,
)
cc_error: ErrorResponse = ErrorResponse(
"ConnectionClosed in ListenWebSocketClient._flush",
f"{e}",
"ConnectionClosed",
)
self._emit(
LiveTranscriptionEvents(LiveTranscriptionEvents.Error), cc_error
)

# signal exit and close
self._signal_exit()

self._logger.debug("ListenWebSocketClient._flush LEAVE")

if self._config.options.get("termination_exception") == "true":
Expand Down Expand Up @@ -857,7 +824,7 @@ def _signal_exit(self) -> None:
except websockets.exceptions.ConnectionClosedOK as e:
self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code)
except websockets.exceptions.ConnectionClosed as e:
self._logger.error("_signal_exit - ConnectionClosed: %s", e.code)
self._logger.notice("_signal_exit - ConnectionClosed: %s", e.code)
except websockets.exceptions.WebSocketException as e:
self._logger.error("_signal_exit - WebSocketException: %s", str(e))
except Exception as e: # pylint: disable=broad-except
Expand Down
54 changes: 17 additions & 37 deletions deepgram/clients/speak/v1/websocket/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ async def start(
self._logger.debug("AsyncSpeakStreamClient.start LEAVE")
return True
except websockets.ConnectionClosed as e:
self._logger.error(
self._logger.notice(
"ConnectionClosed in AsyncSpeakStreamClient.start: %s", e
)
self._logger.debug("AsyncSpeakStreamClient.start LEAVE")
Expand Down Expand Up @@ -392,25 +392,12 @@ async def _listening(self) -> None:
self._logger.debug("AsyncSpeakStreamClient._listening LEAVE")
return

self._logger.error(
# no need to call self._signal_exit() here because we are already closed
self._logger.notice(
"ConnectionClosed in AsyncSpeakStreamClient._listening with code %s: %s",
e.code,
e.reason,
)
cc_error: ErrorResponse = ErrorResponse(
"ConnectionClosed in AsyncSpeakStreamClient._listening",
f"{e}",
"ConnectionClosed",
)
await self._emit(
SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
error=cc_error,
**dict(cast(Dict[Any, Any], self._kwargs)),
)

# signal exit and close
await self._signal_exit()

self._logger.debug("AsyncSpeakStreamClient._listening LEAVE")

if self._config.options.get("termination_exception") == "true":
Expand Down Expand Up @@ -516,25 +503,12 @@ async def _flush(self) -> None:
self._logger.debug("AsyncSpeakStreamClient._flush LEAVE")
return

self._logger.error(
"ConnectionClosed in AsyncSpeakStreamClient._flush with code %s: %s",
# no need to call self._signal_exit() here because we are already closed
self._logger.notice(
"ConnectionClosed in AsyncSpeakStreamClient._listening with code %s: %s",
e.code,
e.reason,
)
cc_error: ErrorResponse = ErrorResponse(
"ConnectionClosed in AsyncSpeakStreamClient._flush",
f"{e}",
"ConnectionClosed",
)
await self._emit(
SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
error=cc_error,
**dict(cast(Dict[Any, Any], self._kwargs)),
)

# signal exit and close
await self._signal_exit()

self._logger.debug("AsyncSpeakStreamClient._flush LEAVE")

if self._config.options.get("termination_exception") == "true":
Expand Down Expand Up @@ -621,10 +595,15 @@ async def send_raw(self, msg: str) -> bool:
self._logger.debug("AsyncSpeakStreamClient.send_raw LEAVE")
return False

if isinstance(msg, dict) and "type" in msg and msg["type"] == "Flush":
self._last_datagram = None
self._flush_count += 1
self._logger.debug("Increment Flush count: %d", self._flush_count)
try:
myJson = json.loads(msg)
if "type" in myJson and myJson["type"] == "Flush":
with self._lock_flush:
self._last_datagram = None
self._flush_count += 1
self._logger.debug("Increment Flush count: %d", self._flush_count)
except Exception as e:
self._logger.error("send_raw() failed - Exception: %s", str(e))

if self._socket is not None:
try:
Expand Down Expand Up @@ -762,7 +741,8 @@ async def _signal_exit(self) -> None:
self._logger.verbose("send CloseStream...")
try:
# if the socket connection is closed, the following line might throw an error
await self.send_raw(json.dumps({"type": "CloseStream"}))
# need to explicitly use _socket.send (dont use self.send_raw)
await self._socket.send(json.dumps({"type": "CloseStream"}))
except websockets.exceptions.ConnectionClosedOK as e:
self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code)
except websockets.exceptions.ConnectionClosed as e:
Expand Down
59 changes: 18 additions & 41 deletions deepgram/clients/speak/v1/websocket/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,21 +372,12 @@ def _listening(
self._logger.debug("SpeakStreamClient._listening LEAVE")
return

self._logger.error(
# no need to call self._signal_exit() here because we are already closed
self._logger.notice(
"ConnectionClosed in SpeakStreamClient._listening with code %s: %s",
e.code,
e.reason,
)
cc_error: ErrorResponse = ErrorResponse(
"ConnectionClosed in SpeakStreamClient._listening",
f"{e}",
"ConnectionClosed",
)
self._emit(SpeakWebSocketEvents(SpeakWebSocketEvents.Error), cc_error)

# signal exit and close
self._signal_exit()

self._logger.debug("SpeakStreamClient._listening LEAVE")

if self._config.options.get("termination_exception") == "true":
Expand Down Expand Up @@ -451,7 +442,7 @@ def _flush(self) -> None:
while True:
try:
counter += 1
self._exit_event.wait(timeout=ONE_SECOND)
self._exit_event.wait(timeout=HALF_SECOND)

if self._exit_event.is_set():
self._logger.notice("_flush exiting gracefully")
Expand Down Expand Up @@ -488,25 +479,12 @@ def _flush(self) -> None:
self._logger.debug("SpeakStreamClient._flush LEAVE")
return

self._logger.error(
# no need to call self._signal_exit() here because we are already closed
self._logger.notice(
"ConnectionClosed in SpeakStreamClient._flush with code %s: %s",
e.code,
e.reason,
)
cc_error: ErrorResponse = ErrorResponse(
"ConnectionClosed in SpeakStreamClient._flush",
f"{e}",
"ConnectionClosed",
)
self._emit(
SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
error=cc_error,
**dict(cast(Dict[Any, Any], self._kwargs)),
)

# signal exit and close
self._signal_exit()

self._logger.debug("SpeakStreamClient._flush LEAVE")

if self._config.options.get("termination_exception") == "true":
Expand Down Expand Up @@ -591,18 +569,16 @@ def send_raw(self, msg: str) -> bool:
self._logger.notice("send exiting gracefully")
self._logger.debug("SpeakStreamClient.send LEAVE")
return False

if (
isinstance(msg, dict)
and "type" in msg
and msg["type"] == "Flush"
):
with self._lock_flush:
self._last_datagram = None
self._flush_count += 1
self._logger.debug(
"Increment Flush count: %d", self._flush_count
)

try:
myJson = json.loads(msg)
if "type" in myJson and myJson["type"] == "Flush":
with self._lock_flush:
self._last_datagram = None
self._flush_count += 1
self._logger.debug("Increment Flush count: %d", self._flush_count)
except Exception as e:
self._logger.error("send_raw() failed - Exception: %s", str(e))

if self._socket is not None:
with self._lock_send:
Expand Down Expand Up @@ -729,11 +705,12 @@ def _signal_exit(self) -> None:
self._logger.notice("sending Close...")
try:
# if the socket connection is closed, the following line might throw an error
self._socket.send_raw(json.dumps({"type": "Close"}))
# need to explicitly use _socket.send (dont use self.send_raw)
self._socket.send(json.dumps({"type": "CloseStream"}))
except websockets.exceptions.ConnectionClosedOK as e:
self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code)
except websockets.exceptions.ConnectionClosed as e:
self._logger.error("_signal_exit - ConnectionClosed: %s", e.code)
self._logger.notice("_signal_exit - ConnectionClosed: %s", e.code)
except websockets.exceptions.WebSocketException as e:
self._logger.error("_signal_exit - WebSocketException: %s", str(e))
except Exception as e: # pylint: disable=broad-except
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def main():
# example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM
config: DeepgramClientOptions = DeepgramClientOptions(
url="api.beta.deepgram.com",
options={"auto_flush_speak_delta": "500"},
options={"auto_flush_speak_delta": "1000"},
verbose=verboselogs.SPAM,
)
deepgram: DeepgramClient = DeepgramClient("", config)
Expand Down

0 comments on commit 033563d

Please sign in to comment.