From 47071709c225ba0aaf5fff95b97a3e1b96f7d066 Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Sun, 20 Oct 2024 16:30:10 +0300 Subject: [PATCH 1/7] feat(deepgram): init Deepgram STT --- plugins/deepgram/api-extractor.json | 20 +++++++++ plugins/deepgram/package.json | 27 +++++++++++ plugins/deepgram/src/index.ts | 5 +++ plugins/deepgram/src/models.ts | 70 +++++++++++++++++++++++++++++ plugins/deepgram/tsconfig.json | 16 +++++++ 5 files changed, 138 insertions(+) create mode 100644 plugins/deepgram/api-extractor.json create mode 100644 plugins/deepgram/package.json create mode 100644 plugins/deepgram/src/index.ts create mode 100644 plugins/deepgram/src/models.ts create mode 100644 plugins/deepgram/tsconfig.json diff --git a/plugins/deepgram/api-extractor.json b/plugins/deepgram/api-extractor.json new file mode 100644 index 0000000..1f75e07 --- /dev/null +++ b/plugins/deepgram/api-extractor.json @@ -0,0 +1,20 @@ +/** + * Config file for API Extractor. For more info, please visit: https://api-extractor.com + */ +{ + "$schema": "https://developer.microsoft.com/json-schemas/api-extractor/v7/api-extractor.schema.json", + + /** + * Optionally specifies another JSON config file that this file extends from. This provides a way for + * standard settings to be shared across multiple projects. + * + * If the path starts with "./" or "../", the path is resolved relative to the folder of the file that contains + * the "extends" field. Otherwise, the first path segment is interpreted as an NPM package name, and will be + * resolved using NodeJS require(). 
+ * + * SUPPORTED TOKENS: none + * DEFAULT VALUE: "" + */ + "extends": "../../api-extractor-shared.json", + "mainEntryPointFilePath": "./dist/index.d.ts" +} diff --git a/plugins/deepgram/package.json b/plugins/deepgram/package.json new file mode 100644 index 0000000..a580d8a --- /dev/null +++ b/plugins/deepgram/package.json @@ -0,0 +1,27 @@ +{ + "name": "@livekit/agents-plugin-deepgram", + "version": "0.0.0", + "description": "Deepgram plugin for LiveKit Agents for Node.js", + "main": "dist/index.js", + "types": "dist/index.d.ts", + "author": "LiveKit", + "type": "module", + "scripts": { + "build": "tsc", + "clean": "rm -rf dist", + "clean:build": "pnpm clean && pnpm build", + "lint": "eslint -f unix \"src/**/*.{ts,js}\"", + "api:check": "api-extractor run --typescript-compiler-folder ../../node_modules/typescript", + "api:update": "api-extractor run --local --typescript-compiler-folder ../../node_modules/typescript --verbose" + }, + "devDependencies": { + "@microsoft/api-extractor": "^7.35.0", + "@types/ws": "^8.5.10", + "typescript": "^5.0.0" + }, + "dependencies": { + "@livekit/agents": "workspace:*", + "@livekit/rtc-node": "^0.10.2", + "ws": "^8.16.0" + } +} diff --git a/plugins/deepgram/src/index.ts b/plugins/deepgram/src/index.ts new file mode 100644 index 0000000..a649b5e --- /dev/null +++ b/plugins/deepgram/src/index.ts @@ -0,0 +1,5 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +export * from './stt.js'; diff --git a/plugins/deepgram/src/models.ts b/plugins/deepgram/src/models.ts new file mode 100644 index 0000000..63b60b0 --- /dev/null +++ b/plugins/deepgram/src/models.ts @@ -0,0 +1,70 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. 
+// +// SPDX-License-Identifier: Apache-2.0 + +export type STTModels = + | 'nova-general' + | 'nova-phonecall' + | 'nova-meeting' + | 'nova-2-general' + | 'nova-2-meeting' + | 'nova-2-phonecall' + | 'nova-2-finance' + | 'nova-2-conversationalai' + | 'nova-2-voicemail' + | 'nova-2-video' + | 'nova-2-medical' + | 'nova-2-drivethru' + | 'nova-2-automotive' + | 'enhanced-general' + | 'enhanced-meeting' + | 'enhanced-phonecall' + | 'enhanced-finance' + | 'base' + | 'meeting' + | 'phonecall' + | 'finance' + | 'conversationalai' + | 'voicemail' + | 'video' + | 'whisper-tiny' + | 'whisper-base' + | 'whisper-small' + | 'whisper-medium' + | 'whisper-large'; + +export type STTLanguages = + | 'da' + | 'de' + | 'en' + | 'en-AU' + | 'en-GB' + | 'en-IN' + | 'en-NZ' + | 'en-US' + | 'es' + | 'es-419' + | 'es-LATAM' + | 'fr' + | 'fr-CA' + | 'hi' + | 'hi-Latn' + | 'id' + | 'it' + | 'ja' + | 'ko' + | 'nl' + | 'no' + | 'pl' + | 'pt' + | 'pt-BR' + | 'ru' + | 'sv' + | 'ta' + | 'taq' + | 'th' + | 'tr' + | 'uk' + | 'zh' + | 'zh-CN' + | 'zh-TW'; diff --git a/plugins/deepgram/tsconfig.json b/plugins/deepgram/tsconfig.json new file mode 100644 index 0000000..3fa21d8 --- /dev/null +++ b/plugins/deepgram/tsconfig.json @@ -0,0 +1,16 @@ +{ + "extends": "../../tsconfig.json", + "include": ["./src"], + "compilerOptions": { + // match output dir to input dir. e.g. 
dist/index instead of dist/src/index + "rootDir": "./src", + "declarationDir": "./dist", + "outDir": "./dist" + }, + "typedocOptions": { + "name": "plugins/agents-plugin-deepgram", + "entryPointStrategy": "resolve", + "readme": "none", + "entryPoints": ["src/index.ts"] + } +} From 2ecae010a23f8e69300a3cf1d317f20778c0367a Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Wed, 23 Oct 2024 21:29:04 +0300 Subject: [PATCH 2/7] add STT functionality --- agents/src/utils.ts | 29 ++++ examples/package.json | 1 + examples/src/stt.ts | 44 +++++ plugins/deepgram/src/stt.ts | 299 ++++++++++++++++++++++++++++++++++ plugins/elevenlabs/src/tts.ts | 7 +- pnpm-lock.yaml | 25 +++ turbo.json | 1 + 7 files changed, 403 insertions(+), 3 deletions(-) create mode 100644 examples/src/stt.ts create mode 100644 plugins/deepgram/src/stt.ts diff --git a/agents/src/utils.ts b/agents/src/utils.ts index 6110ba1..3ef8b2e 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -297,6 +297,7 @@ export class AsyncIterableQueue implements AsyncIterableIterator { } } +/** @internal */ export class ExpFilter { #alpha: number; #max?: number; @@ -337,3 +338,31 @@ export class ExpFilter { this.#alpha = alpha; } } + +/** @internal */ +export class AudioEnergyFilter { + #cooldownSeconds: number; + #cooldown: number; + + constructor(cooldownSeconds = 1) { + this.#cooldownSeconds = cooldownSeconds; + this.#cooldown = cooldownSeconds; + } + + pushFrame(frame: AudioFrame): boolean { + const arr = Float32Array.from(frame.data, (x) => x / 32768); + const rms = (arr.map((x) => x ** 2).reduce((acc, x) => acc + x) / arr.length) ** 0.5; + if (rms > 0.004) { + this.#cooldown = this.#cooldownSeconds; + return true; + } + + const durationSeconds = frame.samplesPerChannel / frame.sampleRate; + this.#cooldown -= durationSeconds; + if (this.#cooldown > 0) { + return true; + } + + return false; + } +} diff --git a/examples/package.json b/examples/package.json index 52b1329..22f00ca 100644 --- a/examples/package.json 
+++ b/examples/package.json @@ -14,6 +14,7 @@ }, "dependencies": { "@livekit/agents": "workspace:*", + "@livekit/agents-plugin-deepgram": "workspace:*", "@livekit/agents-plugin-elevenlabs": "workspace:*", "@livekit/agents-plugin-openai": "workspace:*", "@livekit/rtc-node": "^0.10.2", diff --git a/examples/src/stt.ts b/examples/src/stt.ts new file mode 100644 index 0000000..b1f1317 --- /dev/null +++ b/examples/src/stt.ts @@ -0,0 +1,44 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { type JobContext, WorkerOptions, cli, defineAgent, stt } from '@livekit/agents'; +import { STT } from '@livekit/agents-plugin-deepgram'; +import type { Track } from '@livekit/rtc-node'; +import { AudioStream, RoomEvent, TrackKind } from '@livekit/rtc-node'; +import { fileURLToPath } from 'node:url'; + +export default defineAgent({ + entry: async (ctx: JobContext) => { + await ctx.connect(); + console.log('starting TTS example agent'); + + const transcribeTrack = async (track: Track) => { + const audioStream = new AudioStream(track); + const sttStream = new STT().stream(); + + const sendTask = async () => { + for await (const event of audioStream) { + sttStream.pushFrame(event); + } + }; + + const recvTask = async () => { + for await (const event of sttStream) { + if (event.type === stt.SpeechEventType.FINAL_TRANSCRIPT) { + console.log(event.alternatives[0].text); + } + } + }; + + Promise.all([sendTask(), recvTask()]); + }; + + ctx.room.on(RoomEvent.TrackSubscribed, async (track: Track) => { + if (track.kind === TrackKind.KIND_AUDIO) { + transcribeTrack(track); + } + }); + }, +}); + +cli.runApp(new WorkerOptions({ agent: fileURLToPath(import.meta.url) })); diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts new file mode 100644 index 0000000..bff623a --- /dev/null +++ b/plugins/deepgram/src/stt.ts @@ -0,0 +1,299 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. 
+// +// SPDX-License-Identifier: Apache-2.0 +import { AudioByteStream, AudioEnergyFilter, log, stt } from '@livekit/agents'; +import type { SpeechData } from '@livekit/agents/dist/stt/stt.js'; +import { SpeechEventType } from '@livekit/agents/dist/stt/stt.js'; +import type { AudioFrame } from '@livekit/rtc-node'; +import type { RawData } from 'ws'; +import { WebSocket } from 'ws'; +import type { STTLanguages, STTModels } from './models.js'; + +const API_BASE_URL_V1 = 'wss://api.deepgram.com/v1/listen'; + +export interface STTOptions { + apiKey?: string; + language?: STTLanguages | string; + detectLanguage: boolean; + interimResults: boolean; + punctuate: boolean; + model: STTModels; + smartFormat: boolean; + noDelay: boolean; + endpointing: number; + fillerWords: boolean; + sampleRate: number; + numChannels: number; + keywords: [string, number][]; + profanityFilter: boolean; +} + +const defaultSTTOptions: STTOptions = { + apiKey: process.env.DEEPGRAM_API_KEY, + language: 'en-US', + detectLanguage: false, + interimResults: true, + punctuate: true, + model: 'nova-2-general', + smartFormat: true, + noDelay: true, + endpointing: 25, + fillerWords: false, + sampleRate: 16000, + numChannels: 1, + keywords: [], + profanityFilter: false, +}; + +export class STT extends stt.STT { + #opts: STTOptions; + #logger = log(); + + constructor(opts: Partial = defaultSTTOptions) { + super({ + streaming: true, + interimResults: opts.interimResults || defaultSTTOptions.interimResults, + }); + if (opts.apiKey === undefined) { + throw new Error( + 'Deepgram API key is required, whether as an argument or as $DEEPGRAM_API_KEY', + ); + } + + this.#opts = { ...defaultSTTOptions, ...opts }; + + if (this.#opts.detectLanguage) { + this.#opts.language = undefined; + } else if ( + this.#opts.language && + !['en-US', 'en'].includes(this.#opts.language) && + [ + 'nova-2-meeting', + 'nova-2-phonecall', + 'nova-2-finance', + 'nova-2-conversationalai', + 'nova-2-voicemail', + 'nova-2-video', + 
'nova-2-medical', + 'nova-2-drivethru', + 'nova-2-automotive', + ].includes(this.#opts.model) + ) { + this.#logger.warn( + `${this.#opts.model} does not support language ${this.#opts.language}, falling back to nova-2-general`, + ); + this.#opts.model = 'nova-2-general'; + } + } + + stream(): stt.SpeechStream { + return new SpeechStream(this.#opts); + } +} + +export class SpeechStream extends stt.SpeechStream { + #opts: STTOptions; + #audioEnergyFilter: AudioEnergyFilter; + #logger = log(); + #speaking = false; + + constructor(opts: STTOptions) { + super(); + this.#opts = opts; + this.closed = false; + this.#audioEnergyFilter = new AudioEnergyFilter(); + + this.#run(); + } + + async #run(maxRetry = 32) { + let retries = 0; + let ws: WebSocket; + while (!this.input.closed) { + const streamURL = new URL(API_BASE_URL_V1); + const params = { + model: this.#opts.model, + punctuate: this.#opts.punctuate, + smart_format: this.#opts.smartFormat, + no_delay: this.#opts.noDelay, + interim_results: this.#opts.interimResults, + encoding: 'linear16', + vad_events: true, + sample_rate: this.#opts.sampleRate, + channels: this.#opts.numChannels, + endpointing: this.#opts.endpointing || false, + filler_words: this.#opts.fillerWords, + keywords: this.#opts.keywords.map((x) => x.join(':')).join('&keywords='), + profanity_filter: this.#opts.profanityFilter, + language: this.#opts.language, + }; + Object.entries(params).forEach(([k, v]) => { + if (v !== undefined) { + streamURL.searchParams.append(k, encodeURIComponent(v)); + } + }); + + ws = new WebSocket(streamURL, { + headers: { Authorization: `Token ${this.#opts.apiKey}` }, + }); + + try { + await new Promise((resolve, reject) => { + ws.on('open', resolve); + ws.on('error', (error) => reject(error)); + ws.on('close', (code) => reject(`WebSocket returned ${code}`)); + }); + + await this.#runWS(ws); + } catch (e) { + if (retries >= maxRetry) { + throw new Error(`failed to connect to Deepgram after ${retries} attempts: ${e}`); + } + + 
const delay = Math.min(retries * 5, 10); + retries++; + + this.#logger.warn( + `failed to connect to Deepgram, retrying in ${delay} seconds: ${e} (${retries}/${maxRetry})`, + ); + await new Promise((resolve) => setTimeout(resolve, delay * 1000)); + } + } + + this.closed = true; + } + + async #runWS(ws: WebSocket) { + let closing = false; + + const keepalive = setInterval(() => { + try { + ws.send(JSON.stringify({ type: 'KeepAlive' })); + } catch { + clearInterval(keepalive); + return; + } + }, 5000); + + const sendTask = async () => { + const samples100Ms = Math.trunc(this.#opts.sampleRate / 10); + const stream = new AudioByteStream( + this.#opts.sampleRate, + this.#opts.numChannels, + samples100Ms, + ); + + for await (const data of this.input) { + let frames: AudioFrame[]; + if (data === SpeechStream.FLUSH_SENTINEL) { + frames = stream.flush(); + } else { + frames = stream.write(data.data.buffer); + } + + for await (const frame of frames) { + if (this.#audioEnergyFilter.pushFrame(frame)) { + // DEBUG(nbsp): this line might not send the buffer correctly + ws.send(frame.data.buffer); + } + } + } + + closing = true; + ws.send(JSON.stringify({ type: 'CloseStream' })); + }; + + const listenTask = async () => { + while (!this.closed) { + try { + await new Promise((resolve, reject) => { + ws.on('message', (data) => resolve(data)); + ws.on('close', (code, reason) => { + if (!closing) { + reject(`WebSocket closed with code ${code}: ${reason}`); + } + }); + // DEBUG(nbsp) + ws.on('error', this.#logger.error); + }).then((msg) => { + const json = JSON.parse(msg.toString()); + switch (json['type']) { + case 'SpeechStarted': { + // This is a normal case. Deepgram's SpeechStarted events + // are not correlated with speech_final or utterance end. + // It's possible that we receive two in a row without an endpoint + // It's also possible we receive a transcript without a SpeechStarted event. 
+ if (this.#speaking) return; + this.#speaking = true; + this.queue.put({ type: SpeechEventType.START_OF_SPEECH, alternatives: [] }); + break; + } + // see this page: + // https://developers.deepgram.com/docs/understand-endpointing-interim-results#using-endpointing-speech_final + // for more information about the different types of events + case 'Results': { + const isFinal = json['is_final']; + const isEndpoint = json['speech_final']; + + const alternatives = liveTranscriptionToSpeechData(this.#opts.language!, json); + + // If, for some reason, we didn't get a SpeechStarted event but we got + // a transcript with text, we should start speaking. It's rare but has + // been observed. + if (alternatives.length > 0 && alternatives[0].text) { + if (!this.#speaking) { + this.#speaking = true; + this.queue.put({ type: SpeechEventType.START_OF_SPEECH, alternatives: [] }); + } + + if (isFinal) { + this.queue.put({ type: SpeechEventType.FINAL_TRANSCRIPT, alternatives }); + } else { + this.queue.put({ type: SpeechEventType.INTERIM_TRANSCRIPT, alternatives }); + } + } + + // if we receive an endpoint, only end the speech if + // we either had a SpeechStarted event or we have a seen + // a non-empty transcript (deepgram doesn't have a SpeechEnded event) + if (isEndpoint && this.#speaking) { + this.#speaking = false; + this.queue.put({ type: SpeechEventType.END_OF_SPEECH, alternatives: [] }); + } + + break; + } + case 'Metadata': { + break; + } + default: { + this.#logger.child({ msg: json }).warn('received unexpected message from Deepgram'); + break; + } + } + }); + } catch { + break; + } + } + }; + + await Promise.all([sendTask(), listenTask()]); + clearInterval(keepalive); + } +} + +const liveTranscriptionToSpeechData = ( + language: STTLanguages | string, + data: { [id: string]: any }, +): SpeechData[] => { + const alts: any[] = data['channel']['alternatives']; + + return alts.map((alt) => ({ + language, + startTime: alt['words'] ? 
alt['words'][0]['start'] : 0, + endTime: alt['words'] ? alt['words'][alt['words'].length - 1]['end'] : 0, + confidence: alt['confidence'], + text: alt['transcript'], + })); +}; diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts index 4ab3e99..f03997f 100644 --- a/plugins/elevenlabs/src/tts.ts +++ b/plugins/elevenlabs/src/tts.ts @@ -65,8 +65,10 @@ const defaultTTSOptions: TTSOptions = { export class TTS extends tts.TTS { #opts: TTSOptions; - constructor(opts = defaultTTSOptions) { - super(sampleRateFromFormat(opts.encoding), 1, { streaming: true }); + constructor(opts: Partial = defaultTTSOptions) { + super(sampleRateFromFormat(opts.encoding || defaultTTSOptions.encoding), 1, { + streaming: true, + }); if (opts.apiKey === undefined) { throw new Error( 'ElevenLabs API key is required, whether as an argument or as $ELEVEN_API_KEY', @@ -105,7 +107,6 @@ export class TTS extends tts.TTS { } export class SynthesizeStream extends tts.SynthesizeStream { - closed: boolean; #opts: TTSOptions; #logger = log(); readonly streamURL: URL; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7f8a82c..c1525d6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -120,6 +120,9 @@ importers: '@livekit/agents': specifier: workspace:* version: link:../agents + '@livekit/agents-plugin-deepgram': + specifier: workspace:* + version: link:../plugins/deepgram '@livekit/agents-plugin-elevenlabs': specifier: workspace:* version: link:../plugins/elevenlabs @@ -137,6 +140,28 @@ importers: specifier: ^5.0.0 version: 5.4.5 + plugins/deepgram: + dependencies: + '@livekit/agents': + specifier: workspace:* + version: link:../../agents + '@livekit/rtc-node': + specifier: ^0.10.2 + version: 0.10.2 + ws: + specifier: ^8.16.0 + version: 8.17.0 + devDependencies: + '@microsoft/api-extractor': + specifier: ^7.35.0 + version: 7.43.7(@types/node@22.5.5) + '@types/ws': + specifier: ^8.5.10 + version: 8.5.10 + typescript: + specifier: ^5.0.0 + version: 5.4.5 + plugins/elevenlabs: 
dependencies: '@livekit/agents': diff --git a/turbo.json b/turbo.json index 1ebe97c..3cef656 100644 --- a/turbo.json +++ b/turbo.json @@ -1,6 +1,7 @@ { "$schema": "https://turborepo.org/schema.json", "globalEnv": [ + "DEEPGRAM_API_KEY", "ELEVEN_API_KEY", "OPENAI_API_KEY", "LIVEKIT_URL", From 1104d0ede3e78b2b6a4e3226b3b11983ace1f1d7 Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Wed, 23 Oct 2024 21:54:23 +0300 Subject: [PATCH 3/7] Create friendly-wombats-help.md --- .changeset/friendly-wombats-help.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/friendly-wombats-help.md diff --git a/.changeset/friendly-wombats-help.md b/.changeset/friendly-wombats-help.md new file mode 100644 index 0000000..1d0b9d3 --- /dev/null +++ b/.changeset/friendly-wombats-help.md @@ -0,0 +1,6 @@ +--- +"@livekit/agents": patch +"@livekit/agents-plugin-deepgram": minor +--- + +add Deepgram speech-to-text plugin From d1f3db9341002884372bf55f411d635c027ca7e4 Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Fri, 25 Oct 2024 02:37:16 +0300 Subject: [PATCH 4/7] properly import SpeechData and SpeechEventType --- plugins/deepgram/src/stt.ts | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts index bff623a..65718d0 100644 --- a/plugins/deepgram/src/stt.ts +++ b/plugins/deepgram/src/stt.ts @@ -2,11 +2,8 @@ // // SPDX-License-Identifier: Apache-2.0 import { AudioByteStream, AudioEnergyFilter, log, stt } from '@livekit/agents'; -import type { SpeechData } from '@livekit/agents/dist/stt/stt.js'; -import { SpeechEventType } from '@livekit/agents/dist/stt/stt.js'; import type { AudioFrame } from '@livekit/rtc-node'; -import type { RawData } from 'ws'; -import { WebSocket } from 'ws'; +import { type RawData, WebSocket } from 'ws'; import type { STTLanguages, STTModels } from './models.js'; const API_BASE_URL_V1 = 'wss://api.deepgram.com/v1/listen'; @@ -225,7 +222,7 @@ export class 
SpeechStream extends stt.SpeechStream { // It's also possible we receive a transcript without a SpeechStarted event. if (this.#speaking) return; this.#speaking = true; - this.queue.put({ type: SpeechEventType.START_OF_SPEECH, alternatives: [] }); + this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH, alternatives: [] }); break; } // see this page: @@ -243,13 +240,13 @@ export class SpeechStream extends stt.SpeechStream { if (alternatives.length > 0 && alternatives[0].text) { if (!this.#speaking) { this.#speaking = true; - this.queue.put({ type: SpeechEventType.START_OF_SPEECH, alternatives: [] }); + this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH, alternatives: [] }); } if (isFinal) { - this.queue.put({ type: SpeechEventType.FINAL_TRANSCRIPT, alternatives }); + this.queue.put({ type: stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives }); } else { - this.queue.put({ type: SpeechEventType.INTERIM_TRANSCRIPT, alternatives }); + this.queue.put({ type: stt.SpeechEventType.INTERIM_TRANSCRIPT, alternatives }); } } @@ -258,7 +255,7 @@ export class SpeechStream extends stt.SpeechStream { // a non-empty transcript (deepgram doesn't have a SpeechEnded event) if (isEndpoint && this.#speaking) { this.#speaking = false; - this.queue.put({ type: SpeechEventType.END_OF_SPEECH, alternatives: [] }); + this.queue.put({ type: stt.SpeechEventType.END_OF_SPEECH, alternatives: [] }); } break; @@ -286,7 +283,7 @@ export class SpeechStream extends stt.SpeechStream { const liveTranscriptionToSpeechData = ( language: STTLanguages | string, data: { [id: string]: any }, -): SpeechData[] => { +): stt.SpeechData[] => { const alts: any[] = data['channel']['alternatives']; return alts.map((alt) => ({ From 4bd4bd2af04db9e979e6a25c4e776b3f7e5cb3f4 Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Fri, 25 Oct 2024 04:46:36 +0300 Subject: [PATCH 5/7] fix response parser --- examples/src/stt.ts | 2 +- plugins/deepgram/src/stt.ts | 35 ++++++++++++++++++++++------------- 2 files 
changed, 23 insertions(+), 14 deletions(-) diff --git a/examples/src/stt.ts b/examples/src/stt.ts index b1f1317..d9f9199 100644 --- a/examples/src/stt.ts +++ b/examples/src/stt.ts @@ -10,7 +10,7 @@ import { fileURLToPath } from 'node:url'; export default defineAgent({ entry: async (ctx: JobContext) => { await ctx.connect(); - console.log('starting TTS example agent'); + console.log('starting STT example agent'); const transcribeTrack = async (track: Track) => { const audioStream = new AudioStream(track); diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts index 65718d0..21ff80a 100644 --- a/plugins/deepgram/src/stt.ts +++ b/plugins/deepgram/src/stt.ts @@ -120,13 +120,17 @@ export class SpeechStream extends stt.SpeechStream { channels: this.#opts.numChannels, endpointing: this.#opts.endpointing || false, filler_words: this.#opts.fillerWords, - keywords: this.#opts.keywords.map((x) => x.join(':')).join('&keywords='), + keywords: this.#opts.keywords.map((x) => x.join(':')), profanity_filter: this.#opts.profanityFilter, language: this.#opts.language, }; Object.entries(params).forEach(([k, v]) => { if (v !== undefined) { - streamURL.searchParams.append(k, encodeURIComponent(v)); + if (typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') { + streamURL.searchParams.append(k, encodeURIComponent(v)); + } else { + v.forEach((x) => streamURL.searchParams.append('keywords', encodeURIComponent(x))); + } } }); @@ -201,18 +205,22 @@ export class SpeechStream extends stt.SpeechStream { }; const listenTask = async () => { + new Promise((resolve, reject) => + ws.once('close', (code, reason) => { + if (!closing) { + this.#logger.error(`WebSocket closed with code ${code}: ${reason}`); + reject(); + } + }), + ); + while (!this.closed) { try { + console.log('awaiting'); await new Promise((resolve, reject) => { - ws.on('message', (data) => resolve(data)); - ws.on('close', (code, reason) => { - if (!closing) { - reject(`WebSocket closed with code 
${code}: ${reason}`); - } - }); - // DEBUG(nbsp) - ws.on('error', this.#logger.error); + ws.once('message', (data) => resolve(data)); }).then((msg) => { + console.log(msg.toString()); const json = JSON.parse(msg.toString()); switch (json['type']) { case 'SpeechStarted': { @@ -269,7 +277,8 @@ export class SpeechStream extends stt.SpeechStream { } } }); - } catch { + } catch (error) { + this.#logger.child({ error }).warn('unrecoverable error, exiting'); break; } } @@ -288,8 +297,8 @@ const liveTranscriptionToSpeechData = ( return alts.map((alt) => ({ language, - startTime: alt['words'] ? alt['words'][0]['start'] : 0, - endTime: alt['words'] ? alt['words'][alt['words'].length - 1]['end'] : 0, + startTime: alt['words'].length ? alt['words'][0]['start'] : 0, + endTime: alt['words'].length ? alt['words'][alt['words'].length - 1]['end'] : 0, confidence: alt['confidence'], text: alt['transcript'], })); From b47b4d8fd7256ab468d4bdec1edaf0d94e783551 Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Fri, 25 Oct 2024 18:35:09 +0300 Subject: [PATCH 6/7] wip: fix deepgram issue --- plugins/deepgram/src/stt.ts | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts index 21ff80a..568aa54 100644 --- a/plugins/deepgram/src/stt.ts +++ b/plugins/deepgram/src/stt.ts @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import { AudioByteStream, AudioEnergyFilter, log, stt } from '@livekit/agents'; -import type { AudioFrame } from '@livekit/rtc-node'; +import { type AudioFrame } from '@livekit/rtc-node'; import { type RawData, WebSocket } from 'ws'; import type { STTLanguages, STTModels } from './models.js'; @@ -36,7 +36,7 @@ const defaultSTTOptions: STTOptions = { noDelay: true, endpointing: 25, fillerWords: false, - sampleRate: 16000, + sampleRate: 48000, numChannels: 1, keywords: [], profanityFilter: false, @@ -177,7 +177,7 @@ export class SpeechStream extends stt.SpeechStream { }, 5000); const 
sendTask = async () => { - const samples100Ms = Math.trunc(this.#opts.sampleRate / 10); + const samples100Ms = Math.floor(this.#opts.sampleRate / 10); const stream = new AudioByteStream( this.#opts.sampleRate, this.#opts.numChannels, @@ -194,7 +194,6 @@ export class SpeechStream extends stt.SpeechStream { for await (const frame of frames) { if (this.#audioEnergyFilter.pushFrame(frame)) { - // DEBUG(nbsp): this line might not send the buffer correctly ws.send(frame.data.buffer); } } @@ -205,7 +204,7 @@ export class SpeechStream extends stt.SpeechStream { }; const listenTask = async () => { - new Promise((resolve, reject) => + new Promise((_, reject) => ws.once('close', (code, reason) => { if (!closing) { this.#logger.error(`WebSocket closed with code ${code}: ${reason}`); @@ -216,11 +215,9 @@ export class SpeechStream extends stt.SpeechStream { while (!this.closed) { try { - console.log('awaiting'); - await new Promise((resolve, reject) => { + await new Promise((resolve) => { ws.once('message', (data) => resolve(data)); }).then((msg) => { - console.log(msg.toString()); const json = JSON.parse(msg.toString()); switch (json['type']) { case 'SpeechStarted': { From 275d1c1b3d12925c264a1bfd57d43ab2141c4ba7 Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Fri, 25 Oct 2024 19:19:33 +0300 Subject: [PATCH 7/7] error if frames don't match --- examples/src/stt.ts | 2 +- plugins/deepgram/src/stt.ts | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/examples/src/stt.ts b/examples/src/stt.ts index d9f9199..541a277 100644 --- a/examples/src/stt.ts +++ b/examples/src/stt.ts @@ -14,7 +14,7 @@ export default defineAgent({ const transcribeTrack = async (track: Track) => { const audioStream = new AudioStream(track); - const sttStream = new STT().stream(); + const sttStream = new STT({ sampleRate: 48000 }).stream(); const sendTask = async () => { for await (const event of audioStream) { diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts 
index 568aa54..fbe67bf 100644 --- a/plugins/deepgram/src/stt.ts +++ b/plugins/deepgram/src/stt.ts @@ -36,7 +36,7 @@ const defaultSTTOptions: STTOptions = { noDelay: true, endpointing: 25, fillerWords: false, - sampleRate: 48000, + sampleRate: 16000, numChannels: 1, keywords: [], profanityFilter: false, @@ -188,8 +188,13 @@ export class SpeechStream extends stt.SpeechStream { let frames: AudioFrame[]; if (data === SpeechStream.FLUSH_SENTINEL) { frames = stream.flush(); - } else { + } else if ( + data.sampleRate === this.#opts.sampleRate && + data.channels === this.#opts.numChannels + ) { frames = stream.write(data.data.buffer); + } else { + throw new Error(`sample rate or channel count of frame does not match`); } for await (const frame of frames) {