Skip to content

Commit

Permalink
add pid/jobid to child and fix Future (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbsp authored Sep 12, 2024
1 parent fe6c1dd commit 9e3f019
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 17 deletions.
16 changes: 12 additions & 4 deletions agents/src/ipc/job_main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Room, RoomEvent } from '@livekit/rtc-node';
import type { ChildProcess } from 'child_process';
import { fork } from 'child_process';
import { EventEmitter, once } from 'events';
import { Logger } from 'pino';
import { fileURLToPath } from 'url';
import type { Agent } from '../generator.js';
import type { RunningJobInfo } from '../job.js';
Expand Down Expand Up @@ -33,6 +34,7 @@ const startJob = (
func: (ctx: JobContext) => Promise<void>,
info: RunningJobInfo,
closeEvent: EventEmitter,
logger: Logger,
): JobTask => {
let connect = false;
let shutdown = false;
Expand Down Expand Up @@ -64,16 +66,18 @@ const startJob = (
func(ctx).finally(() => clearTimeout(unconnectedTimeout));

await once(closeEvent, 'close').then((close) => {
logger.debug('shutting down');
process.send!({ case: 'exiting', reason: close[1] });
});

await room.disconnect();
logger.debug('disconnected from room');

const shutdownTasks = [];
for (const callback of ctx.shutdownCallbacks) {
shutdownTasks.push(callback());
}
await Promise.all(shutdownTasks).catch(() => log().error('error while shutting down the job'));
await Promise.all(shutdownTasks).catch(() => logger.error('error while shutting down the job'));

process.send!({ case: 'done' });
process.exit();
Expand Down Expand Up @@ -103,10 +107,11 @@ if (process.send) {
initializeLogger(msg.value.loggerOptions);
});
const proc = new JobProcess();
let logger = log().child({ pid: proc.pid });

log().child({ pid: proc.pid }).debug('initializing job runner');
logger.debug('initializing job runner');
agent.prewarm(proc);
log().child({ pid: proc.pid }).debug('job runner initialized');
logger.debug('job runner initialized');
process.send({ case: 'initializeResponse' });

let job: JobTask | undefined = undefined;
Expand All @@ -125,7 +130,10 @@ if (process.send) {
throw new Error('job task already running');
}

job = startJob(proc, agent.entry, msg.value.runningJob, closeEvent);
logger = logger.child({ jobID: msg.value.runningJob.job.id });

job = startJob(proc, agent.entry, msg.value.runningJob, closeEvent, logger);
logger.debug('job started');
break;
}
case 'shutdownRequest': {
Expand Down
8 changes: 4 additions & 4 deletions agents/src/ipc/proc_job_executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class ProcJobExecutor extends JobExecutor {
}

async run() {
await this.#init;
await this.#init.await;

this.#pingInterval = setInterval(() => {
this.#proc!.send({ case: 'pingRequest', value: { timestamp: Date.now() } });
Expand Down Expand Up @@ -88,15 +88,15 @@ export class ProcJobExecutor extends JobExecutor {
};
this.#proc!.on('message', listener);

await this.#join;
await this.#join.await;
}

async join() {
if (!this.#started) {
throw new Error('runner not started');
}

await this.#join;
await this.#join.await;
}

async initialize() {
Expand Down Expand Up @@ -125,7 +125,7 @@ export class ProcJobExecutor extends JobExecutor {
const timer = setTimeout(() => {
log().error('job shutdown is taking too much time');
}, this.#opts.closeTimeout);
await this.#join.then(() => {
await this.#join.await.then(() => {
clearTimeout(timer);
clearTimeout(this.#pongTimeout);
clearInterval(this.#pingInterval);
Expand Down
15 changes: 8 additions & 7 deletions agents/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,15 @@ export class Queue<T> {
}

/** @internal */
export class Future extends Promise<void> {
constructor() {
super((resolve, reject: (_: Error) => void) => {
this.resolve = resolve;
this.reject = reject;
});
export class Future {
#await = new Promise<void>((resolve, reject: (_: Error) => void) => {
this.resolve = resolve;
this.reject = reject;
});

get await() {
return this.#await;
}

resolve = () => {};
reject = (_: Error) => {
_;
Expand Down
4 changes: 2 additions & 2 deletions agents/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ export class Worker {

async close() {
if (this.#closed) {
await this.#close;
await this.#close.await;
return;
}

Expand All @@ -561,6 +561,6 @@ export class Worker {
await Promise.allSettled(this.#tasks);

this.#session?.close();
await this.#close;
await this.#close.await;
}
}

0 comments on commit 9e3f019

Please sign in to comment.