Skip to content

Commit

Permalink
Handle e2ee worker messages sequentially
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasIO committed Sep 30, 2024
1 parent 1a838e8 commit 1350068
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 89 deletions.
16 changes: 8 additions & 8 deletions src/e2ee/E2eeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ export class E2EEManager extends (EventEmitter as new () => TypedEventEmitter<E2
break;

case 'enable':
if (data.enabled) {
this.keyProvider.getKeys().forEach((keyInfo) => {
this.postKey(keyInfo);
});
}
if (
this.encryptionEnabled !== data.enabled &&
data.participantIdentity === this.room?.localParticipant.identity
Expand All @@ -134,11 +139,6 @@ export class E2EEManager extends (EventEmitter as new () => TypedEventEmitter<E2
}
this.emit(EncryptionEvent.ParticipantEncryptionStatusChanged, data.enabled, participant);
}
if (this.encryptionEnabled) {
this.keyProvider.getKeys().forEach((keyInfo) => {
this.postKey(keyInfo);
});
}
break;
case 'ratchetKey':
this.keyProvider.emit(KeyProviderEvent.KeyRatcheted, data.material, data.keyIndex);
Expand Down Expand Up @@ -196,13 +196,13 @@ export class E2EEManager extends (EventEmitter as new () => TypedEventEmitter<E2
if (!this.room) {
throw new TypeError(`expected room to be present on signal connect`);
}
keyProvider.getKeys().forEach((keyInfo) => {
this.postKey(keyInfo);
});
this.setParticipantCryptorEnabled(
this.room.localParticipant.isE2EEEnabled,
this.room.localParticipant.identity,
);
keyProvider.getKeys().forEach((keyInfo) => {
this.postKey(keyInfo);
});
});
room.localParticipant.on(ParticipantEvent.LocalTrackPublished, async (publication) => {
this.setupE2EESender(publication.track!, publication.track!.sender!);
Expand Down
14 changes: 10 additions & 4 deletions src/e2ee/worker/FrameCryptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,17 @@ export class FrameCryptor extends BaseFrameCryptor {
}
const keySet = this.keys.getKeySet();
if (!keySet) {
throw new TypeError(
`key set not found for ${
this.participantIdentity
} at index ${this.keys.getCurrentKeyIndex()}`,
this.emit(
CryptorEvent.Error,
new CryptorError(
`key set not found for ${
this.participantIdentity
} at index ${this.keys.getCurrentKeyIndex()}`,
CryptorErrorReason.MissingKey,
this.participantIdentity,
),
);
return controller.enqueue(encodedFrame);
}
const { encryptionKey } = keySet;
const keyIndex = this.keys.getCurrentKeyIndex();
Expand Down
158 changes: 81 additions & 77 deletions src/e2ee/worker/e2ee.worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { workerLogger } from '../../logger';
import { VideoCodec } from '../../room/track/options';
import { AsyncQueue } from '../../utils/AsyncQueue';
import { KEY_PROVIDER_DEFAULTS } from '../constants';
import { CryptorErrorReason } from '../errors';
import { CryptorEvent, KeyHandlerEvent } from '../events';
Expand All @@ -17,6 +18,7 @@ import { ParticipantKeyHandler } from './ParticipantKeyHandler';
const participantCryptors: FrameCryptor[] = [];
const participantKeys: Map<string, ParticipantKeyHandler> = new Map();
let sharedKeyHandler: ParticipantKeyHandler | undefined;
let messageQueue = new AsyncQueue();

let isEncryptionEnabled: boolean = false;

Expand All @@ -31,85 +33,87 @@ let rtpMap: Map<number, VideoCodec> = new Map();
workerLogger.setDefaultLevel('info');

onmessage = (ev) => {
const { kind, data }: E2EEWorkerMessage = ev.data;
messageQueue.run(async () => {
const { kind, data }: E2EEWorkerMessage = ev.data;

switch (kind) {
case 'init':
workerLogger.setLevel(data.loglevel);
workerLogger.info('worker initialized');
keyProviderOptions = data.keyProviderOptions;
useSharedKey = !!data.keyProviderOptions.sharedKey;
// acknowledge init successful
const ackMsg: InitAck = {
kind: 'initAck',
data: { enabled: isEncryptionEnabled },
};
postMessage(ackMsg);
break;
case 'enable':
setEncryptionEnabled(data.enabled, data.participantIdentity);
workerLogger.info(
`updated e2ee enabled status for ${data.participantIdentity} to ${data.enabled}`,
);
// acknowledge enable call successful
postMessage(ev.data);
break;
case 'decode':
let cryptor = getTrackCryptor(data.participantIdentity, data.trackId);
cryptor.setupTransform(
kind,
data.readableStream,
data.writableStream,
data.trackId,
data.codec,
);
break;
case 'encode':
let pubCryptor = getTrackCryptor(data.participantIdentity, data.trackId);
pubCryptor.setupTransform(
kind,
data.readableStream,
data.writableStream,
data.trackId,
data.codec,
);
break;
case 'setKey':
if (useSharedKey) {
setSharedKey(data.key, data.keyIndex);
} else if (data.participantIdentity) {
switch (kind) {
case 'init':
workerLogger.setLevel(data.loglevel);
workerLogger.info('worker initialized');
keyProviderOptions = data.keyProviderOptions;
useSharedKey = !!data.keyProviderOptions.sharedKey;
// acknowledge init successful
const ackMsg: InitAck = {
kind: 'initAck',
data: { enabled: isEncryptionEnabled },
};
postMessage(ackMsg);
break;
case 'enable':
setEncryptionEnabled(data.enabled, data.participantIdentity);
workerLogger.info(
`set participant sender key ${data.participantIdentity} index ${data.keyIndex}`,
`updated e2ee enabled status for ${data.participantIdentity} to ${data.enabled}`,
);
getParticipantKeyHandler(data.participantIdentity).setKey(data.key, data.keyIndex);
} else {
workerLogger.error('no participant Id was provided and shared key usage is disabled');
}
break;
case 'removeTransform':
unsetCryptorParticipant(data.trackId, data.participantIdentity);
break;
case 'updateCodec':
getTrackCryptor(data.participantIdentity, data.trackId).setVideoCodec(data.codec);
break;
case 'setRTPMap':
// this is only used for the local participant
rtpMap = data.map;
participantCryptors.forEach((cr) => {
if (cr.getParticipantIdentity() === data.participantIdentity) {
cr.setRtpMap(data.map);
// acknowledge enable call successful
postMessage(ev.data);
break;
case 'decode':
let cryptor = getTrackCryptor(data.participantIdentity, data.trackId);
cryptor.setupTransform(
kind,
data.readableStream,
data.writableStream,
data.trackId,
data.codec,
);
break;
case 'encode':
let pubCryptor = getTrackCryptor(data.participantIdentity, data.trackId);
pubCryptor.setupTransform(
kind,
data.readableStream,
data.writableStream,
data.trackId,
data.codec,
);
break;
case 'setKey':
if (useSharedKey) {
await setSharedKey(data.key, data.keyIndex);
} else if (data.participantIdentity) {
workerLogger.info(
`set participant sender key ${data.participantIdentity} index ${data.keyIndex}`,
);
await getParticipantKeyHandler(data.participantIdentity).setKey(data.key, data.keyIndex);
} else {
workerLogger.error('no participant Id was provided and shared key usage is disabled');
}
});
break;
case 'ratchetRequest':
handleRatchetRequest(data);
break;
case 'setSifTrailer':
handleSifTrailer(data.trailer);
break;
default:
break;
}
break;
case 'removeTransform':
unsetCryptorParticipant(data.trackId, data.participantIdentity);
break;
case 'updateCodec':
getTrackCryptor(data.participantIdentity, data.trackId).setVideoCodec(data.codec);
break;
case 'setRTPMap':
// this is only used for the local participant
rtpMap = data.map;
participantCryptors.forEach((cr) => {
if (cr.getParticipantIdentity() === data.participantIdentity) {
cr.setRtpMap(data.map);
}
});
break;
case 'ratchetRequest':
handleRatchetRequest(data);
break;
case 'setSifTrailer':
handleSifTrailer(data.trailer);
break;
default:
break;
}
});
};

async function handleRatchetRequest(data: RatchetRequestMessage['data']) {
Expand Down Expand Up @@ -210,9 +214,9 @@ function setEncryptionEnabled(enable: boolean, participantIdentity: string) {
encryptionEnabledMap.set(participantIdentity, enable);
}

function setSharedKey(key: CryptoKey, index?: number) {
async function setSharedKey(key: CryptoKey, index?: number) {
workerLogger.info('set shared key', { index });
getSharedKeyHandler().setKey(key, index);
await getSharedKeyHandler().setKey(key, index);
}

function setupCryptorErrorEvents(cryptor: FrameCryptor) {
Expand Down

0 comments on commit 1350068

Please sign in to comment.