Skip to content

Commit

Permalink
fix: delayed thread group registration
Browse files Browse the repository at this point in the history
  • Loading branch information
noomorph committed Apr 29, 2024
1 parent eb587c3 commit f7a5254
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 55 deletions.
4 changes: 2 additions & 2 deletions src/decorator/Bunyamin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import type { ThreadGroupConfig } from '../streams';
import type { ThreadID } from '../types';
import { flow, isActionable, isError, isObject, isPromiseLike } from '../utils';
import type {
BunyaminLogMethod,
BunyaminConfig,
BunyaminLogMethod,
BunyaminLogRecordFields as UserFields,
BunyanLikeLogger,
BunyanLogLevel,
Expand Down Expand Up @@ -48,7 +48,7 @@ export class Bunyamin<Logger extends BunyanLikeLogger = BunyanLikeLogger> {

/** @deprecated */

Check warning on line 49 in src/decorator/Bunyamin.ts

View workflow job for this annotation

GitHub Actions / Lint

Missing JSDoc @returns declaration
get threadGroups(): ThreadGroupConfig[] {
return [];
return [...(this.#shared.threadGroups ?? [])];
}

get logger(): Logger {
Expand Down
2 changes: 1 addition & 1 deletion src/decorator/types/BunyaminConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export type BunyaminConfig<Logger extends BunyanLikeLogger> = {
/**
* Thread groups to be used for grouping log records.
*/
threadGroups?: ThreadGroupConfig[];
threadGroups?: Iterable<ThreadGroupConfig>;
/**
* Fallback message to be used when there was no previous message
* passed with {@link BunyaminLogMethod#begin}.
Expand Down
16 changes: 13 additions & 3 deletions src/realm.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable prefer-const */
import { Bunyamin } from './decorator';
import { noopLogger } from './noopLogger';
import { isSelfDebug } from './is-debug';
Expand All @@ -10,13 +11,22 @@ type Realm = {
};

function create() {
let bunyamin: Bunyamin;
let nobunyamin: Bunyamin;

const selfDebug = isSelfDebug();
const bunyamin = new Bunyamin({ logger: noopLogger() });
const nobunyamin = new Bunyamin({
const threadGroups = new ThreadGroups(() => bunyamin);

bunyamin = new Bunyamin({
logger: noopLogger(),
threadGroups,
});

nobunyamin = new Bunyamin({
immutable: true,
logger: noopLogger(),
threadGroups,
});
const threadGroups = new ThreadGroups(bunyamin);

if (selfDebug) {
bunyamin.trace({ cat: 'bunyamin' }, 'bunyamin global instance created');
Expand Down
6 changes: 2 additions & 4 deletions src/streams/bunyan-trace-event/BunyanTraceEventStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,9 @@ export class BunyanTraceEventStream extends Transform {
strict: options.strict ?? false,
defaultThreadName: options.defaultThreadName ?? 'Main Thread',
maxConcurrency: options.maxConcurrency ?? 100,
// Lazy to add a `NormalizedOptions...` type, so we just cast it here.
threadGroups: options.threadGroups as Iterable<ThreadGroupConfig>,
});

for (const threadGroup of options.threadGroups) {
this.#threadGroupDispatcher.registerThreadGroup(threadGroup as ThreadGroupConfig);
}
}

_transform(
Expand Down
20 changes: 11 additions & 9 deletions src/streams/bunyan-trace-event/options/normalizeOptions.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { TraceEventStreamOptions } from './TraceEventStreamOptions';
import type { ThreadGroupConfig } from '../threads';
import type { TraceEventStreamOptions } from './TraceEventStreamOptions';

export function normalizeOptions(
options: TraceEventStreamOptions,
Expand All @@ -8,14 +8,16 @@ export function normalizeOptions(
options.defaultThreadName = options.defaultThreadName ?? 'Main Thread';
options.maxConcurrency = options.maxConcurrency ?? 100;
options.strict = options.strict ?? false;
options.threadGroups = [...(options.threadGroups ?? [])].map((threadGroup, index) =>
typeof threadGroup === 'string'
? {
id: threadGroup,
displayName: threadGroup,
}
: validateThreadGroup(threadGroup, index),
);
options.threadGroups = Array.isArray(options.threadGroups)
? options.threadGroups.map((threadGroup, index) =>
typeof threadGroup === 'string'
? {
id: threadGroup,
displayName: threadGroup,
}
: validateThreadGroup(threadGroup, index),
)
: options.threadGroups ?? [];

if (options.maxConcurrency < 1) {
throw new Error(`maxConcurrency must be at least 1, got ${options.maxConcurrency}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ describe('ThreadGroupDispatcher', () => {
defaultThreadName: 'Main Thread',
maxConcurrency: 100,
strict: false,
})
.registerThreadGroup({ id: 'foo', displayName: 'A' })
.registerThreadGroup({ id: 'bar', displayName: 'B', maxConcurrency: 2 })
.registerThreadGroup({ id: 'baz', displayName: 'C', maxConcurrency: 3 });
threadGroups: [
{ id: 'foo', displayName: 'A' },
{ id: 'bar', displayName: 'B', maxConcurrency: 2 },
{ id: 'baz', displayName: 'C', maxConcurrency: 3 },
],
});
});

it.each(PHASES)('should fallback to 0 for null tid (ph = %j)', (ph) => {
Expand Down Expand Up @@ -74,9 +76,12 @@ describe('ThreadGroupDispatcher', () => {
defaultThreadName: 'Main Thread',
maxConcurrency: 2,
strict: true,
}).registerThreadGroup({
id: 'foo',
displayName: 'A',
threadGroups: [
{
id: 'foo',
displayName: 'A',
},
],
});
});

Expand Down Expand Up @@ -106,9 +111,12 @@ describe('ThreadGroupDispatcher', () => {
defaultThreadName: 'Main Thread',
maxConcurrency: 1,
strict: true,
}).registerThreadGroup({
id: 'foo',
displayName: 'Single Thread',
threadGroups: [
{
id: 'foo',
displayName: 'Single Thread',
},
],
});

expect(dispatcher.resolve('B', 'foo')).toBe(1);
Expand All @@ -127,9 +135,12 @@ describe('ThreadGroupDispatcher', () => {
defaultThreadName: 'Main Thread',
maxConcurrency: 2,
strict: false,
}).registerThreadGroup({
id: 'foo',
displayName: 'A',
threadGroups: [
{
id: 'foo',
displayName: 'A',
},
],
});
});

Expand Down
48 changes: 33 additions & 15 deletions src/streams/bunyan-trace-event/threads/ThreadGroupDispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,30 @@ export type ThreadGroupDispatcherConfig = {
defaultThreadName: string;
maxConcurrency: number;
strict: boolean;
threadGroups: Iterable<ThreadGroupConfig>;
};

export class ThreadGroupDispatcher {
readonly #strict: boolean;
readonly #dispatchers: Record<string, ThreadDispatcher> = {};
readonly #maxConcurrency: number;
readonly #defaultThreadName: string;
readonly #threadGroups: Iterable<ThreadGroupConfig>;
readonly #names: IntervalTree = new IntervalTree();

#freeThreadId = 1;
#initialized = false;

constructor(options: ThreadGroupDispatcherConfig) {
this.#defaultThreadName = options.defaultThreadName;
this.#maxConcurrency = options.maxConcurrency;
this.#strict = options.strict;
}

registerThreadGroup(config: ThreadGroupConfig): this {
const maxConcurrency = config.maxConcurrency ?? this.#maxConcurrency;
const min = this.#freeThreadId;
const max = min + maxConcurrency - 1;

this.#dispatchers[config.id] = new ThreadDispatcher(config.displayName, this.#strict, min, max);
this.#names.insert([min, max], config.displayName);
this.#freeThreadId = max + 1;

return this;
this.#threadGroups = options.threadGroups;
}

name(tid: number): string | undefined {
this.#ensureInitialized();

if (tid === 0) {
return this.#defaultThreadName;
}
Expand All @@ -46,6 +40,8 @@ export class ThreadGroupDispatcher {
}

resolve(ph: string | undefined, tid: ThreadID | undefined): number | Error {
this.#ensureInitialized();

if (tid == null) {
return 0;
}
Expand Down Expand Up @@ -74,7 +70,29 @@ export class ThreadGroupDispatcher {
}
}

#resolveDispatcher(threadAlias: ThreadAlias): ThreadDispatcher {
#ensureInitialized() {
if (!this.#initialized) {
this.#initialized = true;

for (const group of this.#threadGroups) {
this.#registerThreadGroup(group);
}
}
}

#registerThreadGroup(config: ThreadGroupConfig): this {
const maxConcurrency = config.maxConcurrency ?? this.#maxConcurrency;
const min = this.#freeThreadId;
const max = min + maxConcurrency - 1;

this.#dispatchers[config.id] = new ThreadDispatcher(config.displayName, this.#strict, min, max);
this.#names.insert([min, max], config.displayName);
this.#freeThreadId = max + 1;

return this;
}

#resolveDispatcher(threadAlias: ThreadAlias): ThreadDispatcher | undefined {
const groupName = typeof threadAlias === 'string' ? threadAlias : threadAlias[0];
return this.#ensureGroupDispatcher(groupName);
}
Expand All @@ -89,9 +107,9 @@ export class ThreadGroupDispatcher {
: threadAlias[1];
}

#ensureGroupDispatcher(threadGroup: string): ThreadDispatcher {
#ensureGroupDispatcher(threadGroup: string): ThreadDispatcher | undefined {
if (!this.#dispatchers[threadGroup] && !this.#strict) {
this.registerThreadGroup({ id: threadGroup, displayName: threadGroup });
this.#registerThreadGroup({ id: threadGroup, displayName: threadGroup });
}

return this.#dispatchers[threadGroup];
Expand Down
7 changes: 4 additions & 3 deletions src/thread-groups/ThreadGroups.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
/* eslint-disable @typescript-eslint/consistent-type-imports */
import { beforeEach, describe, expect, jest, it } from '@jest/globals';
import type { ThreadGroups } from './ThreadGroups';
import { wrapLogger } from '../wrapLogger';
import type { Bunyamin } from '../decorator';

describe('ThreadGroups', () => {
let ThreadGroups: new (logger: Bunyamin) => ThreadGroups;
let ThreadGroups: typeof import('./ThreadGroups').ThreadGroups;
let threadGroups: ThreadGroups;
let isDebug: jest.Mocked<any>;
let logger: Bunyamin;
Expand All @@ -26,7 +27,7 @@ describe('ThreadGroups', () => {
describe('in regular mode', () => {
beforeEach(() => {
isDebug.isSelfDebug.mockReturnValue(false);
threadGroups = new ThreadGroups(logger);
threadGroups = new ThreadGroups(() => logger);
});

it('should be empty by default', () => {
Expand All @@ -47,7 +48,7 @@ describe('ThreadGroups', () => {
describe('in debug mode', () => {
beforeEach(() => {
isDebug.isSelfDebug.mockReturnValue(true);
threadGroups = new ThreadGroups(logger);
threadGroups = new ThreadGroups(() => logger);
});

it('should call logger.trace upon addition', () => {
Expand Down
10 changes: 5 additions & 5 deletions src/thread-groups/ThreadGroups.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ import type { ThreadGroupConfig } from '../streams';
import { isSelfDebug } from '../is-debug';
import { StackTraceError } from '../decorator/StackTraceError';

export class ThreadGroups {
readonly #bunyamin: Bunyamin;
export class ThreadGroups implements Iterable<ThreadGroupConfig> {
readonly #debugMode = isSelfDebug();
readonly #getBunyamin: () => Bunyamin;
readonly #groups = new Map<string, ThreadGroupConfig>();

constructor(bunyamin: Bunyamin) {
this.#bunyamin = bunyamin;
constructor(getBunyamin: () => Bunyamin) {
this.#getBunyamin = getBunyamin;
this.#groups = new Map();
}

Expand All @@ -32,7 +32,7 @@ export class ThreadGroups {

#logAddition(group: ThreadGroupConfig, action: string) {
const { stack } = new StackTraceError();
this.#bunyamin.trace(
this.#getBunyamin().trace(
{ cat: 'bunyamin' },
`thread group ${action}: ${group.id} (${group.displayName})\n\n${stack}`,
);
Expand Down

0 comments on commit f7a5254

Please sign in to comment.