From 3b6c941a9b8c8b088dbaadeba5802500a1c5eccb Mon Sep 17 00:00:00 2001 From: Sandra Rodgers Date: Mon, 24 Jun 2024 15:49:50 -0500 Subject: [PATCH] adds TTS streaming --- deepgram/__init__.py | 2 + deepgram/client.py | 25 + deepgram/clients/__init__.py | 4 + deepgram/clients/live/v1/async_client.py | 13 +- deepgram/clients/speak/__init__.py | 2 + deepgram/clients/speak/client.py | 4 + deepgram/clients/speak/enums.py | 22 + deepgram/clients/speak/errors.py | 2 +- deepgram/clients/speak/v1/__init__.py | 2 + .../clients/speak/v1/async_client_stream.py | 596 ++++++++++++++++++ deepgram/clients/speak/v1/client_stream.py | 561 +++++++++++++++++ deepgram/clients/speak/v1/response.py | 163 +++++ .../speak-stream/async-interactive/main.py | 99 +++ examples/speak-stream/interactive/main.py | 103 +++ 14 files changed, 1592 insertions(+), 6 deletions(-) create mode 100644 deepgram/clients/speak/enums.py create mode 100644 deepgram/clients/speak/v1/async_client_stream.py create mode 100644 deepgram/clients/speak/v1/client_stream.py create mode 100644 examples/speak-stream/async-interactive/main.py create mode 100644 examples/speak-stream/interactive/main.py diff --git a/deepgram/__init__.py b/deepgram/__init__.py index 06e2766d..219a8c11 100644 --- a/deepgram/__init__.py +++ b/deepgram/__init__.py @@ -83,8 +83,10 @@ # speak from .client import SpeakClient, AsyncSpeakClient +from .client import SpeakStreamClient, AsyncSpeakStreamClient from .client import SpeakOptions, SpeakStreamSource, SpeakSource from .client import SpeakResponse +from .client import SpeakStreamEvents # manage from .client import ManageClient, AsyncManageClient diff --git a/deepgram/client.py b/deepgram/client.py index 378ab4ab..77fb2781 100644 --- a/deepgram/client.py +++ b/deepgram/client.py @@ -78,8 +78,10 @@ # speak client classes/input from .clients import SpeakClient, AsyncSpeakClient +from .clients import SpeakStreamClient, AsyncSpeakStreamClient from .clients import SpeakOptions from .clients import SpeakStreamSource, SpeakSource +from .clients import SpeakStreamEvents # speak client responses from .clients import SpeakResponse @@ -236,6 +238,19 @@ def asyncspeak(self): return self.Version(self._config, "asyncspeak") @property + def speakstream(self): + """ + Returns a SpeakStreamClient instance for interacting with Deepgram's speak services. + """ + return self.Version(self._config, "speak-stream") + + @property + def asyncspeakstream(self): + """ + Returns an AsyncSpeakStreamClient instance for interacting with Deepgram's speak services. + """ + return self.Version(self._config, "asyncspeak-stream") + @property def manage(self): """ Returns a ManageClient instance for managing Deepgram resources. @@ -310,6 +325,7 @@ def __init__(self, config, parent: str): # raise DeepgramModuleError("Invalid parent") def v(self, version: str = ""): + # pylint: disable-msg=too-many-statements """ Returns a client for the specified version of the API. """ @@ -340,6 +356,14 @@ def v(self, version: str = ""): parent = "speak" filename = "async_client" classname = "AsyncSpeakClient" + case "speak-stream": + parent = "speak" + filename = "client_stream" + classname = "SpeakStreamClient" + case "asyncspeak-stream": + parent = "speak" + filename = "async_client_stream" + classname = "AsyncSpeakStreamClient" case "selfhosted": parent = "selfhosted" filename = "client" @@ -376,3 +400,4 @@ def v(self, version: str = ""): self._logger.notice("Version.v succeeded") self._logger.debug("Version.v LEAVE") return my_class_instance + # pylint: enable-msg=too-many-statements diff --git a/deepgram/clients/__init__.py b/deepgram/clients/__init__.py index f8375143..a89fab48 100644 --- a/deepgram/clients/__init__.py +++ b/deepgram/clients/__init__.py @@ -65,8 +65,12 @@ SpeakStreamSource, SpeakSource, ) +from .speak import SpeakStreamEvents from .speak import SpeakResponse +# speak-stream +from .speak import SpeakStreamClient, AsyncSpeakStreamClient + # manage from .manage import ManageClient, AsyncManageClient from .manage import ( diff --git a/deepgram/clients/live/v1/async_client.py b/deepgram/clients/live/v1/async_client.py index 2f09d12b..81b7454f 100644 --- a/deepgram/clients/live/v1/async_client.py +++ b/deepgram/clients/live/v1/async_client.py @@ -221,7 +221,8 @@ def on(self, event: LiveTranscriptionEvents, handler) -> None: """ self._logger.info("event subscribed: %s", event) if event in LiveTranscriptionEvents.__members__.values() and callable(handler): - self._event_handlers[event].append(handler) + if handler not in self._event_handlers[event]: + self._event_handlers[event].append(handler) # triggers the registered event handlers for a specific event async def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None: @@ -238,11 +239,13 @@ async def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None: tasks = [] for handler in self._event_handlers[event]: - tasks.append(asyncio.create_task(handler(self, *args, **kwargs))) + task = asyncio.create_task(handler(self, *args, **kwargs)) + tasks.append(task) - if len(tasks) > 0: + if tasks: self._logger.debug("waiting for tasks to finish...") - await asyncio.gather(*filter(None, tasks), return_exceptions=True) + await asyncio.gather(*tasks, return_exceptions=True) + tasks.clear() # debug the threads for thread in threading.enumerate(): @@ -380,7 +383,7 @@ async def _listening(self) -> None: return except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: + if e.code in [1000, 1001]: self._logger.notice(f"_listening({e.code}) exiting gracefully") self._logger.debug("AsyncLiveClient._listening LEAVE") return diff --git a/deepgram/clients/speak/__init__.py b/deepgram/clients/speak/__init__.py index 522dbefe..3636a94a 100644 --- a/deepgram/clients/speak/__init__.py +++ b/deepgram/clients/speak/__init__.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: MIT from .client import SpeakClient +from .client import SpeakStreamClient, AsyncSpeakStreamClient from .client import AsyncSpeakClient from .client import SpeakOptions from .client import SpeakResponse @@ -12,4 +13,5 @@ SpeakSource, ) +from .enums import SpeakStreamEvents from ...options import DeepgramClientOptions, ClientOptionsFromEnv diff --git a/deepgram/clients/speak/client.py b/deepgram/clients/speak/client.py index e0dda23c..b9975a46 100644 --- a/deepgram/clients/speak/client.py +++ b/deepgram/clients/speak/client.py @@ -4,6 +4,8 @@ from .v1.client import SpeakClient as SpeakClientLatest from .v1.async_client import AsyncSpeakClient as AsyncSpeakClientLatest +from .v1.client_stream import SpeakStreamClient as SpeakStreamClientLatest +from .v1.async_client_stream import AsyncSpeakStreamClient as AsyncSpeakStreamClientLatest from .v1.options import ( SpeakOptions as SpeakOptionsLatest, FileSource as FileSourceLatest, @@ -29,3 +31,5 @@ SpeakResponse = SpeakResponseLatest SpeakClient = SpeakClientLatest AsyncSpeakClient = AsyncSpeakClientLatest +SpeakStreamClient = SpeakStreamClientLatest +AsyncSpeakStreamClient = AsyncSpeakStreamClientLatest diff --git a/deepgram/clients/speak/enums.py b/deepgram/clients/speak/enums.py new file mode 100644 index 00000000..507bf339 --- /dev/null +++ b/deepgram/clients/speak/enums.py @@ -0,0 +1,22 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from aenum import StrEnum + +# Constants mapping to events from the Deepgram API + + +class SpeakStreamEvents(StrEnum): + """ + Enumerates the possible events that can be received from the Deepgram API + """ + + Open: str = "Open" + Close: str = "Close" + AudioData: str = "AudioData" + Metadata: str = "Metadata" + Flush: str = "Flush" + Unhandled: str = "Unhandled" + Error: str = "Error" + Warning: str = "Warning" diff --git a/deepgram/clients/speak/errors.py b/deepgram/clients/speak/errors.py index 5636fd3c..a39020a1 100644 --- a/deepgram/clients/speak/errors.py +++ b/deepgram/clients/speak/errors.py @@ -22,7 +22,7 @@ def __str__(self): class DeepgramTypeError(Exception): """ - Exception raised for unknown errors related to unknown Types for Transcription. + Exception raised for unknown errors related to unknown Types for TTS Synthesis. Attributes: message (str): The error message describing the exception. diff --git a/deepgram/clients/speak/v1/__init__.py b/deepgram/clients/speak/v1/__init__.py index 42090b8e..d686df2b 100644 --- a/deepgram/clients/speak/v1/__init__.py +++ b/deepgram/clients/speak/v1/__init__.py @@ -3,6 +3,8 @@ # SPDX-License-Identifier: MIT from .client import SpeakClient +from .client_stream import SpeakStreamClient +from .async_client_stream import AsyncSpeakStreamClient from .async_client import AsyncSpeakClient from .options import SpeakOptions, FileSource, SpeakStreamSource, SpeakSource from .response import SpeakResponse diff --git a/deepgram/clients/speak/v1/async_client_stream.py b/deepgram/clients/speak/v1/async_client_stream.py new file mode 100644 index 00000000..9130ebf3 --- /dev/null +++ b/deepgram/clients/speak/v1/async_client_stream.py @@ -0,0 +1,596 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT +import asyncio +import json +import logging +from typing import Dict, Union, Optional, cast, Any +import threading + +import websockets +from websockets.client import WebSocketClientProtocol + +from ....utils import verboselogs +from ....options import DeepgramClientOptions +from ..enums import SpeakStreamEvents +from ...live.helpers import convert_to_websocket_url, append_query_params +from ..errors import DeepgramError + +from .response import ( + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + WarningResponse, + ErrorResponse, + UnhandledResponse, +) +from .options import SpeakOptions + + +class AsyncSpeakStreamClient: # pylint: disable=too-many-instance-attributes + """ + Client for interacting with Deepgram's text-to-speech services over WebSockets. + + This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events. + + Args: + config (DeepgramClientOptions): all the options for the client. + """ + + _logger: verboselogs.VerboseLogger + _config: DeepgramClientOptions + _endpoint: str + _websocket_url: str + + _socket: WebSocketClientProtocol + _event_handlers: Dict[SpeakStreamEvents, list] + + _listen_thread: Union[asyncio.Task, None] + + _kwargs: Optional[Dict] = None + _addons: Optional[Dict] = None + _options: Optional[Dict] = None + _headers: Optional[Dict] = None + + def __init__(self, config: DeepgramClientOptions): + if config is None: + raise DeepgramError("Config are required") + + self._logger = verboselogs.VerboseLogger(__name__) + self._logger.addHandler(logging.StreamHandler()) + self._logger.setLevel(config.verbose) + + self._config = config + self._endpoint = "v1/speak" + + self._listen_thread = None + + # exit + self._exit_event = asyncio.Event() + + self._event_handlers = { + event: [] for event in SpeakStreamEvents.__members__.values() + } + self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint) + + # pylint: disable=too-many-branches,too-many-statements + async def start( + self, + options: Optional[Union[SpeakOptions, Dict]] = None, + addons: Optional[Dict] = None, + headers: Optional[Dict] = None, + members: Optional[Dict] = None, + **kwargs, + ) -> bool: + """ + Starts the WebSocket connection for text-to-speech synthesis. + """ + self._logger.debug("AsyncSpeakStreamClient.start ENTER") + self._logger.info("options: %s", options) + self._logger.info("addons: %s", addons) + self._logger.info("headers: %s", headers) + self._logger.info("members: %s", members) + self._logger.info("kwargs: %s", kwargs) + + if isinstance(options, SpeakOptions) and not options.check(): + self._logger.error("options.check failed") + self._logger.debug("SpeakStreamClient.start LEAVE") + raise DeepgramError("Fatal text-to-speech options error") + + self._addons = addons + self._headers = headers + + # add "members" as members of the class + if members is not None: + self.__dict__.update(members) + + # set kwargs as members of the class + if kwargs is not None: + self._kwargs = kwargs + else: + self._kwargs = {} + + if isinstance(options, SpeakOptions): + self._logger.info("SpeakOptions switching class -> dict") + self._options = options.to_dict() + elif options is not None: + self._options = options + else: + self._options = {} + + combined_options = self._options + if self._addons is not None: + self._logger.info("merging addons to options") + combined_options.update(self._addons) + self._logger.info("new options: %s", combined_options) + self._logger.debug("combined_options: %s", combined_options) + + combined_headers = self._config.headers + if self._headers is not None: + self._logger.info("merging headers to options") + combined_headers.update(self._headers) + self._logger.info("new headers: %s", combined_headers) + self._logger.debug("combined_headers: %s", combined_headers) + + url_with_params = append_query_params(self._websocket_url, combined_options) + + try: + self._socket = await websockets.connect( + url_with_params, + extra_headers=combined_headers + ) + self._exit_event.clear() + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # listen thread + self._listen_thread = asyncio.create_task(self._listening()) + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # push open event + await self._emit( + SpeakStreamEvents(SpeakStreamEvents.Open), + OpenResponse(type=SpeakStreamEvents.Open), + ) + + self._logger.notice("start succeeded") + self._logger.debug("AsyncSpeakStreamClient.start LEAVE") + return True + except websockets.ConnectionClosed as e: + self._logger.error("ConnectionClosed in AsyncSpeakStreamClient.start: %s", e) + self._logger.debug("AsyncSpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise + return False + except websockets.exceptions.WebSocketException as e: + self._logger.error("WebSocketException in AsyncSpeakStreamClient.start: %s", e) + self._logger.debug("AsyncSpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise + return False + except Exception as e: # pylint: disable=broad-except + self._logger.error("WebSocketException in AsyncSpeakStreamClient.start: %s", e) + self._logger.debug("AsyncSpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise + return False + + # pylint: enable=too-many-branches,too-many-statements + + def on(self, event: SpeakStreamEvents, handler) -> None: + """ + Registers event handlers for specific events. + """ + self._logger.info("event subscribed: %s", event) + if event in SpeakStreamEvents.__members__.values() and callable(handler): + if handler not in self._event_handlers[event]: + self._event_handlers[event].append(handler) + + # triggers the registered event handlers for a specific event + async def _emit(self, event: SpeakStreamEvents, *args, **kwargs) -> None: + """ + Emits events to the registered event handlers. + """ + self._logger.debug("AsyncSpeakStreamClient._emit ENTER") + self._logger.debug("callback handlers for: %s", event) + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + tasks = [] + for handler in self._event_handlers[event]: + task = asyncio.create_task(handler(self, *args, **kwargs)) + tasks.append(task) + + if tasks: + self._logger.debug("waiting for tasks to finish...") + await asyncio.gather(*tasks, return_exceptions=True) + tasks.clear() + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.debug("AsyncSpeakStreamClient._emit LEAVE") + + # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches + async def _listening(self) -> None: + """ + Listens for messages from the WebSocket connection. + """ + self._logger.debug("AsyncSpeakStreamClient._listening ENTER") + + while True: + try: + if self._exit_event.is_set(): + self._logger.notice("_listening exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + return + + if self._socket is None: + self._logger.warning("socket is empty") + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + return + + message = await self._socket.recv() + + if message is None: + self._logger.spam("message is None") + continue + + if isinstance(message, bytes): + self._logger.debug("Binary data received") + await self._emit( + SpeakStreamEvents(SpeakStreamEvents.AudioData), + data=message, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + else: + data = json.loads(message) + response_type = data.get("type") + self._logger.debug("response_type: %s, data: %s", response_type, data) + + match response_type: + case SpeakStreamEvents.Open: + open_result: OpenResponse = OpenResponse.from_json(message) + self._logger.verbose("OpenResponse: %s", open_result) + await self._emit( + SpeakStreamEvents(SpeakStreamEvents.Open), + open=open_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakStreamEvents.Metadata: + meta_result: MetadataResponse = MetadataResponse.from_json( + message + ) + self._logger.verbose("MetadataResponse: %s", meta_result) + await self._emit( + SpeakStreamEvents(SpeakStreamEvents.Metadata), + metadata=meta_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakStreamEvents.Flush: + fl_result: FlushedResponse = ( + FlushedResponse.from_json(message) + ) + self._logger.verbose("FlushedResponse: %s", fl_result) + await self._emit( + SpeakStreamEvents( + SpeakStreamEvents.Flush + ), + flushed=fl_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakStreamEvents.Close: + close_result: CloseResponse = CloseResponse.from_json(message) + self._logger.verbose("CloseResponse: %s", close_result) + await self._emit( + SpeakStreamEvents(SpeakStreamEvents.Close), + close=close_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakStreamEvents.Warning: + war_warning: WarningResponse = WarningResponse.from_json(message) + self._logger.verbose("WarningResponse: %s", war_warning) + await self._emit( + SpeakStreamEvents(SpeakStreamEvents.Warning), + warning=war_warning, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakStreamEvents.Error: + err_error: ErrorResponse = ErrorResponse.from_json(message) + self._logger.verbose("ErrorResponse: %s", err_error) + await self._emit( + SpeakStreamEvents(SpeakStreamEvents.Error), + error=err_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case _: + self._logger.warning( + "Unknown Message: response_type: %s, data: %s", + response_type, + data, + ) + unhandled_error: UnhandledResponse = UnhandledResponse( + type=SpeakStreamEvents( + SpeakStreamEvents.Unhandled + ), + raw=message, + ) + await self._emit( + SpeakStreamEvents(SpeakStreamEvents.Unhandled), + unhandled=unhandled_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"_listening({e.code}) exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + return + + except websockets.exceptions.ConnectionClosed as e: + if e.code == 1000: + self._logger.notice(f"_listening({e.code}) exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + return + + self._logger.error( + "ConnectionClosed in AsyncSpeakStreamClient._listening with code %s: %s", + e.code, + e.reason, + ) + cc_error: ErrorResponse = ErrorResponse( + "ConnectionClosed in AsyncSpeakStreamClient._listening", + f"{e}", + "ConnectionClosed", + ) + await self._emit( + SpeakStreamEvents(SpeakStreamEvents.Error), + error=cc_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in AsyncSpeakStreamClient._listening: %s", e + ) + ws_error: ErrorResponse = ErrorResponse( + "WebSocketException in AsyncSpeakStreamClient._listening", + f"{e}", + "WebSocketException", + ) + await self._emit( + SpeakStreamEvents(SpeakStreamEvents.Error), + error=ws_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except Exception as e: # pylint: disable=broad-except + self._logger.error("Exception in AsyncSpeakStreamClient._listening: %s", e) + e_error: ErrorResponse = ErrorResponse( + "Exception in AsyncSpeakStreamClient._listening", + f"{e}", + "Exception", + ) + await self._emit( + SpeakStreamEvents(SpeakStreamEvents.Error), + error=e_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + # pylint: disable=too-many-return-statements + + async def send(self, text_input: str) -> bool: + """ + Sends data over the WebSocket connection. + """ + self._logger.spam("AsyncSpeakStreamClient.send ENTER") + + if self._exit_event.is_set(): + self._logger.notice("send exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient.send LEAVE") + return False + + if self._socket is not None: + try: + await self._socket.send(json.dumps({"type": "Speak", "text": text_input})) + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"send() exiting gracefully: {e.code}") + self._logger.debug("AsyncSpeakStreamClient.send LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return True + except websockets.exceptions.ConnectionClosed as e: + if e.code in [1000, 1001]: + self._logger.notice(f"send({e.code}) exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient.send LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return True + + self._logger.error("send() failed - ConnectionClosed: %s", str(e)) + self._logger.spam("AsyncSpeakStreamClient.send LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + except websockets.exceptions.WebSocketException as e: + self._logger.error("send() failed - WebSocketException: %s", str(e)) + self._logger.spam("AsyncSpeakStreamClient.send LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + except Exception as e: # pylint: disable=broad-except + self._logger.error("send() failed - Exception: %s", str(e)) + self._logger.spam("AsyncSpeakStreamClient.send LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + + self._logger.spam("send() succeeded") + self._logger.spam("AsyncSpeakStreamClient.send LEAVE") + return True + + self._logger.spam("send() failed. socket is None") + self._logger.spam("AsyncSpeakStreamClient.send LEAVE") + return False + + # pylint: enable=too-many-return-statements + + async def flush(self) -> bool: + """ + Flushes the current buffer and returns generated audio + """ + self._logger.spam("AsyncSpeakStreamClient.flush ENTER") + + if self._exit_event.is_set(): + self._logger.notice("flush exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient.flush LEAVE") + return False + + if self._socket is None: + self._logger.notice("socket is not intialized") + self._logger.debug("AsyncSpeakStreamClient.flush LEAVE") + return False + + self._logger.notice("Sending Flush...") + ret = self.send(json.dumps({"type": "Flush"})) + + if not ret: + self._logger.error("flush failed") + self._logger.spam("AsyncSpeakStreamClient.flush LEAVE") + return False + + self._logger.notice("flush succeeded") + self._logger.spam("AsyncSpeakStreamClient.flush LEAVE") + + return True + + async def finish(self) -> bool: + """ + Closes the WebSocket connection gracefully. + """ + self._logger.debug("AsyncSpeakStreamClient.finish ENTER") + + # signal exit + await self._signal_exit() + + # stop the threads + self._logger.verbose("cancelling tasks...") + try: + # Before cancelling, check if the tasks were created + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + tasks = [] + + if self._listen_thread is not None: + self._listen_thread.cancel() + tasks.append(self._listen_thread) + self._logger.notice("processing _listen_thread cancel...") + + # Use asyncio.gather to wait for tasks to be cancelled + await asyncio.wait_for(asyncio.gather(*tasks), timeout=10) # Prevent indefinite waiting + self._logger.notice("threads joined") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.notice("finish succeeded") + self._logger.spam("AsyncSpeakStreamClient.finish LEAVE") + return True + + except asyncio.CancelledError as e: + self._logger.error("tasks cancelled error: %s", e) + self._logger.debug("AsyncSpeakStreamClient.finish LEAVE") + return False + + except asyncio.TimeoutError as e: + self._logger.error("tasks cancellation timed out: %s", e) + self._logger.debug("AsyncSpeakStreamClient.finish LEAVE") + return False + + async def _signal_exit(self) -> None: + # send close event + self._logger.verbose("closing socket...") + if self._socket is not None: + self._logger.verbose("send CloseStream...") + try: + # if the socket connection is closed, the following line might throw an error + await self.send(json.dumps({"type": "CloseStream"})) + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) + except websockets.exceptions.ConnectionClosed as e: + self._logger.notice("_signal_exit - ConnectionClosed: %s", e.code) + except websockets.exceptions.WebSocketException as e: + self._logger.error("_signal_exit - WebSocketException: %s", str(e)) + except Exception as e: # pylint: disable=broad-except + self._logger.error("_signal_exit - Exception: %s", str(e)) + + # push close event + try: + await self._emit( + SpeakStreamEvents(SpeakStreamEvents.Close), + close=CloseResponse(type=SpeakStreamEvents.Close), + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + except Exception as e: # pylint: disable=broad-except + self._logger.error("_emit - Exception: %s", e) + + # wait for task to send + await asyncio.sleep(0.5) + + # signal exit + self._exit_event.set() + + # closes the WebSocket connection gracefully + self._logger.verbose("clean up socket...") + if self._socket is not None: + self._logger.verbose("socket.wait_closed...") + try: + await self._socket.close() + except websockets.exceptions.WebSocketException as e: + self._logger.error("socket.wait_closed failed: %s", e) + + self._socket = None # type: ignore \ No newline at end of file diff --git a/deepgram/clients/speak/v1/client_stream.py b/deepgram/clients/speak/v1/client_stream.py new file mode 100644 index 00000000..49ae9d99 --- /dev/null +++ b/deepgram/clients/speak/v1/client_stream.py @@ -0,0 +1,561 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +import json +import time +import logging +from typing import Dict, Union, Optional, cast, Any +import threading + +from websockets.sync.client import connect, ClientConnection +import websockets + +from ....utils import verboselogs +from ....options import DeepgramClientOptions +from ..enums import SpeakStreamEvents +from ...live.helpers import convert_to_websocket_url, append_query_params +from ..errors import DeepgramError + +from .response import ( + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + WarningResponse, + ErrorResponse, + UnhandledResponse, +) +from .options import SpeakOptions + + +class SpeakStreamClient: # pylint: disable=too-many-instance-attributes + """ + Client for interacting with Deepgram's text-to-speech services over WebSockets. + + This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events. + + Args: + config (DeepgramClientOptions): all the options for the client. + """ + + _logger: verboselogs.VerboseLogger + _config: DeepgramClientOptions + _endpoint: str + _websocket_url: str + + _socket: ClientConnection + _exit_event: threading.Event + _lock_send: threading.Lock + _event_handlers: Dict[SpeakStreamEvents, list] + + + _listen_thread: Union[threading.Thread, None] + + _kwargs: Optional[Dict] = None + _addons: Optional[Dict] = None + _options: Optional[Dict] = None + _headers: Optional[Dict] = None + + def __init__(self, config: DeepgramClientOptions): + if config is None: + raise DeepgramError("Config are required") + + self._logger = verboselogs.VerboseLogger(__name__) + self._logger.addHandler(logging.StreamHandler()) + self._logger.setLevel(config.verbose) + + self._config = config + self._endpoint = "v1/speak" + self._lock_send = threading.Lock() + + self._listen_thread = None + + # exit + self._exit_event = threading.Event() + + self._event_handlers = { + event: [] for event in SpeakStreamEvents.__members__.values() + } + self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint) + + # pylint: disable=too-many-statements,too-many-branches + def start( + self, + options: Optional[Union[SpeakOptions, Dict]] = None, + addons: Optional[Dict] = None, + headers: Optional[Dict] = None, + members: Optional[Dict] = None, + **kwargs, + ) -> bool: + """ + Starts the WebSocket connection for text-to-speech synthesis. + """ + self._logger.debug("SpeakStreamClient.start ENTER") + self._logger.info("options: %s", options) + self._logger.info("addons: %s", addons) + self._logger.info("headers: %s", headers) + self._logger.info("members: %s", members) + self._logger.info("kwargs: %s", kwargs) + + if isinstance(options, SpeakOptions) and not options.check(): + self._logger.error("options.check failed") + self._logger.debug("SpeakStreamClient.start LEAVE") + raise DeepgramError("Fatal text-to-speech options error") + + self._addons = addons + self._headers = headers + + # add "members" as members of the class + if members is not None: + self.__dict__.update(members) + + # set kwargs as members of the class + if kwargs is not None: + self._kwargs = kwargs + else: + self._kwargs = {} + + if isinstance(options, SpeakOptions): + self._logger.info("SpeakOptions switching class -> dict") + self._options = options.to_dict() + elif options is not None: + self._options = options + else: + self._options = {} + + combined_options = self._options + if self._addons is not None: + self._logger.info("merging addons to options") + combined_options.update(self._addons) + self._logger.info("new options: %s", combined_options) + self._logger.debug("combined_options: %s", combined_options) + + combined_headers = self._config.headers + if self._headers is not None: + self._logger.info("merging headers to options") + combined_headers.update(self._headers) + self._logger.info("new headers: %s", combined_headers) + self._logger.debug("combined_headers: %s", combined_headers) + + url_with_params = append_query_params(self._websocket_url, combined_options) + try: + self._socket = connect(url_with_params, additional_headers=combined_headers) + self._exit_event.clear() + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # listening thread + self._listen_thread = threading.Thread(target=self._listening) + self._listen_thread.start() + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # push open event + self._emit( + SpeakStreamEvents(SpeakStreamEvents.Open), + OpenResponse(type=SpeakStreamEvents.Open), + ) + + self._logger.notice("start succeeded") + self._logger.debug("SpeakStreamClient.start LEAVE") + return True + except websockets.ConnectionClosed as e: + self._logger.error("ConnectionClosed in SpeakStreamClient.start: %s", e) + self._logger.debug("SpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise e + return False + except websockets.exceptions.WebSocketException as e: + self._logger.error("WebSocketException in SpeakStreamClient.start: %s", e) + self._logger.debug("SpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise e + return False + except Exception as e: # pylint: disable=broad-except + self._logger.error("WebSocketException in SpeakStreamClient.start: %s", e) + self._logger.debug("SpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise e + return False + + # pylint: enable=too-many-statements,too-many-branches + + def on( + self, event: SpeakStreamEvents, handler + ) -> None: # registers event handlers for specific events + """ + Registers event handlers for specific events. + """ + self._logger.info("event subscribed: %s", event) + if event in SpeakStreamEvents.__members__.values() and callable(handler): + self._event_handlers[event].append(handler) + + def _emit(self, event: SpeakStreamEvents, *args, **kwargs) -> None: + """ + Emits events to the registered event handlers. + """ + self._logger.debug("callback handlers for: %s", event) + for handler in self._event_handlers[event]: + handler(self, *args, **kwargs) + + # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches + def _listening( + self, + ) -> None: + """ + Listens for messages from the WebSocket connection. + """ + self._logger.debug("SpeakStreamClient._listening ENTER") + + while True: + try: + if self._exit_event.is_set(): + self._logger.notice("_listening exiting gracefully") + self._logger.debug("SpeakStreamClient._listening LEAVE") + return + + if self._socket is None: + self._logger.warning("socket is empty") + self._logger.debug("SpeakStreamClient._listening LEAVE") + return + + message = self._socket.recv() + + if message is None: + self._logger.info("message is empty") + continue + + if isinstance(message, bytes): + self._logger.debug("Binary data received") + self._emit( + SpeakStreamEvents(SpeakStreamEvents.AudioData), + data=message, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + else: + data = json.loads(message) + response_type = data.get("type") + self._logger.debug("response_type: %s, data: %s", response_type, data) + + match response_type: + case SpeakStreamEvents.Open: + open_result: OpenResponse = OpenResponse.from_json(message) + self._logger.verbose("OpenResponse: %s", open_result) + self._emit( + SpeakStreamEvents(SpeakStreamEvents.Open), + open=open_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakStreamEvents.Metadata: + meta_result: MetadataResponse = MetadataResponse.from_json( + message + ) + self._logger.verbose("MetadataResponse: %s", meta_result) + self._emit( + SpeakStreamEvents(SpeakStreamEvents.Metadata), + metadata=meta_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakStreamEvents.Flush: + fl_result: FlushedResponse = ( + FlushedResponse.from_json(message) + ) + self._logger.verbose("FlushedResponse: %s", fl_result) + self._emit( + SpeakStreamEvents( + SpeakStreamEvents.Flush + ), + flushed=fl_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakStreamEvents.Close: + close_result: CloseResponse = CloseResponse.from_json(message) + self._logger.verbose("CloseResponse: %s", close_result) + self._emit( + SpeakStreamEvents(SpeakStreamEvents.Close), + close=close_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakStreamEvents.Warning: + war_warning: WarningResponse = WarningResponse.from_json(message) + self._logger.verbose("WarningResponse: %s", war_warning) + self._emit( + SpeakStreamEvents(SpeakStreamEvents.Warning), + warning=war_warning, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakStreamEvents.Error: + err_error: ErrorResponse = ErrorResponse.from_json(message) + self._logger.verbose("ErrorResponse: %s", err_error) + self._emit( + SpeakStreamEvents(SpeakStreamEvents.Error), + error=err_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case _: + self._logger.warning( + "Unknown Message: response_type: %s, data: %s", + response_type, + data, + ) + unhandled_error: UnhandledResponse = UnhandledResponse( + type=SpeakStreamEvents( + SpeakStreamEvents.Unhandled + ), + raw=message, + ) + self._emit( + SpeakStreamEvents(SpeakStreamEvents.Unhandled), + unhandled=unhandled_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"_listening({e.code}) exiting gracefully") + self._logger.debug("SpeakStreamClient._listening LEAVE") + return + + except websockets.exceptions.ConnectionClosed as e: + if e.code == 1000: + self._logger.notice(f"_listening({e.code}) exiting gracefully") + self._logger.debug("SpeakStreamClient._listening LEAVE") + return + + self._logger.error( + "ConnectionClosed in SpeakStreamClient._listening with code %s: %s", + e.code, + e.reason, + ) + cc_error: ErrorResponse = ErrorResponse( + "ConnectionClosed in SpeakStreamClient._listening", + f"{e}", + "ConnectionClosed", + ) + self._emit( + SpeakStreamEvents(SpeakStreamEvents.Error), cc_error + ) + + # signal exit and close + self._signal_exit() + + self._logger.debug("SpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in SpeakStreamClient._listening with: %s", e + ) + ws_error: ErrorResponse = ErrorResponse( + "WebSocketException in SpeakStreamClient._listening", + f"{e}", + "WebSocketException", + ) + self._emit( + SpeakStreamEvents(SpeakStreamEvents.Error), ws_error + ) + + # signal exit and close + self._signal_exit() + + self._logger.debug("SpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except Exception as e: # pylint: disable=broad-except + self._logger.error("Exception in SpeakStreamClient._listening: %s", e) + e_error: ErrorResponse = ErrorResponse( + "Exception in SpeakStreamClient._listening", + f"{e}", + "Exception", + ) + self._logger.error("Exception in SpeakStreamClient._listening: %s", str(e)) + self._emit( + SpeakStreamEvents(SpeakStreamEvents.Error), e_error + ) + + # signal exit and close + self._signal_exit() + + self._logger.debug("SpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + + # pylint: disable=too-many-return-statements + def send(self, text_input: str) -> bool: + """ + Sends data over the WebSocket connection. + """ + self._logger.spam("SpeakStreamClient.send ENTER") + + if self._exit_event.is_set(): + self._logger.notice("send exiting gracefully") + self._logger.debug("SpeakStreamClient.send LEAVE") + return False + + if self._socket is not None: + with self._lock_send: + try: + self._socket.send(json.dumps({"type": "Speak", "text": text_input})) + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"send() exiting gracefully: {e.code}") + self._logger.debug("SpeakStreamClient._keep_alive LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return True + except websockets.exceptions.ConnectionClosed as e: + if e.code == 1000: + self._logger.notice(f"send({e.code}) exiting gracefully") + self._logger.debug("SpeakStreamClient.send LEAVE") + if ( + self._config.options.get("termination_exception_send") + == "true" + ): + raise + return True + self._logger.error("send() failed - ConnectionClosed: %s", str(e)) + self._logger.spam("SpeakStreamClient.send LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + except websockets.exceptions.WebSocketException as e: + self._logger.error("send() failed - WebSocketException: %s", str(e)) + self._logger.spam("SpeakStreamClient.send LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + except Exception as e: # pylint: disable=broad-except + self._logger.error("send() failed - Exception: %s", str(e)) + self._logger.spam("SpeakStreamClient.send LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + + self._logger.spam("send() succeeded") + self._logger.spam("SpeakStreamClient.send LEAVE") + return True + + self._logger.spam("send() failed. socket is None") + self._logger.spam("SpeakStreamClient.send LEAVE") + return False + + # pylint: enable=too-many-return-statements + + + def flush(self) -> bool: + """ + Flushes the current buffer and returns generated audio + """ + self._logger.spam("SpeakStreamClient.flush ENTER") + + if self._exit_event.is_set(): + self._logger.notice("flush exiting gracefully") + self._logger.debug("SpeakStreamClient.flush LEAVE") + return False + + if self._socket is None: + self._logger.notice("socket is not intialized") + self._logger.debug("SpeakStreamClient.flush LEAVE") + return False + + self._logger.notice("Sending Flush...") + ret = self.send(json.dumps({"type": "Flush"})) + + if not ret: + self._logger.error("flush failed") + self._logger.spam("SpeakStreamClient.flush LEAVE") + return False + + self._logger.notice("flush succeeded") + self._logger.spam("SpeakStreamClient.flush LEAVE") + + return True + + # closes the WebSocket connection gracefully + def finish(self) -> bool: + """ + Closes the WebSocket connection gracefully. + """ + self._logger.spam("SpeakStreamClient.finish ENTER") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # signal exit + self._signal_exit() + + # stop the threads + + if self._listen_thread is not None: + self._listen_thread.join() + self._listen_thread = None + self._logger.notice("listening thread joined") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.notice("finish succeeded") + self._logger.spam("SpeakStreamClient.finish LEAVE") + return True + + # signals the WebSocket connection to exit + def _signal_exit(self) -> None: + # closes the WebSocket connection gracefully + self._logger.notice("closing socket...") + if self._socket is not None: + self._logger.notice("sending Close...") + try: + # if the socket connection is closed, the following line might throw an error + self._socket.send(json.dumps({"type": "Close"})) + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) + except websockets.exceptions.ConnectionClosed as e: + self._logger.error("_signal_exit - ConnectionClosed: %s", e.code) + except websockets.exceptions.WebSocketException as e: + self._logger.error("_signal_exit - WebSocketException: %s", str(e)) + except Exception as e: # pylint: disable=broad-except + self._logger.error("_signal_exit - Exception: %s", str(e)) + + # push close event + try: + self._emit( + SpeakStreamEvents(SpeakStreamEvents.Close), + CloseResponse(type=SpeakStreamEvents.Close), + ) + except Exception as e: # pylint: disable=broad-except + self._logger.error("_signal_exit - Exception: %s", e) + + # wait for task to send + time.sleep(0.5) + + # signal exit + self._exit_event.set() + + # closes the WebSocket connection gracefully + self._logger.verbose("clean up socket...") + if self._socket is not None: + self._logger.verbose("socket.wait_closed...") + try: + self._socket.close() + except websockets.exceptions.WebSocketException as e: + self._logger.error("socket.wait_closed failed: %s", e) + + self._socket = None # type: ignore diff --git a/deepgram/clients/speak/v1/response.py b/deepgram/clients/speak/v1/response.py index 182f51a5..8130f9e6 100644 --- a/deepgram/clients/speak/v1/response.py +++ b/deepgram/clients/speak/v1/response.py @@ -44,3 +44,166 @@ def __setitem__(self, key, val): def __str__(self) -> str: my_dict = self.to_dict() return my_dict.__str__() + +@dataclass +class SpeakStreamResponse(DataClassJsonMixin): # pylint: disable=too-many-instance-attributes + """ + A class for representing a response from the speak (streaming) endpoint. + """ + + content_type: str = "" + request_id: str = "" + model_uuid: str = "" + model_name: str = "" + date: str = "" + stream: Optional[io.BytesIO] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + # this is a hack to make the response look like a dict because of the io.BytesIO object + # otherwise it will throw an exception on printing + def __str__(self) -> str: + my_dict = self.to_dict() + return my_dict.__str__() + +@dataclass +class OpenResponse(DataClassJsonMixin): + """ + Open Message from the Deepgram Platform + """ + + type: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + +@dataclass +class MetadataResponse(DataClassJsonMixin): + """ + Metadata object + """ + + request_id: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + +@dataclass +class FlushedResponse(DataClassJsonMixin): + """ + Flushed Message from the Deepgram Platform + """ + + type: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + +@dataclass +class CloseResponse(DataClassJsonMixin): + """ + Close Message from the Deepgram Platform + """ + + type: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + +@dataclass +class ErrorResponse(DataClassJsonMixin): + """ + Error Message from the Deepgram Platform + """ + + description: str = "" + message: str = "" + type: str = "" + variant: Optional[str] = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + +@dataclass +class WarningResponse(DataClassJsonMixin): + """ + Warning Message from the Deepgram Platform + """ + + warn_code: str = "" + warn_msg: str = "" + type: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + + +# Unhandled Message + +@dataclass +class UnhandledResponse(DataClassJsonMixin): + """ + Unhandled Message from the Deepgram Platform + """ + + type: str = "" + raw: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) \ No newline at end of file diff --git a/examples/speak-stream/async-interactive/main.py b/examples/speak-stream/async-interactive/main.py new file mode 100644 index 00000000..b0bc2f71 --- /dev/null +++ b/examples/speak-stream/async-interactive/main.py @@ -0,0 +1,99 @@ +from dotenv import load_dotenv +import asyncio +from websockets.exceptions import ConnectionClosedError +from deepgram.utils import verboselogs +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + SpeakStreamEvents, + SpeakOptions, +) + +load_dotenv() + +TTS_TEXT = "Hello, this is a text to speech example using Deepgram." +AUDIO_FILE = "output.mp3" + +async def main(): + try: + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + # config: DeepgramClientOptions = DeepgramClientOptions(verbose=verboselogs.DEBUG) + # deepgram: DeepgramClient = DeepgramClient("", config) + # otherwise, use default config + deepgram: DeepgramClient = DeepgramClient() + + # Create a websocket connection to Deepgram + dg_connection = deepgram.asyncspeakstream.v("1") + + async def on_open(client, open_response, **kwargs): + print(f"\n\nOpen: {open_response}\n\n") + await send_tts_text(client) + + async def on_binary_data(client, data, **kwargs): + print("Received binary data") + await write_binary_to_mp3(data) + + async def on_metadata(client, metadata, **kwargs): + print(f"\n\nMetadata: {metadata}\n\n") + + async def on_flush(client, flush, **kwargs): + print(f"\n\nFlush: {flush}\n\n") + + async def on_close(client, close, **kwargs): + print(f"\n\nClose: {close}\n\n") + + async def on_warning(client, warning, **kwargs): + print(f"\n\nWarning: {warning}\n\n") + + async def on_error(client, error, **kwargs): + print(f"\n\nError: {error}\n\n") + + async def on_unhandled(client, unhandled, **kwargs): + print(f"\n\nUnhandled: {unhandled}\n\n") + + async def write_binary_to_mp3(data): + loop = asyncio.get_running_loop() + try: + with open(AUDIO_FILE, "ab") as f: + await loop.run_in_executor(None, f.write, data) + except Exception as e: + print(f"Failed to write data to file: {e}") + finally: + print("File operation completed.") + + dg_connection.on(SpeakStreamEvents.Open, on_open) + dg_connection.on(SpeakStreamEvents.AudioData, on_binary_data) + dg_connection.on(SpeakStreamEvents.Metadata, on_metadata) + dg_connection.on(SpeakStreamEvents.Flush, on_flush) + dg_connection.on(SpeakStreamEvents.Close, on_close) + dg_connection.on(SpeakStreamEvents.Warning, on_warning) + dg_connection.on(SpeakStreamEvents.Error, on_error) + dg_connection.on(SpeakStreamEvents.Unhandled, on_unhandled) + + async def send_tts_text(client): + await client.send(TTS_TEXT) + + # Connect to the WebSocket + options = SpeakOptions(model="aura-asteria-en") + + if not await dg_connection.start(options): + print("Failed to start connection") + return + + # Wait for user input to finish + await asyncio.get_event_loop().run_in_executor(None, input, "\n\nPress Enter to stop...\n\n") + await dg_connection.finish() + + print("Finished") + + except ConnectionClosedError as e: + print(f"WebSocket connection closed unexpectedly: {e}") + except asyncio.CancelledError as e: + print(f"Asyncio task was cancelled: {e}") + except OSError as e: + print(f"File operation failed: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/speak-stream/interactive/main.py b/examples/speak-stream/interactive/main.py new file mode 100644 index 00000000..f13b0864 --- /dev/null +++ b/examples/speak-stream/interactive/main.py @@ -0,0 +1,103 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from dotenv import load_dotenv +import threading +from websockets.exceptions import ConnectionClosedError +from deepgram.utils import verboselogs + +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + SpeakStreamEvents, + SpeakOptions, +) + +load_dotenv() + + +TTS_TEXT = "Hello, this is a text to speech example using Deepgram." +AUDIO_FILE = "output.mp3" + + +def main(): + try: + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + # config: DeepgramClientOptions = DeepgramClientOptions(verbose=verboselogs.DEBUG) + # deepgram: DeepgramClient = DeepgramClient("", config) + # otherwise, use default config + deepgram: DeepgramClient = DeepgramClient() + + # Create a websocket connection to Deepgram + dg_connection = deepgram.speakstream.v("1") + # print(dg_connection) + + def on_open(self, open, **kwargs): + print(f"\n\n{open}\n\n") + thread = threading.Thread(target=send_tts_text, args=(self,)) + thread.start() + thread.join() + + def on_binary_data(self, data, **kwargs): + print("Received binary data:") + with open(AUDIO_FILE, "ab") as f: + f.write(data) + + def on_metadata(self, metadata, **kwargs): + print(f"\n\n{metadata}\n\n") + + def on_flush(self, flush, **kwargs): + print(f"\n\n{flush}\n\n") + + def on_close(self, close, **kwargs): + print(f"\n\n{close}\n\n") + + def on_warning(self, warning, **kwargs): + print(f"\n\n{warning}\n\n") + + def on_error(self, error, **kwargs): + print(f"\n\n{error}\n\n") + + def on_unhandled(self, unhandled, **kwargs): + print(f"\n\n{unhandled}\n\n") + + dg_connection.on(SpeakStreamEvents.Open, on_open) + dg_connection.on(SpeakStreamEvents.AudioData, on_binary_data) + dg_connection.on(SpeakStreamEvents.Metadata, on_metadata) + dg_connection.on(SpeakStreamEvents.Flush, on_flush) + dg_connection.on(SpeakStreamEvents.Close, on_close) + dg_connection.on(SpeakStreamEvents.Error, on_error) + dg_connection.on(SpeakStreamEvents.Warning, on_warning) + dg_connection.on(SpeakStreamEvents.Unhandled, on_unhandled) + + lock = threading.Lock() + + def send_tts_text(dg_connection): + with lock: + dg_connection.send(TTS_TEXT) + + # connect to websocket + options = SpeakOptions(model="aura-asteria-en") + + print("\n\nPress Enter to stop...\n\n") + if dg_connection.start(options) is False: + print("Failed to start connection") + return + + # Indicate that we've finished + input("\n\nPress Enter to stop...\n\n") + dg_connection.finish() + + print("Finished") + + except ConnectionClosedError as e: + print(f"WebSocket connection closed unexpectedly: {e}") + except ValueError as e: + print(f"Invalid value encountered: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + + +if __name__ == "__main__": + main()