Skip to content

Commit

Permalink
feat: add heartbeat to WS for persistent connections (#203)
Browse files Browse the repository at this point in the history
* feat: add heartbeat to lobby ws

* feat: enable reconnect on frontend

* feat: add heartbeat to organizer and remove unused connections

* feat: better ws cleanup

* fix: check if connection is closed at ping, and allow multiple same user tabs for organizers

* fix: not start heartbeatInterval if testing
  • Loading branch information
edalholt authored Nov 12, 2023
1 parent 0948291 commit a9e698e
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 48 deletions.
4 changes: 4 additions & 0 deletions backend/controllers/assembly.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { User } from "../models/user";
import { Votation, Option } from "../models/vote";
import { RequestWithNtnuiNo } from "../utils/request";
import { AssemblyResponseType } from "../types/assembly";
import { organizerConnections } from "../utils/socketNotifier";

export async function createAssembly(req: RequestWithNtnuiNo, res: Response) {
if (!req.ntnuiNo) {
Expand Down Expand Up @@ -109,7 +110,10 @@ export async function deleteAssembly(req: RequestWithNtnuiNo, res: Response) {
}
await Votation.findByIdAndDelete(vote);
});

await Assembly.deleteOne({ _id: assembly._id });
// Clean up websocket connections for this assembly
organizerConnections.delete(group);
return res.status(200).json({ message: "Assembly successfully deleted" });
}
}
Expand Down
7 changes: 7 additions & 0 deletions backend/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import votationRoutes from "./routes/votation";
import { parse } from "url";
import { lobbyWss } from "./wsServers/lobby";
import { organizerWss } from "./wsServers/organizer";
import { startHeartbeatInterval } from "./utils/socketNotifier";

dotenv.config();

Expand Down Expand Up @@ -63,6 +64,12 @@ server.on("upgrade", function upgrade(request, socket, head) {
}
});

// Start sending pings/Heartbeat to ws-connections to keep connections alive.
// Not started when testing, as Jest will not stop properly if the interval is running.
if (process.env.NODE_ENV !== "test") {
startHeartbeatInterval;
}

try {
// Jest will start app itself when testing, and not run on port 3000 to avoid collisions.
if (process.env.NODE_ENV !== "test") {
Expand Down
101 changes: 64 additions & 37 deletions backend/utils/socketNotifier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,96 @@ import { NTNUINoFromRequest } from "./wsCookieRetriever";
/*
* This file contains functions for storing connections and notifying users connected to the WebSocket servers.
* Because the connections are stored in arrays/local data structures, the service has to run on a single instance, as the connections cannot be shared between instances.
* For making it possible to run multiple instances, this part can be rewritten using for example a redis or mongoDB database.
* For making it possible to run multiple instances, this part can be rewritten using for example a Redis or the MongoDB database.
* However, this is not necessary for the current use case, as the one instance should be able to handle 350 concurrent connections on Azure basic plan. And thousands on the standard plan.
* https://learn.microsoft.com/en-us/azure/azure-resource-manager/management/azure-subscription-service-limits
*/

type SocketList = { [key: number]: WebSocket };
type OrganizersByGroupSlug = { [key: string]: SocketList };

// Store all active organizer connections, the connections are stored by their respective groupSlug.
// This makes it possible to send messages to all logged in organizers of a group.
export const organizerConnections: OrganizersByGroupSlug = {};
// Store all active participant connections, for access when sending messages about assembly.
export const lobbyConnections: SocketList = [];
// An organizer can be logged in on multiple devices, and multiple organizers can receive events from the same group.
export const organizerConnections = new Map<string, WebSocket[]>();
// Store all active lobby connections, for access when sending messages to assembly participants.
// Maximum one lobby connection per user.
export const lobbyConnections = new Map<number, WebSocket>();

export function storeLobbyConnectionByCookie(
const sendPing = (ws: WebSocket) => {
if (ws.readyState === WebSocket.OPEN) {
ws.ping();
}
};

// Send ping to all participants to check if they are still connected and prevent the connection from closing.
export const startHeartbeatInterval = setInterval(() => {
lobbyConnections.forEach((ws: WebSocket, userID: number) => {
// Remove connection if it is closed by the client.
if (ws.readyState === WebSocket.CLOSED) {
lobbyConnections.delete(userID);
return;
}
sendPing(ws);
});

organizerConnections.forEach((socketList) => {
socketList.forEach((ws: WebSocket) => {
// Remove connection if it is closed by the client.
if (ws.readyState === WebSocket.CLOSED) {
socketList.splice(socketList.indexOf(ws), 1);
return;
}
sendPing(ws);
});
});
}, 30000); // 30 seconds

export const storeLobbyConnectionByCookie = (
ws: WebSocket,
req: IncomingMessage
) {
) => {
const ntnuiNo = NTNUINoFromRequest(req);
if (ntnuiNo !== null) {
// Notify about kicking out old connection if user already is connected.
if (typeof lobbyConnections[ntnuiNo] !== null) {
if (lobbyConnections.has(ntnuiNo)) {
notifyOneParticipant(ntnuiNo, JSON.stringify({ status: "removed" }));
lobbyConnections.get(ntnuiNo)?.close();
}
// Store socket connection on NTNUI ID.
lobbyConnections[ntnuiNo] = ws;
lobbyConnections.set(ntnuiNo, ws);
console.log("User " + ntnuiNo + " connected to lobby");
}
};

export const removeLobbyConnectionByCookie = (req: IncomingMessage) => {
const ntnuiNo = NTNUINoFromRequest(req);
if (ntnuiNo !== null) {
lobbyConnections.delete(ntnuiNo);
}
}
};

export function storeOrganizerConnectionByNTNUINo(
ntnui_no: number,
groupSlug: string,
ws: WebSocket
) {
if (!organizerConnections[groupSlug]) organizerConnections[groupSlug] = [];
organizerConnections[groupSlug][ntnui_no] = ws;
}
export const storeOrganizerConnection = (groupSlug: string, ws: WebSocket) => {
if (!organizerConnections.get(groupSlug)) {
organizerConnections.set(groupSlug, [ws]);
}
organizerConnections.get(groupSlug)?.push(ws);
};

export const notifyOneParticipant = (ntnui_no: number, message: string) => {
try {
lobbyConnections[ntnui_no].send(message);
} catch (error) {
const connection = lobbyConnections.get(ntnui_no);
if (connection) connection.send(message);
else {
console.log(
"Could not notify user " +
ntnui_no +
". Is there a problem with the socket URL? (Ignore if testing / dev has restarted)"
" (disconnected). Is there a problem with the socket URL? (Ignore if headless testing / dev has restarted and wiped the connections)"
);
}
};

export const notifyOrganizers = (groupSlug: string, message: string) => {
if (organizerConnections[groupSlug]) {
for (const ntnui_no in organizerConnections[groupSlug]) {
console.log("Sending message to organizer " + ntnui_no);
try {
organizerConnections[groupSlug][ntnui_no].send(message);
} catch (error) {
console.log(
"Could not notify organizer " +
ntnui_no +
". Is there a problem with the socket URL? (Ignore if testing / dev has restarted)"
);
}
}
const connections = organizerConnections.get(groupSlug);
if (connections) {
connections.forEach((connection) => {
connection.send(message);
});
}
};
5 changes: 5 additions & 0 deletions backend/wsServers/lobby.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,9 @@ export const lobbyWss = new WebSocketServer({ noServer: true });
lobbyWss.on("connection", function connection(ws, req) {
// Store connections to be able to send messages to specific users when needed.
storeLobbyConnectionByCookie(ws, req);

ws.on("pong", () => {
// The client responded to the ping, so the connection is still active.
// Connections are deleted when the user logs out or closes the connection/tab.
});
});
9 changes: 7 additions & 2 deletions backend/wsServers/organizer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { WebSocketServer } from "ws";
import { NTNUINoFromRequest } from "../utils/wsCookieRetriever";
import { User } from "../models/user";
import { storeOrganizerConnectionByNTNUINo } from "../utils/socketNotifier";
import { storeOrganizerConnection } from "../utils/socketNotifier";

export const organizerWss = new WebSocketServer({ noServer: true });

Expand All @@ -28,7 +28,7 @@ organizerWss.on("connection", function connection(ws, req) {
membership.organizer && membership.groupSlug == groupSlug
)
) {
storeOrganizerConnectionByNTNUINo(ntnuiNo, groupSlug, ws);
storeOrganizerConnection(groupSlug, ws);
console.log(
"Organizer " + ntnuiNo + " are subscribed to group " + groupSlug
);
Expand All @@ -45,4 +45,9 @@ organizerWss.on("connection", function connection(ws, req) {
ws.close();
}
});

ws.on("pong", () => {
// The client responded to the ping, so the connection is still active.
// Connections are deleted when the assembly is deleted or the organizer closes the connection/tab.
});
});
16 changes: 10 additions & 6 deletions frontend/src/components/EditAssembly.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,18 @@ export function EditAssembly(state: { group: UserDataGroupType }) {
const [accordionActiveTabs, setAccordionActiveTabs] = useState<string[]>([]);

const { lastMessage, sendJsonMessage } = useWebSocket(
import.meta.env.VITE_SOCKET_URL + "/organizer"
import.meta.env.VITE_SOCKET_URL + "/organizer",
{
// Request access to live assembly data for the given group when the websocket is opened.
onOpen: () => sendJsonMessage({ groupSlug: group.groupSlug }),
//Will attempt to reconnect on all close events, such as server shutting down
shouldReconnect: () => true,
// Try to reconnect 300 times before giving up.
// Also possible to change interval (default is 5000ms)
reconnectAttempts: 300,
}
);

// Request access to live assembly data for the given group when component is mounted.
useEffect(() => {
sendJsonMessage({ groupSlug: group.groupSlug });
}, []);

useEffect(() => {
// Update state every time the websocket receive a message.
if (lastMessage) {
Expand Down
14 changes: 11 additions & 3 deletions frontend/src/pages/AssemblyPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,15 @@ export function AssemblyLobby() {
LimitedVoteType | undefined
>(undefined);
const [voted, setVoted] = useState<boolean>(false);
const { lastMessage } = useWebSocket(
import.meta.env.VITE_SOCKET_URL + "/lobby"
const { lastMessage, getWebSocket } = useWebSocket(
import.meta.env.VITE_SOCKET_URL + "/lobby",
{
//Will attempt to reconnect on all close events, such as server shutting down
shouldReconnect: () => !kickedOut,
// Try to reconnect 300 times before giving up.
// Also possible to change interval (default is 5000ms)
reconnectAttempts: 300,
}
);
const { checkedIn, setCheckedIn } = useContext(
checkedInState
Expand Down Expand Up @@ -72,6 +79,7 @@ export function AssemblyLobby() {
// User is is removed from current lobby if logged in on another device.
if (decodedMessage.status == "removed") {
setKickedOut(true);
getWebSocket()?.close();
}
// Redirect only if user is checked in on the right group.
if (decodedMessage.group == groupSlug) {
Expand Down Expand Up @@ -133,7 +141,7 @@ export function AssemblyLobby() {
{kickedOut ? (
<WaitingRoom
message={
"You have logged in on another device, or you are kicked from this assembly."
"You have logged in on another device/tab, this tab is therefore disconnected."
}
/>
) : checkedIn && groupSlug && voted ? (
Expand Down

0 comments on commit a9e698e

Please sign in to comment.