Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle e2ee worker messages sequentially #1260

Merged
merged 2 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/cyan-mayflies-design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-client": patch
---

Handle e2ee worker messages sequentially
16 changes: 8 additions & 8 deletions src/e2ee/E2eeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ export class E2EEManager extends (EventEmitter as new () => TypedEventEmitter<E2
break;

case 'enable':
if (data.enabled) {
this.keyProvider.getKeys().forEach((keyInfo) => {
this.postKey(keyInfo);
});
}
if (
this.encryptionEnabled !== data.enabled &&
data.participantIdentity === this.room?.localParticipant.identity
Expand All @@ -134,11 +139,6 @@ export class E2EEManager extends (EventEmitter as new () => TypedEventEmitter<E2
}
this.emit(EncryptionEvent.ParticipantEncryptionStatusChanged, data.enabled, participant);
}
if (this.encryptionEnabled) {
this.keyProvider.getKeys().forEach((keyInfo) => {
this.postKey(keyInfo);
});
}
break;
case 'ratchetKey':
this.keyProvider.emit(KeyProviderEvent.KeyRatcheted, data.material, data.keyIndex);
Expand Down Expand Up @@ -196,13 +196,13 @@ export class E2EEManager extends (EventEmitter as new () => TypedEventEmitter<E2
if (!this.room) {
throw new TypeError(`expected room to be present on signal connect`);
}
keyProvider.getKeys().forEach((keyInfo) => {
this.postKey(keyInfo);
});
this.setParticipantCryptorEnabled(
this.room.localParticipant.isE2EEEnabled,
this.room.localParticipant.identity,
);
keyProvider.getKeys().forEach((keyInfo) => {
this.postKey(keyInfo);
});
});
room.localParticipant.on(ParticipantEvent.LocalTrackPublished, async (publication) => {
this.setupE2EESender(publication.track!, publication.track!.sender!);
Expand Down
14 changes: 10 additions & 4 deletions src/e2ee/worker/FrameCryptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,17 @@ export class FrameCryptor extends BaseFrameCryptor {
}
const keySet = this.keys.getKeySet();
if (!keySet) {
throw new TypeError(
`key set not found for ${
this.participantIdentity
} at index ${this.keys.getCurrentKeyIndex()}`,
this.emit(
CryptorEvent.Error,
new CryptorError(
`key set not found for ${
this.participantIdentity
} at index ${this.keys.getCurrentKeyIndex()}`,
CryptorErrorReason.MissingKey,
this.participantIdentity,
),
);
return controller.enqueue(encodedFrame);
}
const { encryptionKey } = keySet;
const keyIndex = this.keys.getCurrentKeyIndex();
Expand Down
158 changes: 81 additions & 77 deletions src/e2ee/worker/e2ee.worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { workerLogger } from '../../logger';
import { VideoCodec } from '../../room/track/options';
import { AsyncQueue } from '../../utils/AsyncQueue';
import { KEY_PROVIDER_DEFAULTS } from '../constants';
import { CryptorErrorReason } from '../errors';
import { CryptorEvent, KeyHandlerEvent } from '../events';
Expand All @@ -17,6 +18,7 @@ import { ParticipantKeyHandler } from './ParticipantKeyHandler';
const participantCryptors: FrameCryptor[] = [];
const participantKeys: Map<string, ParticipantKeyHandler> = new Map();
let sharedKeyHandler: ParticipantKeyHandler | undefined;
let messageQueue = new AsyncQueue();

let isEncryptionEnabled: boolean = false;

Expand All @@ -31,85 +33,87 @@ let rtpMap: Map<number, VideoCodec> = new Map();
workerLogger.setDefaultLevel('info');

onmessage = (ev) => {
const { kind, data }: E2EEWorkerMessage = ev.data;
messageQueue.run(async () => {
const { kind, data }: E2EEWorkerMessage = ev.data;

switch (kind) {
case 'init':
workerLogger.setLevel(data.loglevel);
workerLogger.info('worker initialized');
keyProviderOptions = data.keyProviderOptions;
useSharedKey = !!data.keyProviderOptions.sharedKey;
// acknowledge init successful
const ackMsg: InitAck = {
kind: 'initAck',
data: { enabled: isEncryptionEnabled },
};
postMessage(ackMsg);
break;
case 'enable':
setEncryptionEnabled(data.enabled, data.participantIdentity);
workerLogger.info(
`updated e2ee enabled status for ${data.participantIdentity} to ${data.enabled}`,
);
// acknowledge enable call successful
postMessage(ev.data);
break;
case 'decode':
let cryptor = getTrackCryptor(data.participantIdentity, data.trackId);
cryptor.setupTransform(
kind,
data.readableStream,
data.writableStream,
data.trackId,
data.codec,
);
break;
case 'encode':
let pubCryptor = getTrackCryptor(data.participantIdentity, data.trackId);
pubCryptor.setupTransform(
kind,
data.readableStream,
data.writableStream,
data.trackId,
data.codec,
);
break;
case 'setKey':
if (useSharedKey) {
setSharedKey(data.key, data.keyIndex);
} else if (data.participantIdentity) {
switch (kind) {
case 'init':
workerLogger.setLevel(data.loglevel);
workerLogger.info('worker initialized');
keyProviderOptions = data.keyProviderOptions;
useSharedKey = !!data.keyProviderOptions.sharedKey;
// acknowledge init successful
const ackMsg: InitAck = {
kind: 'initAck',
data: { enabled: isEncryptionEnabled },
};
postMessage(ackMsg);
break;
case 'enable':
setEncryptionEnabled(data.enabled, data.participantIdentity);
workerLogger.info(
`set participant sender key ${data.participantIdentity} index ${data.keyIndex}`,
`updated e2ee enabled status for ${data.participantIdentity} to ${data.enabled}`,
);
getParticipantKeyHandler(data.participantIdentity).setKey(data.key, data.keyIndex);
} else {
workerLogger.error('no participant Id was provided and shared key usage is disabled');
}
break;
case 'removeTransform':
unsetCryptorParticipant(data.trackId, data.participantIdentity);
break;
case 'updateCodec':
getTrackCryptor(data.participantIdentity, data.trackId).setVideoCodec(data.codec);
break;
case 'setRTPMap':
// this is only used for the local participant
rtpMap = data.map;
participantCryptors.forEach((cr) => {
if (cr.getParticipantIdentity() === data.participantIdentity) {
cr.setRtpMap(data.map);
// acknowledge enable call successful
postMessage(ev.data);
break;
case 'decode':
let cryptor = getTrackCryptor(data.participantIdentity, data.trackId);
cryptor.setupTransform(
kind,
data.readableStream,
data.writableStream,
data.trackId,
data.codec,
);
break;
case 'encode':
let pubCryptor = getTrackCryptor(data.participantIdentity, data.trackId);
pubCryptor.setupTransform(
kind,
data.readableStream,
data.writableStream,
data.trackId,
data.codec,
);
break;
case 'setKey':
if (useSharedKey) {
await setSharedKey(data.key, data.keyIndex);
} else if (data.participantIdentity) {
workerLogger.info(
`set participant sender key ${data.participantIdentity} index ${data.keyIndex}`,
);
await getParticipantKeyHandler(data.participantIdentity).setKey(data.key, data.keyIndex);
} else {
workerLogger.error('no participant Id was provided and shared key usage is disabled');
}
});
break;
case 'ratchetRequest':
handleRatchetRequest(data);
break;
case 'setSifTrailer':
handleSifTrailer(data.trailer);
break;
default:
break;
}
break;
case 'removeTransform':
unsetCryptorParticipant(data.trackId, data.participantIdentity);
break;
case 'updateCodec':
getTrackCryptor(data.participantIdentity, data.trackId).setVideoCodec(data.codec);
break;
case 'setRTPMap':
// this is only used for the local participant
rtpMap = data.map;
participantCryptors.forEach((cr) => {
if (cr.getParticipantIdentity() === data.participantIdentity) {
cr.setRtpMap(data.map);
}
});
break;
case 'ratchetRequest':
handleRatchetRequest(data);
break;
case 'setSifTrailer':
handleSifTrailer(data.trailer);
break;
default:
break;
}
});
};

async function handleRatchetRequest(data: RatchetRequestMessage['data']) {
Expand Down Expand Up @@ -210,9 +214,9 @@ function setEncryptionEnabled(enable: boolean, participantIdentity: string) {
encryptionEnabledMap.set(participantIdentity, enable);
}

function setSharedKey(key: CryptoKey, index?: number) {
async function setSharedKey(key: CryptoKey, index?: number) {
workerLogger.info('set shared key', { index });
getSharedKeyHandler().setKey(key, index);
await getSharedKeyHandler().setKey(key, index);
}

function setupCryptorErrorEvents(cryptor: FrameCryptor) {
Expand Down
Loading