From 033563d46fe745ecaaa5c86936a2dd78c9e3bc9d Mon Sep 17 00:00:00 2001 From: David vonThenen <12752197+dvonthenen@users.noreply.github.com> Date: Wed, 7 Aug 2024 11:20:42 -0700 Subject: [PATCH] update --- .../listen/v1/websocket/async_client.py | 51 ++-------------- .../clients/listen/v1/websocket/client.py | 49 +++------------ .../speak/v1/websocket/async_client.py | 54 ++++++----------- deepgram/clients/speak/v1/websocket/client.py | 59 ++++++------------- .../simple/main-non-containerized.py | 2 +- 5 files changed, 50 insertions(+), 165 deletions(-) diff --git a/deepgram/clients/listen/v1/websocket/async_client.py b/deepgram/clients/listen/v1/websocket/async_client.py index 82afddb0..0b7cb3c2 100644 --- a/deepgram/clients/listen/v1/websocket/async_client.py +++ b/deepgram/clients/listen/v1/websocket/async_client.py @@ -398,25 +398,12 @@ async def _listening(self) -> None: self._logger.debug("AsyncListenWebSocketClient._listening LEAVE") return - self._logger.error( + # no need to call self._signal_exit() here because we are already closed + self._logger.notice( "ConnectionClosed in AsyncListenWebSocketClient._listening with code %s: %s", e.code, e.reason, ) - cc_error: ErrorResponse = ErrorResponse( - "ConnectionClosed in AsyncListenWebSocketClient._listening", - f"{e}", - "ConnectionClosed", - ) - await self._emit( - LiveTranscriptionEvents(LiveTranscriptionEvents.Error), - error=cc_error, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - - # signal exit and close - await self._signal_exit() - self._logger.debug("AsyncListenWebSocketClient._listening LEAVE") if self._config.options.get("termination_exception") == "true": @@ -511,25 +498,12 @@ async def _keep_alive(self) -> None: self._logger.debug("AsyncListenWebSocketClient._keep_alive LEAVE") return - self._logger.error( + # no need to call self._signal_exit() here because we are already closed + self._logger.notice( "ConnectionClosed in AsyncListenWebSocketClient._keep_alive with code %s: %s", e.code, e.reason, ) - cc_error: ErrorResponse = ErrorResponse( - "ConnectionClosed in AsyncListenWebSocketClient._keep_alive", - f"{e}", - "ConnectionClosed", - ) - await self._emit( - LiveTranscriptionEvents(LiveTranscriptionEvents.Error), - error=cc_error, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - - # signal exit and close - await self._signal_exit() - self._logger.debug("AsyncListenWebSocketClient._keep_alive LEAVE") if self._config.options.get("termination_exception") == "true": @@ -638,25 +612,12 @@ async def _flush(self) -> None: self._logger.debug("AsyncListenWebSocketClient._flush LEAVE") return - self._logger.error( + # no need to call self._signal_exit() here because we are already closed + self._logger.notice( "ConnectionClosed in AsyncListenWebSocketClient._flush with code %s: %s", e.code, e.reason, ) - cc_error: ErrorResponse = ErrorResponse( - "ConnectionClosed in AsyncListenWebSocketClient._flush", - f"{e}", - "ConnectionClosed", - ) - await self._emit( - LiveTranscriptionEvents(LiveTranscriptionEvents.Error), - error=cc_error, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - - # signal exit and close - await self._signal_exit() - self._logger.debug("AsyncListenWebSocketClient._flush LEAVE") if self._config.options.get("termination_exception") == "true": diff --git a/deepgram/clients/listen/v1/websocket/client.py b/deepgram/clients/listen/v1/websocket/client.py index d8297ecd..1e6bb2cd 100644 --- a/deepgram/clients/listen/v1/websocket/client.py +++ b/deepgram/clients/listen/v1/websocket/client.py @@ -383,23 +383,12 @@ def _listening( self._logger.debug("ListenWebSocketClient._listening LEAVE") return - self._logger.error( + # no need to call self._signal_exit() here because we are already closed + self._logger.notice( "ConnectionClosed in ListenWebSocketClient._listening with code %s: %s", e.code, e.reason, ) - cc_error: ErrorResponse = ErrorResponse( - "ConnectionClosed in ListenWebSocketClient._listening", - f"{e}", - "ConnectionClosed", - ) - self._emit( - LiveTranscriptionEvents(LiveTranscriptionEvents.Error), cc_error - ) - - # signal exit and close - self._signal_exit() - self._logger.debug("ListenWebSocketClient._listening LEAVE") if self._config.options.get("termination_exception") == "true": @@ -490,23 +479,12 @@ def _keep_alive(self) -> None: self._logger.debug("ListenWebSocketClient._keep_alive LEAVE") return - self._logger.error( + # no need to call self._signal_exit() here because we are already closed + self._logger.notice( "ConnectionClosed in ListenWebSocketClient._keep_alive with code %s: %s", e.code, e.reason, ) - cc_error: ErrorResponse = ErrorResponse( - "ConnectionClosed in ListenWebSocketClient._keep_alive", - f"{e}", - "ConnectionClosed", - ) - self._emit( - LiveTranscriptionEvents(LiveTranscriptionEvents.Error), cc_error - ) - - # signal exit and close - self._signal_exit() - self._logger.debug("ListenWebSocketClient._keep_alive LEAVE") if self._config.options.get("termination_exception") == "true": @@ -616,23 +594,12 @@ def _flush(self) -> None: self._logger.debug("ListenWebSocketClient._flush LEAVE") return - self._logger.error( - "ConnectionClosed in ListenWebSocketClient._flush with code %s: %s", + # no need to call self._signal_exit() here because we are already closed + self._logger.notice( + "ConnectionClosed in ListenWebSocketClient._listening with code %s: %s", e.code, e.reason, ) - cc_error: ErrorResponse = ErrorResponse( - "ConnectionClosed in ListenWebSocketClient._flush", - f"{e}", - "ConnectionClosed", - ) - self._emit( - LiveTranscriptionEvents(LiveTranscriptionEvents.Error), cc_error - ) - - # signal exit and close - self._signal_exit() - self._logger.debug("ListenWebSocketClient._flush LEAVE") if self._config.options.get("termination_exception") == "true": @@ -857,7 +824,7 @@ def _signal_exit(self) -> None: except websockets.exceptions.ConnectionClosedOK as e: self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) except websockets.exceptions.ConnectionClosed as e: - self._logger.error("_signal_exit - ConnectionClosed: %s", e.code) + self._logger.notice("_signal_exit - ConnectionClosed: %s", e.code) except websockets.exceptions.WebSocketException as e: self._logger.error("_signal_exit - WebSocketException: %s", str(e)) except Exception as e: # pylint: disable=broad-except diff --git a/deepgram/clients/speak/v1/websocket/async_client.py b/deepgram/clients/speak/v1/websocket/async_client.py index 53730d38..3b73fb23 100644 --- a/deepgram/clients/speak/v1/websocket/async_client.py +++ b/deepgram/clients/speak/v1/websocket/async_client.py @@ -186,7 +186,7 @@ async def start( self._logger.debug("AsyncSpeakStreamClient.start LEAVE") return True except websockets.ConnectionClosed as e: - self._logger.error( + self._logger.notice( "ConnectionClosed in AsyncSpeakStreamClient.start: %s", e ) self._logger.debug("AsyncSpeakStreamClient.start LEAVE") @@ -392,25 +392,12 @@ async def _listening(self) -> None: self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") return - self._logger.error( + # no need to call self._signal_exit() here because we are already closed + self._logger.notice( "ConnectionClosed in AsyncSpeakStreamClient._listening with code %s: %s", e.code, e.reason, ) - cc_error: ErrorResponse = ErrorResponse( - "ConnectionClosed in AsyncSpeakStreamClient._listening", - f"{e}", - "ConnectionClosed", - ) - await self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Error), - error=cc_error, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - - # signal exit and close - await self._signal_exit() - self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") if self._config.options.get("termination_exception") == "true": @@ -516,25 +503,12 @@ async def _flush(self) -> None: self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") return - self._logger.error( - "ConnectionClosed in AsyncSpeakStreamClient._flush with code %s: %s", + # no need to call self._signal_exit() here because we are already closed + self._logger.notice( + "ConnectionClosed in AsyncSpeakStreamClient._listening with code %s: %s", e.code, e.reason, ) - cc_error: ErrorResponse = ErrorResponse( - "ConnectionClosed in AsyncSpeakStreamClient._flush", - f"{e}", - "ConnectionClosed", - ) - await self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Error), - error=cc_error, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - - # signal exit and close - await self._signal_exit() - self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") if self._config.options.get("termination_exception") == "true": @@ -621,10 +595,15 @@ async def send_raw(self, msg: str) -> bool: self._logger.debug("AsyncSpeakStreamClient.send_raw LEAVE") return False - if isinstance(msg, dict) and "type" in msg and msg["type"] == "Flush": - self._last_datagram = None - self._flush_count += 1 - self._logger.debug("Increment Flush count: %d", self._flush_count) + try: + myJson = json.loads(msg) + if "type" in myJson and myJson["type"] == "Flush": + with self._lock_flush: + self._last_datagram = None + self._flush_count += 1 + self._logger.debug("Increment Flush count: %d", self._flush_count) + except Exception as e: + self._logger.error("send_raw() failed - Exception: %s", str(e)) if self._socket is not None: try: @@ -762,7 +741,8 @@ async def _signal_exit(self) -> None: self._logger.verbose("send CloseStream...") try: # if the socket connection is closed, the following line might throw an error - await self.send_raw(json.dumps({"type": "CloseStream"})) + # need to explicitly use _socket.send (dont use self.send_raw) + await self._socket.send(json.dumps({"type": "CloseStream"})) except websockets.exceptions.ConnectionClosedOK as e: self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) except websockets.exceptions.ConnectionClosed as e: diff --git a/deepgram/clients/speak/v1/websocket/client.py b/deepgram/clients/speak/v1/websocket/client.py index df124975..179e4163 100644 --- a/deepgram/clients/speak/v1/websocket/client.py +++ b/deepgram/clients/speak/v1/websocket/client.py @@ -372,21 +372,12 @@ def _listening( self._logger.debug("SpeakStreamClient._listening LEAVE") return - self._logger.error( + # no need to call self._signal_exit() here because we are already closed + self._logger.notice( "ConnectionClosed in SpeakStreamClient._listening with code %s: %s", e.code, e.reason, ) - cc_error: ErrorResponse = ErrorResponse( - "ConnectionClosed in SpeakStreamClient._listening", - f"{e}", - "ConnectionClosed", - ) - self._emit(SpeakWebSocketEvents(SpeakWebSocketEvents.Error), cc_error) - - # signal exit and close - self._signal_exit() - self._logger.debug("SpeakStreamClient._listening LEAVE") if self._config.options.get("termination_exception") == "true": @@ -451,7 +442,7 @@ def _flush(self) -> None: while True: try: counter += 1 - self._exit_event.wait(timeout=ONE_SECOND) + self._exit_event.wait(timeout=HALF_SECOND) if self._exit_event.is_set(): self._logger.notice("_flush exiting gracefully") @@ -488,25 +479,12 @@ def _flush(self) -> None: self._logger.debug("SpeakStreamClient._flush LEAVE") return - self._logger.error( + # no need to call self._signal_exit() here because we are already closed + self._logger.notice( "ConnectionClosed in SpeakStreamClient._flush with code %s: %s", e.code, e.reason, ) - cc_error: ErrorResponse = ErrorResponse( - "ConnectionClosed in SpeakStreamClient._flush", - f"{e}", - "ConnectionClosed", - ) - self._emit( - SpeakWebSocketEvents(SpeakWebSocketEvents.Error), - error=cc_error, - **dict(cast(Dict[Any, Any], self._kwargs)), - ) - - # signal exit and close - self._signal_exit() - self._logger.debug("SpeakStreamClient._flush LEAVE") if self._config.options.get("termination_exception") == "true": @@ -591,18 +569,16 @@ def send_raw(self, msg: str) -> bool: self._logger.notice("send exiting gracefully") self._logger.debug("SpeakStreamClient.send LEAVE") return False - - if ( - isinstance(msg, dict) - and "type" in msg - and msg["type"] == "Flush" - ): - with self._lock_flush: - self._last_datagram = None - self._flush_count += 1 - self._logger.debug( - "Increment Flush count: %d", self._flush_count - ) + + try: + myJson = json.loads(msg) + if "type" in myJson and myJson["type"] == "Flush": + with self._lock_flush: + self._last_datagram = None + self._flush_count += 1 + self._logger.debug("Increment Flush count: %d", self._flush_count) + except Exception as e: + self._logger.error("send_raw() failed - Exception: %s", str(e)) if self._socket is not None: with self._lock_send: @@ -729,11 +705,12 @@ def _signal_exit(self) -> None: self._logger.notice("sending Close...") try: # if the socket connection is closed, the following line might throw an error - self._socket.send_raw(json.dumps({"type": "Close"})) + # need to explicitly use _socket.send (dont use self.send_raw) + self._socket.send(json.dumps({"type": "CloseStream"})) except websockets.exceptions.ConnectionClosedOK as e: self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) except websockets.exceptions.ConnectionClosed as e: - self._logger.error("_signal_exit - ConnectionClosed: %s", e.code) + self._logger.notice("_signal_exit - ConnectionClosed: %s", e.code) except websockets.exceptions.WebSocketException as e: self._logger.error("_signal_exit - WebSocketException: %s", str(e)) except Exception as e: # pylint: disable=broad-except diff --git a/examples/text-to-speech/websocket/simple/main-non-containerized.py b/examples/text-to-speech/websocket/simple/main-non-containerized.py index 54a60c9c..0e12d51f 100644 --- a/examples/text-to-speech/websocket/simple/main-non-containerized.py +++ b/examples/text-to-speech/websocket/simple/main-non-containerized.py @@ -23,7 +23,7 @@ def main(): # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM config: DeepgramClientOptions = DeepgramClientOptions( url="api.beta.deepgram.com", - options={"auto_flush_speak_delta": "500"}, + options={"auto_flush_speak_delta": "1000"}, verbose=verboselogs.SPAM, ) deepgram: DeepgramClient = DeepgramClient("", config)