diff --git a/.changeset/cyan-mayflies-design.md b/.changeset/cyan-mayflies-design.md new file mode 100644 index 0000000000..9a689b325b --- /dev/null +++ b/.changeset/cyan-mayflies-design.md @@ -0,0 +1,5 @@ +--- +"livekit-client": patch +--- + +Handle e2ee worker messages sequentially diff --git a/src/e2ee/E2eeManager.ts b/src/e2ee/E2eeManager.ts index 5f3923e4fa..4dd598ca6a 100644 --- a/src/e2ee/E2eeManager.ts +++ b/src/e2ee/E2eeManager.ts @@ -115,6 +115,11 @@ export class E2EEManager extends (EventEmitter as new () => TypedEventEmitter { + this.postKey(keyInfo); + }); + } if ( this.encryptionEnabled !== data.enabled && data.participantIdentity === this.room?.localParticipant.identity @@ -134,11 +139,6 @@ export class E2EEManager extends (EventEmitter as new () => TypedEventEmitter { - this.postKey(keyInfo); - }); - } break; case 'ratchetKey': this.keyProvider.emit(KeyProviderEvent.KeyRatcheted, data.material, data.keyIndex); @@ -196,13 +196,13 @@ export class E2EEManager extends (EventEmitter as new () => TypedEventEmitter { + this.postKey(keyInfo); + }); this.setParticipantCryptorEnabled( this.room.localParticipant.isE2EEEnabled, this.room.localParticipant.identity, ); - keyProvider.getKeys().forEach((keyInfo) => { - this.postKey(keyInfo); - }); }); room.localParticipant.on(ParticipantEvent.LocalTrackPublished, async (publication) => { this.setupE2EESender(publication.track!, publication.track!.sender!); diff --git a/src/e2ee/worker/FrameCryptor.ts b/src/e2ee/worker/FrameCryptor.ts index 7d44dd8593..fdae1c63bd 100644 --- a/src/e2ee/worker/FrameCryptor.ts +++ b/src/e2ee/worker/FrameCryptor.ts @@ -233,11 +233,17 @@ export class FrameCryptor extends BaseFrameCryptor { } const keySet = this.keys.getKeySet(); if (!keySet) { - throw new TypeError( - `key set not found for ${ - this.participantIdentity - } at index ${this.keys.getCurrentKeyIndex()}`, + this.emit( + CryptorEvent.Error, + new CryptorError( + `key set not found for ${ + this.participantIdentity + } at index ${this.keys.getCurrentKeyIndex()}`, + CryptorErrorReason.MissingKey, + this.participantIdentity, + ), ); + return controller.enqueue(encodedFrame); } const { encryptionKey } = keySet; const keyIndex = this.keys.getCurrentKeyIndex(); diff --git a/src/e2ee/worker/e2ee.worker.ts b/src/e2ee/worker/e2ee.worker.ts index 9fd5888c4e..a1a5232b9d 100644 --- a/src/e2ee/worker/e2ee.worker.ts +++ b/src/e2ee/worker/e2ee.worker.ts @@ -1,5 +1,6 @@ import { workerLogger } from '../../logger'; import { VideoCodec } from '../../room/track/options'; +import { AsyncQueue } from '../../utils/AsyncQueue'; import { KEY_PROVIDER_DEFAULTS } from '../constants'; import { CryptorErrorReason } from '../errors'; import { CryptorEvent, KeyHandlerEvent } from '../events'; @@ -17,6 +18,7 @@ import { ParticipantKeyHandler } from './ParticipantKeyHandler'; const participantCryptors: FrameCryptor[] = []; const participantKeys: Map = new Map(); let sharedKeyHandler: ParticipantKeyHandler | undefined; +let messageQueue = new AsyncQueue(); let isEncryptionEnabled: boolean = false; @@ -31,85 +33,87 @@ let rtpMap: Map = new Map(); workerLogger.setDefaultLevel('info'); onmessage = (ev) => { - const { kind, data }: E2EEWorkerMessage = ev.data; + messageQueue.run(async () => { + const { kind, data }: E2EEWorkerMessage = ev.data; - switch (kind) { - case 'init': - workerLogger.setLevel(data.loglevel); - workerLogger.info('worker initialized'); - keyProviderOptions = data.keyProviderOptions; - useSharedKey = !!data.keyProviderOptions.sharedKey; - // acknowledge init successful - const ackMsg: InitAck = { - kind: 'initAck', - data: { enabled: isEncryptionEnabled }, - }; - postMessage(ackMsg); - break; - case 'enable': - setEncryptionEnabled(data.enabled, data.participantIdentity); - workerLogger.info( - `updated e2ee enabled status for ${data.participantIdentity} to ${data.enabled}`, - ); - // acknowledge enable call successful - postMessage(ev.data); - break; - case 'decode': - let cryptor = getTrackCryptor(data.participantIdentity, data.trackId); - cryptor.setupTransform( - kind, - data.readableStream, - data.writableStream, - data.trackId, - data.codec, - ); - break; - case 'encode': - let pubCryptor = getTrackCryptor(data.participantIdentity, data.trackId); - pubCryptor.setupTransform( - kind, - data.readableStream, - data.writableStream, - data.trackId, - data.codec, - ); - break; - case 'setKey': - if (useSharedKey) { - setSharedKey(data.key, data.keyIndex); - } else if (data.participantIdentity) { + switch (kind) { + case 'init': + workerLogger.setLevel(data.loglevel); + workerLogger.info('worker initialized'); + keyProviderOptions = data.keyProviderOptions; + useSharedKey = !!data.keyProviderOptions.sharedKey; + // acknowledge init successful + const ackMsg: InitAck = { + kind: 'initAck', + data: { enabled: isEncryptionEnabled }, + }; + postMessage(ackMsg); + break; + case 'enable': + setEncryptionEnabled(data.enabled, data.participantIdentity); workerLogger.info( - `set participant sender key ${data.participantIdentity} index ${data.keyIndex}`, + `updated e2ee enabled status for ${data.participantIdentity} to ${data.enabled}`, ); - getParticipantKeyHandler(data.participantIdentity).setKey(data.key, data.keyIndex); - } else { - workerLogger.error('no participant Id was provided and shared key usage is disabled'); - } - break; - case 'removeTransform': - unsetCryptorParticipant(data.trackId, data.participantIdentity); - break; - case 'updateCodec': - getTrackCryptor(data.participantIdentity, data.trackId).setVideoCodec(data.codec); - break; - case 'setRTPMap': - // this is only used for the local participant - rtpMap = data.map; - participantCryptors.forEach((cr) => { - if (cr.getParticipantIdentity() === data.participantIdentity) { - cr.setRtpMap(data.map); + // acknowledge enable call successful + postMessage(ev.data); + break; + case 'decode': + let cryptor = getTrackCryptor(data.participantIdentity, data.trackId); + cryptor.setupTransform( + kind, + data.readableStream, + data.writableStream, + data.trackId, + data.codec, + ); + break; + case 'encode': + let pubCryptor = getTrackCryptor(data.participantIdentity, data.trackId); + pubCryptor.setupTransform( + kind, + data.readableStream, + data.writableStream, + data.trackId, + data.codec, + ); + break; + case 'setKey': + if (useSharedKey) { + await setSharedKey(data.key, data.keyIndex); + } else if (data.participantIdentity) { + workerLogger.info( + `set participant sender key ${data.participantIdentity} index ${data.keyIndex}`, + ); + await getParticipantKeyHandler(data.participantIdentity).setKey(data.key, data.keyIndex); + } else { + workerLogger.error('no participant Id was provided and shared key usage is disabled'); } - }); - break; - case 'ratchetRequest': - handleRatchetRequest(data); - break; - case 'setSifTrailer': - handleSifTrailer(data.trailer); - break; - default: - break; - } + break; + case 'removeTransform': + unsetCryptorParticipant(data.trackId, data.participantIdentity); + break; + case 'updateCodec': + getTrackCryptor(data.participantIdentity, data.trackId).setVideoCodec(data.codec); + break; + case 'setRTPMap': + // this is only used for the local participant + rtpMap = data.map; + participantCryptors.forEach((cr) => { + if (cr.getParticipantIdentity() === data.participantIdentity) { + cr.setRtpMap(data.map); + } + }); + break; + case 'ratchetRequest': + handleRatchetRequest(data); + break; + case 'setSifTrailer': + handleSifTrailer(data.trailer); + break; + default: + break; + } + }); }; async function handleRatchetRequest(data: RatchetRequestMessage['data']) { @@ -210,9 +214,9 @@ function setEncryptionEnabled(enable: boolean, participantIdentity: string) { encryptionEnabledMap.set(participantIdentity, enable); } -function setSharedKey(key: CryptoKey, index?: number) { +async function setSharedKey(key: CryptoKey, index?: number) { workerLogger.info('set shared key', { index }); - getSharedKeyHandler().setKey(key, index); + await getSharedKeyHandler().setKey(key, index); } function setupCryptorErrorEvents(cryptor: FrameCryptor) {