Skip to content

Commit

Permalink
fix(datachannel): sending order is now preserved correctly (#1038)
Browse files Browse the repository at this point in the history
This upgrades `binarypack` to version 2.

`pack` now returns `ArrayBuffer`s which can be sent over all data
channel types without asynchronous conversion.
This implementation is also much faster than version 1.

Closes #746
  • Loading branch information
jonasgloning authored Jun 22, 2023
1 parent 0836356 commit 0fb6179
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 33 deletions.
9 changes: 5 additions & 4 deletions e2e/datachannel/serialization.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ describe("DataChannel:Default", () => {
await P.init();
});
it("should transfer numbers", serializationTest("./numbers.js"));
/** ordering bug: chunked string not in order */
// it('should transfer strings', serializationTest("./strings.js"))
it("should transfer strings", serializationTest("./strings.js"));
it("should transfer objects", serializationTest("./objects.js"));
it("should transfer arrays", serializationTest("./arrays.js"));
/** can't send bug */
// it('should transfer typed arrays / array buffers', serializationTest("./arraybuffers.js"))
it(
"should transfer typed arrays / array buffers",
serializationTest("./arraybuffers.js"),
);
});
33 changes: 15 additions & 18 deletions lib/dataconnection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { util } from "./util";
import { concatArrayBuffers, util } from "./util";
import logger from "./logger";
import { Negotiator } from "./negotiator";
import { ConnectionType, SerializationType, ServerMessageType } from "./enums";
Expand Down Expand Up @@ -65,7 +65,7 @@ export class DataConnection
private _buffering = false;
private _chunkedData: {
[id: number]: {
data: Blob[];
data: Uint8Array[];
count: number;
total: number;
};
Expand Down Expand Up @@ -119,9 +119,7 @@ export class DataConnection
/** Called by the Negotiator when the DataChannel is ready. */
override _initializeDataChannel(dc: RTCDataChannel): void {
this._dc = dc;
if (!util.supports.binaryBlob || util.supports.reliable) {
this.dataChannel.binaryType = "arraybuffer";
}
this.dataChannel.binaryType = "arraybuffer";

this.dataChannel.onopen = () => {
logger.log(`DC#${this.connectionId} dc connection success`);
Expand Down Expand Up @@ -193,7 +191,7 @@ export class DataConnection
__peerData: number;
n: number;
total: number;
data: Blob;
data: ArrayBuffer;
}): void {
const id = data.__peerData;
const chunkInfo = this._chunkedData[id] || {
Expand All @@ -202,7 +200,7 @@ export class DataConnection
total: data.total,
};

chunkInfo.data[data.n] = data.data;
chunkInfo.data[data.n] = new Uint8Array(data.data);
chunkInfo.count++;
this._chunkedData[id] = chunkInfo;

Expand All @@ -211,8 +209,8 @@ export class DataConnection
delete this._chunkedData[id];

// We've received all the chunks--time to construct the complete data.
const data = new Blob(chunkInfo.data);
this._handleDataMessage({ data });
const data = concatArrayBuffers(chunkInfo.data);
this.emit("data", util.unpack(data));
}
}

Expand Down Expand Up @@ -283,6 +281,11 @@ export class DataConnection
return;
}

if (data instanceof Blob) {
data.arrayBuffer().then((ab) => this.send(ab));
return;
}

if (this.serialization === SerializationType.JSON) {
this._bufferedSend(this.stringify(data));
} else if (
Expand All @@ -291,18 +294,12 @@ export class DataConnection
) {
const blob = util.pack(data);

if (!chunked && blob.size > util.chunkedMTU) {
if (!chunked && blob.byteLength > util.chunkedMTU) {
this._sendChunks(blob);
return;
}

if (!util.supports.binaryBlob) {
// We only do this if we really need to (e.g. blobs are not supported),
// because this conversion is costly.
this._encodingQueue.enque(blob);
} else {
this._bufferedSend(blob);
}
this._bufferedSend(blob);
} else {
this._bufferedSend(data);
}
Expand Down Expand Up @@ -364,7 +361,7 @@ export class DataConnection
}
}

private _sendChunks(blob: Blob): void {
private _sendChunks(blob: ArrayBuffer): void {
const blobs = util.chunk(blob);
logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`);

Expand Down
19 changes: 16 additions & 3 deletions lib/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ export class Util {
private _dataCount: number = 1;

chunk(
blob: Blob,
): { __peerData: number; n: number; total: number; data: Blob }[] {
blob: ArrayBuffer,
): { __peerData: number; n: number; total: number; data: ArrayBuffer }[] {
const chunks = [];
const size = blob.size;
const size = blob.byteLength;
const total = Math.ceil(size / util.chunkedMTU);

let index = 0;
Expand Down Expand Up @@ -208,3 +208,16 @@ export class Util {
* :::
*/
export const util = new Util();
export function concatArrayBuffers(bufs: Uint8Array[]) {
let size = 0;
for (const buf of bufs) {
size += buf.byteLength;
}
const result = new Uint8Array(size);
let offset = 0;
for (const buf of bufs) {
result.set(buf, offset);
offset += buf.byteLength;
}
return result;
}
21 changes: 14 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@
"dependencies": {
"@swc/helpers": "^0.5.0",
"eventemitter3": "^4.0.7",
"peerjs-js-binarypack": "1.0.2",
"peerjs-js-binarypack": "^2.0.0",
"webrtc-adapter": "^8.0.0"
}
}

0 comments on commit 0fb6179

Please sign in to comment.