Skip to content

Commit

Permalink
Enable batching multiple events into a single POST request over buffe…
Browse files Browse the repository at this point in the history
…r size if maxPostBytes setting is followed (#1356)
  • Loading branch information
matus-tomlein committed Oct 18, 2024
1 parent ff70283 commit 28b997e
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 35 deletions.
46 changes: 30 additions & 16 deletions libraries/tracker-core/src/emitter/emitter_request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ export interface EmitterRequest {
*/
countEvents: () => number;
/**
* Cancel the timeout timer, should be called when the request is sent
* Cancel timeout timer if it is still pending.
* If not successful, the request will be aborted.
* @param successful - Whether the request was successful
* @param reason - Reason for aborting the request
*/
cancelTimeoutTimer: () => void;
closeRequest: (successful: boolean, reason?: string) => void;
}

export interface EmitterRequestConfiguration {
Expand All @@ -48,7 +51,6 @@ export interface EmitterRequestConfiguration {
keepalive?: boolean;
postPath?: string;
useStm?: boolean;
bufferSize?: number;
maxPostBytes?: number,
credentials?: 'omit' | 'same-origin' | 'include';
}
Expand Down Expand Up @@ -89,19 +91,23 @@ export function newEmitterRequest({
keepalive = true,
postPath = '/com.snowplowanalytics.snowplow/tp2',
useStm = true,
bufferSize,
maxPostBytes = 40000,
credentials = 'include',
}: EmitterRequestConfiguration): EmitterRequest {
let events: EmitterEvent[] = [];
let usePost = eventMethod.toLowerCase() === 'post';
let timer: ReturnType<typeof setTimeout> | undefined;
let abortController: AbortController | undefined;

function countBytes(): number {
return events.reduce(
let count = events.reduce(
(acc, event) => acc + (usePost ? event.getPOSTRequestBytesCount() : event.getGETRequestBytesCount()),
0
);
if (usePost) {
count += 88; // 88 bytes for the payload_data envelope
}
return count;
}

function countEvents(): number {
Expand All @@ -112,7 +118,6 @@ export function newEmitterRequest({
return events.length > 0 ? events[0].getServerAnonymization() : undefined;
}


function addEvent(event: EmitterEvent) {
if (events.length > 0 && getServerAnonymizationOfExistingEvents() !== event.getServerAnonymization()) {
return false;
Expand All @@ -128,9 +133,6 @@ export function newEmitterRequest({

function isFull(): boolean {
if (usePost) {
if (bufferSize !== undefined && countEvents() >= Math.max(1, bufferSize)) {
return true;
}
return countBytes() >= maxPostBytes;
} else {
return events.length >= 1;
Expand Down Expand Up @@ -167,15 +169,19 @@ export function newEmitterRequest({
}

function makeRequest(url: string, options: RequestInit): Request {
const controller = new AbortController();
closeRequest(false);

abortController = new AbortController();
timer = setTimeout(() => {
console.error('Request timed out');
controller.abort()
const reason = 'Request timed out';
console.error(reason);
timer = undefined;
closeRequest(false, reason);
}, connectionTimeout ?? 5000);

const requestOptions: RequestInit = {
headers: createHeaders(),
signal: controller.signal,
signal: abortController.signal,
keepalive,
credentials,
...options,
Expand Down Expand Up @@ -218,11 +224,19 @@ export function newEmitterRequest({
}
}

function cancelTimeoutTimer() {
if (timer) {
function closeRequest(successful: boolean, reason?: string) {
if (timer !== undefined) {
clearTimeout(timer);
timer = undefined;
}

if (abortController !== undefined) {
const controller = abortController;
abortController = undefined;
if (!successful) {
controller.abort(reason);
}
}
}

return {
Expand All @@ -232,6 +246,6 @@ export function newEmitterRequest({
countBytes,
countEvents,
isFull,
cancelTimeoutTimer,
closeRequest,
};
}
8 changes: 5 additions & 3 deletions libraries/tracker-core/src/emitter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export interface EmitterConfigurationBase {
bufferSize?: number;
/**
* The max size a POST request can be before the tracker will force send it
* Also dictates the max size of a POST request before a batch of events is split into multiple requests
* @defaultValue 40000
*/
maxPostBytes?: number;
Expand Down Expand Up @@ -274,7 +275,7 @@ export function newEmitter({
try {
const response = await customFetch(fetchRequest);

request.cancelTimeoutTimer();
request.closeRequest(true);

if (response.ok) {
callOnRequestSuccess(payloads, response);
Expand All @@ -293,6 +294,8 @@ export function newEmitter({
return { success: false, retry: willRetry, status: response.status };
}
} catch (e) {
request.closeRequest(false);

const message = typeof e === 'string' ? e : e ? (e as Error).message : 'Unknown error';
callOnRequestFailure({
events: payloads,
Expand All @@ -312,7 +315,6 @@ export function newEmitter({
customHeaders,
connectionTimeout,
keepalive,
bufferSize,
maxPostBytes,
useStm,
credentials,
Expand All @@ -324,7 +326,7 @@ export function newEmitter({
LOG.warn('Event (' + bytes + 'B) too big, max is ' + maxBytes);

if (usePost) {
const bytes = emitterEvent.getPOSTRequestBytesCount();
const bytes = emitterEvent.getPOSTRequestBytesCount() + 88; // 88 bytes for the payload_data envelope
const tooBig = bytes > maxPostBytes;
if (tooBig) {
eventTooBigWarning(bytes, maxPostBytes);
Expand Down
13 changes: 5 additions & 8 deletions libraries/tracker-core/test/emitter/emitter_request.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { newEventStorePayload } from '../../src/event_store_payload';

const newEmitterEventFromPayload = (payload: Record<string, unknown>) => {
return newEmitterEvent(newEventStorePayload({ payload }));
}
};

// MARK: - addEvent

Expand Down Expand Up @@ -71,7 +71,7 @@ test('countBytes returns the correct byte count', (t) => {

t.true(request.addEvent(newEmitterEventFromPayload({ e: 'pv', p: 'web' })));
t.true(request.addEvent(newEmitterEventFromPayload({ e: 'pv', p: 'mob' })));
t.is(request.countBytes(), 40);
t.is(request.countBytes(), 40 + 88); // 40 bytes for each event, 88 bytes for the payload_data envelope
});

// MARK: - countEvents
Expand All @@ -89,34 +89,31 @@ test('countEvents returns the correct event count', (t) => {

// MARK: - isFull

test('isFull returns false when not reached buffer size', (t) => {
test('isFull returns false when not reached max post bytes', (t) => {
const request = newEmitterRequest({
endpoint: 'https://example.com',
maxPostBytes: 1000,
bufferSize: 2,
});

t.true(request.addEvent(newEmitterEventFromPayload({ e: 'pv', p: 'web' })));
t.false(request.isFull());
});

test('isFull returns true when reached buffer size', (t) => {
test('isFull returns false when reached buffer size and not max post bytes', (t) => {
const request = newEmitterRequest({
endpoint: 'https://example.com',
maxPostBytes: 1000,
bufferSize: 2,
});

t.true(request.addEvent(newEmitterEventFromPayload({ e: 'pv', p: 'web' })));
t.true(request.addEvent(newEmitterEventFromPayload({ e: 'pv', p: 'mob' })));
t.true(request.isFull());
t.false(request.isFull());
});

test('isFull returns true when reached max post bytes', (t) => {
const request = newEmitterRequest({
endpoint: 'https://example.com',
maxPostBytes: 10,
bufferSize: 2,
});

t.true(request.addEvent(newEmitterEventFromPayload({ e: 'pv', p: 'web' })));
Expand Down
3 changes: 2 additions & 1 deletion libraries/tracker-core/test/emitter/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,9 @@ test('adds a timeout to the request', async (t) => {

return new Promise((resolve, reject) => {
let timer = setTimeout(() => {
t.fail('Request should have timed out');
resolve(new Response(null, { status: 200 }));
}, 1000);
}, 500);

input.signal?.addEventListener('abort', () => {
clearTimeout(timer);
Expand Down
1 change: 1 addition & 0 deletions plugins/browser-plugin-media-tracking/tests/test.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ describe('MediaTrackingPlugin', () => {
},
],
contexts: { webPage: false },
customFetch: async () => new Response(null, { status: 200 }),
});
id = `media-${idx}`;
});
Expand Down
1 change: 1 addition & 0 deletions plugins/browser-plugin-media/test/api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ describe('Media Tracking API', () => {
},
],
contexts: { webPage: false },
customFetch: async () => new Response(null, { status: 200 }),
});
id = `media-${idx}`;
});
Expand Down
18 changes: 11 additions & 7 deletions trackers/node-tracker/test/tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,13 @@ for (const eventMethod of testMethods) {
eventMethod,
bufferSize: 0,
onRequestSuccess: (batch) => {
const expected = batch[0]['e'] === 'tr' ? expectedTransaction : expectedItem;
batch.forEach((payload) => {
const expected = payload['e'] === 'tr' ? expectedTransaction : expectedItem;

checkPayload(batch[0], expected, t);
checkPayload(payload, expected, t);

requestCount--;
requestCount--;
});
if (requestCount === 0) {
resolve(batch);
}
Expand Down Expand Up @@ -478,11 +480,13 @@ for (const eventMethod of testMethods) {
endpoint,
eventMethod,
bufferSize: 0,
onRequestSuccess: ([pl]) => {
checkPayload(pl, expected, t);
count--;
onRequestSuccess: (batch) => {
batch.forEach((pl) => {
checkPayload(pl, expected, t);
count--;
});
if (count === 0) {
resolve(pl);
resolve(batch);
}
},
onRequestFailure: reject,
Expand Down

0 comments on commit 28b997e

Please sign in to comment.