diff --git a/.changeset/friendly-wombats-help.md b/.changeset/friendly-wombats-help.md
new file mode 100644
index 0000000..1d0b9d3
--- /dev/null
+++ b/.changeset/friendly-wombats-help.md
@@ -0,0 +1,6 @@
+---
+"@livekit/agents": patch
+"@livekit/agents-plugin-deepgram": minor
+---
+
+add Deepgram speech-to-text plugin
diff --git a/agents/src/utils.ts b/agents/src/utils.ts
index 6110ba1..3ef8b2e 100644
--- a/agents/src/utils.ts
+++ b/agents/src/utils.ts
@@ -297,6 +297,7 @@ export class AsyncIterableQueue<T> implements AsyncIterableIterator<T> {
   }
 }
 
+/** @internal */
 export class ExpFilter {
   #alpha: number;
   #max?: number;
@@ -337,3 +338,31 @@ export class ExpFilter {
     this.#alpha = alpha;
   }
 }
+
+/** @internal */
+export class AudioEnergyFilter {
+  #cooldownSeconds: number;
+  #cooldown: number;
+
+  constructor(cooldownSeconds = 1) {
+    this.#cooldownSeconds = cooldownSeconds;
+    this.#cooldown = cooldownSeconds;
+  }
+
+  pushFrame(frame: AudioFrame): boolean {
+    const arr = Float32Array.from(frame.data, (x) => x / 32768);
+    const rms = (arr.map((x) => x ** 2).reduce((acc, x) => acc + x) / arr.length) ** 0.5;
+    if (rms > 0.004) {
+      this.#cooldown = this.#cooldownSeconds;
+      return true;
+    }
+
+    const durationSeconds = frame.samplesPerChannel / frame.sampleRate;
+    this.#cooldown -= durationSeconds;
+    if (this.#cooldown > 0) {
+      return true;
+    }
+
+    return false;
+  }
+}
diff --git a/examples/package.json b/examples/package.json
index 52b1329..22f00ca 100644
--- a/examples/package.json
+++ b/examples/package.json
@@ -14,6 +14,7 @@
   },
   "dependencies": {
     "@livekit/agents": "workspace:*",
+    "@livekit/agents-plugin-deepgram": "workspace:*",
     "@livekit/agents-plugin-elevenlabs": "workspace:*",
     "@livekit/agents-plugin-openai": "workspace:*",
     "@livekit/rtc-node": "^0.10.2",
diff --git a/examples/src/stt.ts b/examples/src/stt.ts
new file mode 100644
index 0000000..541a277
--- /dev/null
+++ b/examples/src/stt.ts
@@ -0,0 +1,44 @@
+// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+import { type JobContext, WorkerOptions, cli, defineAgent, stt } from '@livekit/agents';
+import { STT } from '@livekit/agents-plugin-deepgram';
+import type { Track } from '@livekit/rtc-node';
+import { AudioStream, RoomEvent, TrackKind } from '@livekit/rtc-node';
+import { fileURLToPath } from 'node:url';
+
+export default defineAgent({
+  entry: async (ctx: JobContext) => {
+    await ctx.connect();
+    console.log('starting STT example agent');
+
+    const transcribeTrack = async (track: Track) => {
+      const audioStream = new AudioStream(track);
+      const sttStream = new STT({ sampleRate: 48000 }).stream();
+
+      const sendTask = async () => {
+        for await (const event of audioStream) {
+          sttStream.pushFrame(event);
+        }
+      };
+
+      const recvTask = async () => {
+        for await (const event of sttStream) {
+          if (event.type === stt.SpeechEventType.FINAL_TRANSCRIPT) {
+            console.log(event.alternatives[0].text);
+          }
+        }
+      };
+
+      await Promise.all([sendTask(), recvTask()]);
+    };
+
+    ctx.room.on(RoomEvent.TrackSubscribed, async (track: Track) => {
+      if (track.kind === TrackKind.KIND_AUDIO) {
+        transcribeTrack(track);
+      }
+    });
+  },
+});
+
+cli.runApp(new WorkerOptions({ agent: fileURLToPath(import.meta.url) }));
diff --git a/plugins/deepgram/api-extractor.json b/plugins/deepgram/api-extractor.json
new file mode 100644
index 0000000..1f75e07
--- /dev/null
+++ b/plugins/deepgram/api-extractor.json
@@ -0,0 +1,20 @@
+/**
+ * Config file for API Extractor.  For more info, please visit: https://api-extractor.com
+ */
+{
+  "$schema": "https://developer.microsoft.com/json-schemas/api-extractor/v7/api-extractor.schema.json",
+
+  /**
+   * Optionally specifies another JSON config file that this file extends from.  This provides a way for
+   * standard settings to be shared across multiple projects.
+   *
+   * If the path starts with "./" or "../", the path is resolved relative to the folder of the file that contains
+   * the "extends" field.  Otherwise, the first path segment is interpreted as an NPM package name, and will be
+   * resolved using NodeJS require().
+   *
+   * SUPPORTED TOKENS: none
+   * DEFAULT VALUE: ""
+   */
+  "extends": "../../api-extractor-shared.json",
+  "mainEntryPointFilePath": "./dist/index.d.ts"
+}
diff --git a/plugins/deepgram/package.json b/plugins/deepgram/package.json
new file mode 100644
index 0000000..a580d8a
--- /dev/null
+++ b/plugins/deepgram/package.json
@@ -0,0 +1,27 @@
+{
+  "name": "@livekit/agents-plugin-deepgram",
+  "version": "0.0.0",
+  "description": "Deepgram plugin for LiveKit Agents for Node.js",
+  "main": "dist/index.js",
+  "types": "dist/index.d.ts",
+  "author": "LiveKit",
+  "type": "module",
+  "scripts": {
+    "build": "tsc",
+    "clean": "rm -rf dist",
+    "clean:build": "pnpm clean && pnpm build",
+    "lint": "eslint -f unix \"src/**/*.{ts,js}\"",
+    "api:check": "api-extractor run --typescript-compiler-folder ../../node_modules/typescript",
+    "api:update": "api-extractor run --local --typescript-compiler-folder ../../node_modules/typescript --verbose"
+  },
+  "devDependencies": {
+    "@microsoft/api-extractor": "^7.35.0",
+    "@types/ws": "^8.5.10",
+    "typescript": "^5.0.0"
+  },
+  "dependencies": {
+    "@livekit/agents": "workspace:*",
+    "@livekit/rtc-node": "^0.10.2",
+    "ws": "^8.16.0"
+  }
+}
diff --git a/plugins/deepgram/src/index.ts b/plugins/deepgram/src/index.ts
new file mode 100644
index 0000000..a649b5e
--- /dev/null
+++ b/plugins/deepgram/src/index.ts
@@ -0,0 +1,5 @@
+// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+export * from './stt.js';
diff --git a/plugins/deepgram/src/models.ts b/plugins/deepgram/src/models.ts
new file mode 100644
index 0000000..63b60b0
--- /dev/null
+++ b/plugins/deepgram/src/models.ts
@@ -0,0 +1,70 @@
+// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+export type STTModels =
+  | 'nova-general'
+  | 'nova-phonecall'
+  | 'nova-meeting'
+  | 'nova-2-general'
+  | 'nova-2-meeting'
+  | 'nova-2-phonecall'
+  | 'nova-2-finance'
+  | 'nova-2-conversationalai'
+  | 'nova-2-voicemail'
+  | 'nova-2-video'
+  | 'nova-2-medical'
+  | 'nova-2-drivethru'
+  | 'nova-2-automotive'
+  | 'enhanced-general'
+  | 'enhanced-meeting'
+  | 'enhanced-phonecall'
+  | 'enhanced-finance'
+  | 'base'
+  | 'meeting'
+  | 'phonecall'
+  | 'finance'
+  | 'conversationalai'
+  | 'voicemail'
+  | 'video'
+  | 'whisper-tiny'
+  | 'whisper-base'
+  | 'whisper-small'
+  | 'whisper-medium'
+  | 'whisper-large';
+
+export type STTLanguages =
+  | 'da'
+  | 'de'
+  | 'en'
+  | 'en-AU'
+  | 'en-GB'
+  | 'en-IN'
+  | 'en-NZ'
+  | 'en-US'
+  | 'es'
+  | 'es-419'
+  | 'es-LATAM'
+  | 'fr'
+  | 'fr-CA'
+  | 'hi'
+  | 'hi-Latn'
+  | 'id'
+  | 'it'
+  | 'ja'
+  | 'ko'
+  | 'nl'
+  | 'no'
+  | 'pl'
+  | 'pt'
+  | 'pt-BR'
+  | 'ru'
+  | 'sv'
+  | 'ta'
+  | 'taq'
+  | 'th'
+  | 'tr'
+  | 'uk'
+  | 'zh'
+  | 'zh-CN'
+  | 'zh-TW';
diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts
new file mode 100644
index 0000000..fbe67bf
--- /dev/null
+++ b/plugins/deepgram/src/stt.ts
@@ -0,0 +1,307 @@
+// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+import { AudioByteStream, AudioEnergyFilter, log, stt } from '@livekit/agents';
+import { type AudioFrame } from '@livekit/rtc-node';
+import { type RawData, WebSocket } from 'ws';
+import type { STTLanguages, STTModels } from './models.js';
+
+const API_BASE_URL_V1 = 'wss://api.deepgram.com/v1/listen';
+
+export interface STTOptions {
+  apiKey?: string;
+  language?: STTLanguages | string;
+  detectLanguage: boolean;
+  interimResults: boolean;
+  punctuate: boolean;
+  model: STTModels;
+  smartFormat: boolean;
+  noDelay: boolean;
+  endpointing: number;
+  fillerWords: boolean;
+  sampleRate: number;
+  numChannels: number;
+  keywords: [string, number][];
+  profanityFilter: boolean;
+}
+
+const defaultSTTOptions: STTOptions = {
+  apiKey: process.env.DEEPGRAM_API_KEY,
+  language: 'en-US',
+  detectLanguage: false,
+  interimResults: true,
+  punctuate: true,
+  model: 'nova-2-general',
+  smartFormat: true,
+  noDelay: true,
+  endpointing: 25,
+  fillerWords: false,
+  sampleRate: 16000,
+  numChannels: 1,
+  keywords: [],
+  profanityFilter: false,
+};
+
+export class STT extends stt.STT {
+  #opts: STTOptions;
+  #logger = log();
+
+  constructor(opts: Partial<STTOptions> = defaultSTTOptions) {
+    super({
+      streaming: true,
+      interimResults: opts.interimResults ?? defaultSTTOptions.interimResults,
+    });
+    this.#opts = { ...defaultSTTOptions, ...opts };
+
+    if (this.#opts.apiKey === undefined) {
+      throw new Error(
+        'Deepgram API key is required, whether as an argument or as $DEEPGRAM_API_KEY',
+      );
+    }
+
+    if (this.#opts.detectLanguage) {
+      this.#opts.language = undefined;
+    } else if (
+      this.#opts.language &&
+      !['en-US', 'en'].includes(this.#opts.language) &&
+      [
+        'nova-2-meeting',
+        'nova-2-phonecall',
+        'nova-2-finance',
+        'nova-2-conversationalai',
+        'nova-2-voicemail',
+        'nova-2-video',
+        'nova-2-medical',
+        'nova-2-drivethru',
+        'nova-2-automotive',
+      ].includes(this.#opts.model)
+    ) {
+      this.#logger.warn(
+        `${this.#opts.model} does not support language ${this.#opts.language}, falling back to nova-2-general`,
+      );
+      this.#opts.model = 'nova-2-general';
+    }
+  }
+
+  stream(): stt.SpeechStream {
+    return new SpeechStream(this.#opts);
+  }
+}
+
+export class SpeechStream extends stt.SpeechStream {
+  #opts: STTOptions;
+  #audioEnergyFilter: AudioEnergyFilter;
+  #logger = log();
+  #speaking = false;
+
+  constructor(opts: STTOptions) {
+    super();
+    this.#opts = opts;
+    this.closed = false;
+    this.#audioEnergyFilter = new AudioEnergyFilter();
+
+    this.#run();
+  }
+
+  async #run(maxRetry = 32) {
+    let retries = 0;
+    let ws: WebSocket;
+    while (!this.input.closed) {
+      const streamURL = new URL(API_BASE_URL_V1);
+      const params = {
+        model: this.#opts.model,
+        punctuate: this.#opts.punctuate,
+        smart_format: this.#opts.smartFormat,
+        no_delay: this.#opts.noDelay,
+        interim_results: this.#opts.interimResults,
+        encoding: 'linear16',
+        vad_events: true,
+        sample_rate: this.#opts.sampleRate,
+        channels: this.#opts.numChannels,
+        endpointing: this.#opts.endpointing || false,
+        filler_words: this.#opts.fillerWords,
+        keywords: this.#opts.keywords.map((x) => x.join(':')),
+        profanity_filter: this.#opts.profanityFilter,
+        language: this.#opts.language,
+      };
+      Object.entries(params).forEach(([k, v]) => {
+        if (v !== undefined) {
+          if (typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') {
+            streamURL.searchParams.append(k, encodeURIComponent(v));
+          } else {
+            v.forEach((x) => streamURL.searchParams.append('keywords', encodeURIComponent(x)));
+          }
+        }
+      });
+
+      ws = new WebSocket(streamURL, {
+        headers: { Authorization: `Token ${this.#opts.apiKey}` },
+      });
+
+      try {
+        await new Promise((resolve, reject) => {
+          ws.on('open', resolve);
+          ws.on('error', (error) => reject(error));
+          ws.on('close', (code) => reject(`WebSocket returned ${code}`));
+        });
+
+        await this.#runWS(ws);
+      } catch (e) {
+        if (retries >= maxRetry) {
+          throw new Error(`failed to connect to Deepgram after ${retries} attempts: ${e}`);
+        }
+
+        const delay = Math.min(retries * 5, 10);
+        retries++;
+
+        this.#logger.warn(
+          `failed to connect to Deepgram, retrying in ${delay} seconds: ${e} (${retries}/${maxRetry})`,
+        );
+        await new Promise((resolve) => setTimeout(resolve, delay * 1000));
+      }
+    }
+
+    this.closed = true;
+  }
+
+  async #runWS(ws: WebSocket) {
+    let closing = false;
+
+    const keepalive = setInterval(() => {
+      try {
+        ws.send(JSON.stringify({ type: 'KeepAlive' }));
+      } catch {
+        clearInterval(keepalive);
+        return;
+      }
+    }, 5000);
+
+    const sendTask = async () => {
+      const samples100Ms = Math.floor(this.#opts.sampleRate / 10);
+      const stream = new AudioByteStream(
+        this.#opts.sampleRate,
+        this.#opts.numChannels,
+        samples100Ms,
+      );
+
+      for await (const data of this.input) {
+        let frames: AudioFrame[];
+        if (data === SpeechStream.FLUSH_SENTINEL) {
+          frames = stream.flush();
+        } else if (
+          data.sampleRate === this.#opts.sampleRate &&
+          data.channels === this.#opts.numChannels
+        ) {
+          frames = stream.write(data.data.buffer);
+        } else {
+          throw new Error(`sample rate or channel count of frame does not match`);
+        }
+
+        for (const frame of frames) {
+          if (this.#audioEnergyFilter.pushFrame(frame)) {
+            ws.send(frame.data.buffer);
+          }
+        }
+      }
+
+      closing = true;
+      ws.send(JSON.stringify({ type: 'CloseStream' }));
+    };
+
+    const listenTask = async () => {
+      new Promise((_, reject) =>
+        ws.once('close', (code, reason) => {
+          if (!closing) {
+            this.#logger.error(`WebSocket closed with code ${code}: ${reason}`);
+            reject();
+          }
+        }),
+      );
+
+      while (!this.closed) {
+        try {
+          await new Promise<RawData>((resolve) => {
+            ws.once('message', (data) => resolve(data));
+          }).then((msg) => {
+            const json = JSON.parse(msg.toString());
+            switch (json['type']) {
+              case 'SpeechStarted': {
+                // This is a normal case. Deepgram's SpeechStarted events
+                // are not correlated with speech_final or utterance end.
+                // It's possible that we receive two in a row without an endpoint.
+                // It's also possible we receive a transcript without a SpeechStarted event.
+                if (this.#speaking) return;
+                this.#speaking = true;
+                this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH, alternatives: [] });
+                break;
+              }
+              // see this page:
+              // https://developers.deepgram.com/docs/understand-endpointing-interim-results#using-endpointing-speech_final
+              // for more information about the different types of events
+              case 'Results': {
+                const isFinal = json['is_final'];
+                const isEndpoint = json['speech_final'];
+
+                const alternatives = liveTranscriptionToSpeechData(this.#opts.language!, json);
+
+                // If, for some reason, we didn't get a SpeechStarted event but we got
+                // a transcript with text, we should start speaking. It's rare but has
+                // been observed.
+                if (alternatives.length > 0 && alternatives[0].text) {
+                  if (!this.#speaking) {
+                    this.#speaking = true;
+                    this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH, alternatives: [] });
+                  }
+
+                  if (isFinal) {
+                    this.queue.put({ type: stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives });
+                  } else {
+                    this.queue.put({ type: stt.SpeechEventType.INTERIM_TRANSCRIPT, alternatives });
+                  }
+                }
+
+                // if we receive an endpoint, only end the speech if
+                // we either had a SpeechStarted event or we have seen
+                // a non-empty transcript (Deepgram doesn't have a SpeechEnded event)
+                if (isEndpoint && this.#speaking) {
+                  this.#speaking = false;
+                  this.queue.put({ type: stt.SpeechEventType.END_OF_SPEECH, alternatives: [] });
+                }
+
+                break;
+              }
+              case 'Metadata': {
+                break;
+              }
+              default: {
+                this.#logger.child({ msg: json }).warn('received unexpected message from Deepgram');
+                break;
+              }
+            }
+          });
+        } catch (error) {
+          this.#logger.child({ error }).warn('unrecoverable error, exiting');
+          break;
+        }
+      }
+    };
+
+    await Promise.all([sendTask(), listenTask()]);
+    clearInterval(keepalive);
+  }
+}
+
+const liveTranscriptionToSpeechData = (
+  language: STTLanguages | string,
+  data: { [id: string]: any },
+): stt.SpeechData[] => {
+  const alts: any[] = data['channel']['alternatives'];
+
+  return alts.map((alt) => ({
+    language,
+    startTime: alt['words'].length ? alt['words'][0]['start'] : 0,
+    endTime: alt['words'].length ? alt['words'][alt['words'].length - 1]['end'] : 0,
+    confidence: alt['confidence'],
+    text: alt['transcript'],
+  }));
+};
diff --git a/plugins/deepgram/tsconfig.json b/plugins/deepgram/tsconfig.json
new file mode 100644
index 0000000..3fa21d8
--- /dev/null
+++ b/plugins/deepgram/tsconfig.json
@@ -0,0 +1,16 @@
+{
+  "extends": "../../tsconfig.json",
+  "include": ["./src"],
+  "compilerOptions": {
+    // match output dir to input dir, e.g. dist/index instead of dist/src/index
+    "rootDir": "./src",
+    "declarationDir": "./dist",
+    "outDir": "./dist"
+  },
+  "typedocOptions": {
+    "name": "plugins/agents-plugin-deepgram",
+    "entryPointStrategy": "resolve",
+    "readme": "none",
+    "entryPoints": ["src/index.ts"]
+  }
+}
diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts
index 4ab3e99..f03997f 100644
--- a/plugins/elevenlabs/src/tts.ts
+++ b/plugins/elevenlabs/src/tts.ts
@@ -65,8 +65,10 @@ const defaultTTSOptions: TTSOptions = {
 export class TTS extends tts.TTS {
   #opts: TTSOptions;
 
-  constructor(opts = defaultTTSOptions) {
-    super(sampleRateFromFormat(opts.encoding), 1, { streaming: true });
+  constructor(opts: Partial<TTSOptions> = defaultTTSOptions) {
+    super(sampleRateFromFormat(opts.encoding || defaultTTSOptions.encoding), 1, {
+      streaming: true,
+    });
     if (opts.apiKey === undefined) {
       throw new Error(
         'ElevenLabs API key is required, whether as an argument or as $ELEVEN_API_KEY',
@@ -105,7 +107,6 @@ export class TTS extends tts.TTS {
 }
 
 export class SynthesizeStream extends tts.SynthesizeStream {
-  closed: boolean;
   #opts: TTSOptions;
   #logger = log();
   readonly streamURL: URL;
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 7f8a82c..c1525d6 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -120,6 +120,9 @@ importers:
       '@livekit/agents':
         specifier: workspace:*
         version: link:../agents
+      '@livekit/agents-plugin-deepgram':
+        specifier: workspace:*
+        version: link:../plugins/deepgram
       '@livekit/agents-plugin-elevenlabs':
         specifier: workspace:*
         version: link:../plugins/elevenlabs
@@ -137,6 +140,28 @@ importers:
         specifier: ^5.0.0
         version: 5.4.5
 
+  plugins/deepgram:
+    dependencies:
+      '@livekit/agents':
+        specifier: workspace:*
+        version: link:../../agents
+      '@livekit/rtc-node':
+        specifier: ^0.10.2
+        version: 0.10.2
+      ws:
+        specifier: ^8.16.0
+        version: 8.17.0
+    devDependencies:
+      '@microsoft/api-extractor':
+        specifier: ^7.35.0
+        version: 7.43.7(@types/node@22.5.5)
+      '@types/ws':
+        specifier: ^8.5.10
+        version: 8.5.10
+      typescript:
+        specifier: ^5.0.0
+        version: 5.4.5
+
   plugins/elevenlabs:
     dependencies:
       '@livekit/agents':
diff --git a/turbo.json b/turbo.json
index 1ebe97c..3cef656 100644
--- a/turbo.json
+++ b/turbo.json
@@ -1,6 +1,7 @@
 {
   "$schema": "https://turborepo.org/schema.json",
   "globalEnv": [
+    "DEEPGRAM_API_KEY",
     "ELEVEN_API_KEY",
     "OPENAI_API_KEY",
     "LIVEKIT_URL",
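
Note on the energy gate: the `AudioEnergyFilter` added to `agents/src/utils.ts` decides per frame whether audio is worth forwarding to Deepgram. A frame whose RMS energy exceeds 0.004 resets a cooldown; quiet frames keep passing until the cooldown (default 1 s of audio) is spent, so Deepgram still sees enough trailing silence to endpoint, while long silences are dropped. A minimal standalone sketch of that logic follows — the `frame` helper and the 48 kHz / 10 ms numbers are illustrative assumptions, not part of the diff:

```ts
// Standalone sketch of the RMS gate (assumes 16-bit PCM; the 0.004
// threshold and 1 s cooldown mirror AudioEnergyFilter in the diff).
const RMS_THRESHOLD = 0.004;
const SAMPLE_RATE = 48000; // assumed here; the plugin defaults to 16000

function rms(samples: Int16Array): number {
  let sum = 0;
  for (const s of samples) {
    const x = s / 32768; // normalize int16 to [-1, 1)
    sum += x * x;
  }
  return Math.sqrt(sum / samples.length);
}

// hypothetical helper: a 10 ms frame of constant amplitude
const frame = (amp: number) => new Int16Array(SAMPLE_RATE / 100).fill(amp);

let cooldown = 1; // seconds of silence still forwarded after speech
for (const [i, f] of [frame(2000), frame(0), frame(0)].entries()) {
  const loud = rms(f) > RMS_THRESHOLD;
  if (loud) {
    cooldown = 1; // speech resets the window
  } else {
    cooldown -= f.length / SAMPLE_RATE; // spend the window on silent audio
  }
  console.log(`frame ${i}:`, loud || cooldown > 0 ? 'forwarded' : 'dropped');
}
```

Gating on energy this way cuts bandwidth during long silences, while the periodic `KeepAlive` message in `#runWS` keeps the WebSocket from idling out.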