Skip to content

Commit

Permalink
Update Tracker Architecture for Multi-Peer Connections (#77)
Browse files Browse the repository at this point in the history
* chore: Update npm dependencies to latest versions

* refactor: update tracker to support multiple peers on one connection

* refactor: Remove unused code for peers in FastTracker

* refactor: processStop

* refactor: disconnect peer

* feat: Add clearPeersInterval to automatically disconnect inactive peers

* refactor: tracker announce & disconnect logic

* refactor: small improvements

* refactor: Update SocketContext references in heap-usage and simulation.test

* refactor: Update tests

* refactor: Improvements

* refactor: Improve FastTracker by optimizing peer management

* Remove completedCount

* Refactor remove peer messages

* Fix peers statistics

* Fix WebSockets counter

* Revert socket counter

* Enable messages debugging

---------

Co-authored-by: Andriy Lysnevych <[email protected]>
  • Loading branch information
DimaDemchenko and mrlika authored Jun 23, 2024
1 parent 367c8d8 commit bc9f999
Show file tree
Hide file tree
Showing 10 changed files with 754 additions and 1,067 deletions.
435 changes: 219 additions & 216 deletions lib/fast-tracker.ts

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@

export type { UWebSocketsTracker } from "./uws-tracker.js";
export type { FastTracker } from "./fast-tracker.js";
export type { Tracker, PeerContext, TrackerError } from "./tracker.js";
export type {
Tracker,
SocketContext as PeerContext,
TrackerError,
} from "./tracker.js";
2 changes: 1 addition & 1 deletion lib/run-uws-tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ function buildServer({
peersCount += swarm.peers.length;

const infoHashHex = Buffer.from(infoHash, "binary").toString("hex");
peersCountPerInfoHash[infoHashHex] = peersCount;
peersCountPerInfoHash[infoHashHex] = swarm.peers.length;
}

const serversStats = new Array<{
Expand Down
21 changes: 17 additions & 4 deletions lib/tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,29 @@
* limitations under the License.
*/

export interface SocketContext {
sendMessage: (json: object, peer: SocketContext) => void;
}

export type Swarm = {
infoHash: string;
completedPeers?: Set<string>;
peers: PeerContext[];
};

export interface PeerContext {
id?: string;
sendMessage: (json: object, peer: PeerContext) => void;
peerId: string;
sendMessage: (json: object, peer: SocketContext) => void;
socket: SocketContext;
lastAccessed: number;
swarm: Swarm;
}

export interface Tracker {
readonly swarms: ReadonlyMap<string, { peers: readonly PeerContext[] }>;
readonly settings: object;
processMessage: (json: object, peer: PeerContext) => void;
disconnectPeer: (peer: PeerContext) => void;
processMessage: (json: object, peer: SocketContext) => void;
disconnectPeersFromSocket: (peer: SocketContext) => void;
}

export class TrackerError extends Error {}
34 changes: 11 additions & 23 deletions lib/uws-tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ import {
HttpResponse,
} from "uWebSockets.js";
import Debug from "debug";
import { Tracker, TrackerError, PeerContext } from "./tracker.js";
import { Tracker, TrackerError, SocketContext } from "./tracker.js";
import {
ServerSettings,
WebSocketsSettings,
WebSocketsAccessSettings,
} from "./run-uws-tracker.js";

declare module "./tracker.js" {
interface PeerContext {
ws: WebSocket<PeerContext>;
interface SocketContext {
ws: WebSocket<SocketContext>;
}
}

Expand Down Expand Up @@ -186,7 +186,7 @@ export class UWebSocketsTracker {
idleTimeout: this.settings.websockets.idleTimeout,
open: this.onOpen,
upgrade: this.onUpgrade,
drain: (ws: WebSocket<PeerContext>) => {
drain: (ws: WebSocket<SocketContext>) => {
if (debugWebSocketsEnabled) {
debugWebSockets("drain", ws.getBufferedAmount());
}
Expand Down Expand Up @@ -276,7 +276,7 @@ export class UWebSocketsTracker {
);
}

response.upgrade<Pick<PeerContext, "sendMessage">>(
response.upgrade<Omit<SocketContext, "ws">>(
{
sendMessage,
},
Expand All @@ -288,7 +288,7 @@ export class UWebSocketsTracker {
};

private readonly onMessage = (
ws: WebSocket<PeerContext>,
ws: WebSocket<SocketContext>,
message: ArrayBuffer,
): void => {
debugWebSockets("message of size", message.byteLength);
Expand All @@ -308,13 +308,7 @@ export class UWebSocketsTracker {
}

if (debugMessagesEnabled) {
debugMessages(
"in",
userData.id === undefined
? "unknown peer"
: Buffer.from(userData.id).toString("hex"),
json,
);
debugMessages("in", json);
}

try {
Expand All @@ -330,28 +324,22 @@ export class UWebSocketsTracker {
};

private readonly onClose = (
ws: WebSocket<PeerContext>,
ws: WebSocket<SocketContext>,
code: number,
): void => {
this.webSocketsCount--;

if (ws.getUserData().sendMessage !== undefined) {
this.tracker.disconnectPeer(ws as unknown as PeerContext);
this.tracker.disconnectPeersFromSocket(ws as unknown as SocketContext);
}

debugWebSockets("closed with code", code);
};
}

function sendMessage(json: object, peerContext: PeerContext): void {
function sendMessage(json: object, peerContext: SocketContext): void {
peerContext.ws.send(JSON.stringify(json), false, false);
if (debugMessagesEnabled) {
debugMessages(
"out",
peerContext.id === undefined
? "unknown peer"
: Buffer.from(peerContext.id).toString("hex"),
json,
);
debugMessages("out", json);
}
}
Loading

0 comments on commit bc9f999

Please sign in to comment.