Skip to content

Commit

Permalink
Enable batching multiple events into a single POST request over buffe…
Browse files Browse the repository at this point in the history
…r size if maxPostBytes setting is followed (#1356)
  • Loading branch information
matus-tomlein committed Oct 25, 2024
1 parent 6aeb241 commit 88213c9
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

## EmitterConfigurationBase.maxPostBytes property

The max size a POST request can be before the tracker will force send it
The max size a POST request can be before the tracker will force send it Also dictates the max size of a POST request before a batch of events is split into multiple requests

<b>Signature:</b>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ interface EmitterConfigurationBase
| [idService?](./browser-tracker.emitterconfigurationbase.idservice.md) | string | <i>(Optional)</i> Id service full URL. This URL will be added to the queue and will be called using a GET method. This option is there to allow the service URL to be called in order to set any required identifiers e.g. extra cookies.<!-- -->The request respects the <code>anonymousTracking</code> option, including the SP-Anonymous header if needed, and any additional custom headers from the customHeaders option. |
| [keepalive?](./browser-tracker.emitterconfigurationbase.keepalive.md) | boolean | <i>(Optional)</i> Indicates that the request should be allowed to outlive the webpage that initiated it. Enables collector requests to complete even if the page is closed or navigated away from. |
| [maxGetBytes?](./browser-tracker.emitterconfigurationbase.maxgetbytes.md) | number | <i>(Optional)</i> The max size a GET request (its complete URL) can be. Requests over this size will be tried as a POST request. |
| [maxPostBytes?](./browser-tracker.emitterconfigurationbase.maxpostbytes.md) | number | <i>(Optional)</i> The max size a POST request can be before the tracker will force send it |
| [maxPostBytes?](./browser-tracker.emitterconfigurationbase.maxpostbytes.md) | number | <i>(Optional)</i> The max size a POST request can be before the tracker will force send it Also dictates the max size of a POST request before a batch of events is split into multiple requests |
| [onRequestFailure?](./browser-tracker.emitterconfigurationbase.onrequestfailure.md) | (data: RequestFailure, response?: Response) =&gt; void | <i>(Optional)</i> A callback function to be executed whenever a request fails to be sent to the collector. This is the inverse of the onRequestSuccess callback, so any non 2xx status code will trigger this callback. |
| [onRequestSuccess?](./browser-tracker.emitterconfigurationbase.onrequestsuccess.md) | (data: EventBatch, response: Response) =&gt; void | <i>(Optional)</i> A callback function to be executed whenever a request is successfully sent to the collector. In practice this means any request which returns a 2xx status code will trigger this callback. |
| [postPath?](./browser-tracker.emitterconfigurationbase.postpath.md) | string | <i>(Optional)</i> The post path which events will be sent to. Ensure your collector is configured to accept events on this post path |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

## EmitterConfigurationBase.maxPostBytes property

The max size a POST request can be before the tracker will force send it
The max size a POST request can be before the tracker will force send it Also dictates the max size of a POST request before a batch of events is split into multiple requests

<b>Signature:</b>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ interface EmitterConfigurationBase
| [idService?](./node-tracker.emitterconfigurationbase.idservice.md) | string | <i>(Optional)</i> Id service full URL. This URL will be added to the queue and will be called using a GET method. This option is there to allow the service URL to be called in order to set any required identifiers e.g. extra cookies.<!-- -->The request respects the <code>anonymousTracking</code> option, including the SP-Anonymous header if needed, and any additional custom headers from the customHeaders option. |
| [keepalive?](./node-tracker.emitterconfigurationbase.keepalive.md) | boolean | <i>(Optional)</i> Indicates that the request should be allowed to outlive the webpage that initiated it. Enables collector requests to complete even if the page is closed or navigated away from. |
| [maxGetBytes?](./node-tracker.emitterconfigurationbase.maxgetbytes.md) | number | <i>(Optional)</i> The max size a GET request (its complete URL) can be. Requests over this size will be tried as a POST request. |
| [maxPostBytes?](./node-tracker.emitterconfigurationbase.maxpostbytes.md) | number | <i>(Optional)</i> The max size a POST request can be before the tracker will force send it |
| [maxPostBytes?](./node-tracker.emitterconfigurationbase.maxpostbytes.md) | number | <i>(Optional)</i> The max size a POST request can be before the tracker will force send it Also dictates the max size of a POST request before a batch of events is split into multiple requests |
| [onRequestFailure?](./node-tracker.emitterconfigurationbase.onrequestfailure.md) | (data: RequestFailure, response?: Response) =&gt; void | <i>(Optional)</i> A callback function to be executed whenever a request fails to be sent to the collector. This is the inverse of the onRequestSuccess callback, so any non 2xx status code will trigger this callback. |
| [onRequestSuccess?](./node-tracker.emitterconfigurationbase.onrequestsuccess.md) | (data: EventBatch, response: Response) =&gt; void | <i>(Optional)</i> A callback function to be executed whenever a request is successfully sent to the collector. In practice this means any request which returns a 2xx status code will trigger this callback. |
| [postPath?](./node-tracker.emitterconfigurationbase.postpath.md) | string | <i>(Optional)</i> The post path which events will be sent to. Ensure your collector is configured to accept events on this post path |
Expand Down
46 changes: 30 additions & 16 deletions libraries/tracker-core/src/emitter/emitter_request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ export interface EmitterRequest {
*/
countEvents: () => number;
/**
* Cancel the timeout timer, should be called when the request is sent
* Cancel timeout timer if it is still pending.
* If not successful, the request will be aborted.
* @param successful - Whether the request was successful
* @param reason - Reason for aborting the request
*/
cancelTimeoutTimer: () => void;
closeRequest: (successful: boolean, reason?: string) => void;
}

export interface EmitterRequestConfiguration {
Expand All @@ -48,7 +51,6 @@ export interface EmitterRequestConfiguration {
keepalive?: boolean;
postPath?: string;
useStm?: boolean;
bufferSize?: number;
maxPostBytes?: number,
credentials?: 'omit' | 'same-origin' | 'include';
}
Expand Down Expand Up @@ -89,19 +91,23 @@ export function newEmitterRequest({
keepalive = true,
postPath = '/com.snowplowanalytics.snowplow/tp2',
useStm = true,
bufferSize,
maxPostBytes = 40000,
credentials = 'include',
}: EmitterRequestConfiguration): EmitterRequest {
let events: EmitterEvent[] = [];
let usePost = eventMethod.toLowerCase() === 'post';
let timer: ReturnType<typeof setTimeout> | undefined;
let abortController: AbortController | undefined;

function countBytes(): number {
return events.reduce(
let count = events.reduce(
(acc, event) => acc + (usePost ? event.getPOSTRequestBytesCount() : event.getGETRequestBytesCount()),
0
);
if (usePost) {
count += 88; // 88 bytes for the payload_data envelope
}
return count;
}

function countEvents(): number {
Expand All @@ -112,7 +118,6 @@ export function newEmitterRequest({
return events.length > 0 ? events[0].getServerAnonymization() : undefined;
}


function addEvent(event: EmitterEvent) {
if (events.length > 0 && getServerAnonymizationOfExistingEvents() !== event.getServerAnonymization()) {
return false;
Expand All @@ -128,9 +133,6 @@ export function newEmitterRequest({

function isFull(): boolean {
if (usePost) {
if (bufferSize !== undefined && countEvents() >= Math.max(1, bufferSize)) {
return true;
}
return countBytes() >= maxPostBytes;
} else {
return events.length >= 1;
Expand Down Expand Up @@ -167,15 +169,19 @@ export function newEmitterRequest({
}

function makeRequest(url: string, options: RequestInit): Request {
const controller = new AbortController();
closeRequest(false);

abortController = new AbortController();
timer = setTimeout(() => {
console.error('Request timed out');
controller.abort()
const reason = 'Request timed out';
console.error(reason);
timer = undefined;
closeRequest(false, reason);
}, connectionTimeout ?? 5000);

const requestOptions: RequestInit = {
headers: createHeaders(),
signal: controller.signal,
signal: abortController.signal,
keepalive,
credentials,
...options,
Expand Down Expand Up @@ -218,11 +224,19 @@ export function newEmitterRequest({
}
}

function cancelTimeoutTimer() {
if (timer) {
function closeRequest(successful: boolean, reason?: string) {
if (timer !== undefined) {
clearTimeout(timer);
timer = undefined;
}

if (abortController !== undefined) {
const controller = abortController;
abortController = undefined;
if (!successful) {
controller.abort(reason);
}
}
}

return {
Expand All @@ -232,6 +246,6 @@ export function newEmitterRequest({
countBytes,
countEvents,
isFull,
cancelTimeoutTimer,
closeRequest,
};
}
8 changes: 5 additions & 3 deletions libraries/tracker-core/src/emitter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export interface EmitterConfigurationBase {
bufferSize?: number;
/**
* The max size a POST request can be before the tracker will force send it
* Also dictates the max size of a POST request before a batch of events is split into multiple requests
* @defaultValue 40000
*/
maxPostBytes?: number;
Expand Down Expand Up @@ -274,7 +275,7 @@ export function newEmitter({
try {
const response = await customFetch(fetchRequest);

request.cancelTimeoutTimer();
request.closeRequest(true);

if (response.ok) {
callOnRequestSuccess(payloads, response);
Expand All @@ -293,6 +294,8 @@ export function newEmitter({
return { success: false, retry: willRetry, status: response.status };
}
} catch (e) {
request.closeRequest(false);

const message = typeof e === 'string' ? e : e ? (e as Error).message : 'Unknown error';
callOnRequestFailure({
events: payloads,
Expand All @@ -312,7 +315,6 @@ export function newEmitter({
customHeaders,
connectionTimeout,
keepalive,
bufferSize,
maxPostBytes,
useStm,
credentials,
Expand All @@ -324,7 +326,7 @@ export function newEmitter({
LOG.warn('Event (' + bytes + 'B) too big, max is ' + maxBytes);

if (usePost) {
const bytes = emitterEvent.getPOSTRequestBytesCount();
const bytes = emitterEvent.getPOSTRequestBytesCount() + 88; // 88 bytes for the payload_data envelope
const tooBig = bytes > maxPostBytes;
if (tooBig) {
eventTooBigWarning(bytes, maxPostBytes);
Expand Down
13 changes: 5 additions & 8 deletions libraries/tracker-core/test/emitter/emitter_request.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { newEventStorePayload } from '../../src/event_store_payload';

const newEmitterEventFromPayload = (payload: Record<string, unknown>) => {
return newEmitterEvent(newEventStorePayload({ payload }));
}
};

// MARK: - addEvent

Expand Down Expand Up @@ -71,7 +71,7 @@ test('countBytes returns the correct byte count', (t) => {

t.true(request.addEvent(newEmitterEventFromPayload({ e: 'pv', p: 'web' })));
t.true(request.addEvent(newEmitterEventFromPayload({ e: 'pv', p: 'mob' })));
t.is(request.countBytes(), 40);
t.is(request.countBytes(), 40 + 88); // 40 bytes for each event, 88 bytes for the payload_data envelope
});

// MARK: - countEvents
Expand All @@ -89,34 +89,31 @@ test('countEvents returns the correct event count', (t) => {

// MARK: - isFull

test('isFull returns false when not reached buffer size', (t) => {
test('isFull returns false when not reached max post bytes', (t) => {
const request = newEmitterRequest({
endpoint: 'https://example.com',
maxPostBytes: 1000,
bufferSize: 2,
});

t.true(request.addEvent(newEmitterEventFromPayload({ e: 'pv', p: 'web' })));
t.false(request.isFull());
});

test('isFull returns true when reached buffer size', (t) => {
test('isFull returns false when reached buffer size and not max post bytes', (t) => {
const request = newEmitterRequest({
endpoint: 'https://example.com',
maxPostBytes: 1000,
bufferSize: 2,
});

t.true(request.addEvent(newEmitterEventFromPayload({ e: 'pv', p: 'web' })));
t.true(request.addEvent(newEmitterEventFromPayload({ e: 'pv', p: 'mob' })));
t.true(request.isFull());
t.false(request.isFull());
});

test('isFull returns true when reached max post bytes', (t) => {
const request = newEmitterRequest({
endpoint: 'https://example.com',
maxPostBytes: 10,
bufferSize: 2,
});

t.true(request.addEvent(newEmitterEventFromPayload({ e: 'pv', p: 'web' })));
Expand Down
3 changes: 2 additions & 1 deletion libraries/tracker-core/test/emitter/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,9 @@ test('adds a timeout to the request', async (t) => {

return new Promise((resolve, reject) => {
let timer = setTimeout(() => {
t.fail('Request should have timed out');
resolve(new Response(null, { status: 200 }));
}, 1000);
}, 500);

input.signal?.addEventListener('abort', () => {
clearTimeout(timer);
Expand Down
1 change: 1 addition & 0 deletions plugins/browser-plugin-media-tracking/tests/test.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ describe('MediaTrackingPlugin', () => {
},
],
contexts: { webPage: false },
customFetch: async () => new Response(null, { status: 200 }),
});
id = `media-${idx}`;
});
Expand Down
1 change: 1 addition & 0 deletions plugins/browser-plugin-media/test/api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ describe('Media Tracking API', () => {
},
],
contexts: { webPage: false },
customFetch: async () => new Response(null, { status: 200 }),
});
id = `media-${idx}`;
});
Expand Down
18 changes: 11 additions & 7 deletions trackers/node-tracker/test/tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,13 @@ for (const eventMethod of testMethods) {
eventMethod,
bufferSize: 0,
onRequestSuccess: (batch) => {
const expected = batch[0]['e'] === 'tr' ? expectedTransaction : expectedItem;
batch.forEach((payload) => {
const expected = payload['e'] === 'tr' ? expectedTransaction : expectedItem;

checkPayload(batch[0], expected, t);
checkPayload(payload, expected, t);

requestCount--;
requestCount--;
});
if (requestCount === 0) {
resolve(batch);
}
Expand Down Expand Up @@ -478,11 +480,13 @@ for (const eventMethod of testMethods) {
endpoint,
eventMethod,
bufferSize: 0,
onRequestSuccess: ([pl]) => {
checkPayload(pl, expected, t);
count--;
onRequestSuccess: (batch) => {
batch.forEach((pl) => {
checkPayload(pl, expected, t);
count--;
});
if (count === 0) {
resolve(pl);
resolve(batch);
}
},
onRequestFailure: reject,
Expand Down

0 comments on commit 88213c9

Please sign in to comment.