Skip to content

Commit

Permalink
set logger attributes instead of calling log() (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbsp authored Sep 13, 2024
1 parent f5a24ef commit a5870d7
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 28 deletions.
2 changes: 1 addition & 1 deletion agents/src/ipc/job_main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const startJob = (
const task = new Promise<void>(async () => {
const unconnectedTimeout = setTimeout(() => {
if (!(connect || shutdown)) {
log().warn(
logger.warn(
'room not connect after job_entry was called after 10 seconds, ',
'did you forget to call ctx.connect()?',
);
Expand Down
11 changes: 6 additions & 5 deletions agents/src/ipc/proc_job_executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class ProcJobExecutor extends JobExecutor {
#pongTimeout?: ReturnType<typeof setTimeout>;
#init = new Future();
#join = new Future();
#logger = log().child({ runningJob: this.#runningJob });

constructor(agent: string, initializeTimeout: number, closeTimeout: number) {
super();
Expand Down Expand Up @@ -63,7 +64,7 @@ export class ProcJobExecutor extends JobExecutor {
}, this.PING_INTERVAL);

this.#pongTimeout = setTimeout(() => {
log().warn('job is unresponsive');
this.#logger.warn('job is unresponsive');
clearTimeout(this.#pongTimeout);
clearInterval(this.#pingInterval);
this.#proc!.kill();
Expand All @@ -75,13 +76,13 @@ export class ProcJobExecutor extends JobExecutor {
case 'pongResponse': {
const delay = Date.now() - msg.value.timestamp;
if (delay > this.HIGH_PING_THRESHOLD) {
log().child({ delay }).warn('job executor is unresponsive');
this.#logger.child({ delay }).warn('job executor is unresponsive');
}
this.#pongTimeout?.refresh();
break;
}
case 'exiting': {
log().child({ reason: msg.value.reason }).debug('job exiting');
this.#logger.child({ reason: msg.value.reason }).debug('job exiting');
break;
}
case 'done': {
Expand All @@ -95,7 +96,7 @@ export class ProcJobExecutor extends JobExecutor {
this.#proc!.on('message', listener);
this.#proc!.on('error', (err) => {
if (this.#closing) return;
log().child({ err }).warn('job process exited unexpectedly');
this.#logger.child({ err }).warn('job process exited unexpectedly');
clearTimeout(this.#pongTimeout);
clearInterval(this.#pingInterval);
this.#join.resolve();
Expand Down Expand Up @@ -142,7 +143,7 @@ export class ProcJobExecutor extends JobExecutor {
this.#proc!.send({ case: 'shutdownRequest' });

const timer = setTimeout(() => {
log().error('job shutdown is taking too much time');
this.#logger.error('job shutdown is taking too much time');
}, this.#opts.closeTimeout);
await this.#join.await.then(() => {
clearTimeout(timer);
Expand Down
5 changes: 4 additions & 1 deletion agents/src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
RtcConfiguration,
} from '@livekit/rtc-node';
import { RoomEvent, TrackKind } from '@livekit/rtc-node';
import { Logger } from 'pino';
import { log } from './log.js';

/** Which tracks, if any, should the agent automatically subscribe to? */
Expand Down Expand Up @@ -57,6 +58,7 @@ export class JobContext {
result: Promise<void>;
};
} = {};
#logger: Logger;

constructor(
proc: JobProcess,
Expand All @@ -72,6 +74,7 @@ export class JobContext {
this.#onShutdown = onShutdown;
this.onParticipantConnected = this.onParticipantConnected.bind(this);
this.#room.on(RoomEvent.ParticipantConnected, this.onParticipantConnected);
this.#logger = log().child({ info: this.#info });
}

get proc(): JobProcess {
Expand Down Expand Up @@ -154,7 +157,7 @@ export class JobContext {
p.identity in this.#participantTasks &&
this.#participantTasks[p.identity].callback == callback
) {
log().warn(
this.#logger.warn(
'a participant has joined before a prior prticipant task matching the same identity has finished:',
p.identity,
);
Expand Down
10 changes: 4 additions & 6 deletions agents/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ export class Worker {
switch (msg.message.case) {
case 'register': {
this.#id = msg.message.value.workerId;
log()
this.#logger
.child({ id: this.id, server_info: msg.message.value.serverInfo })
.info('registered worker');
this.event.emit(
Expand All @@ -431,9 +431,7 @@ export class Worker {
delete this.#pending[job.id];
task.resolve(msg.message.value);
} else {
log()
.child({ job })
.warn('received assignment for unknown job ' + job.id);
this.#logger.child({ job }).warn('received assignment for unknown job ' + job.id);
}
break;
}
Expand Down Expand Up @@ -480,9 +478,9 @@ export class Worker {
if (oldStatus != currentStatus) {
const extra = { load: currentLoad, loadThreshold: this.#opts.loadThreshold };
if (isFull) {
log().child(extra).info('worker is at full capacity, marking as unavailable');
this.#logger.child(extra).info('worker is at full capacity, marking as unavailable');
} else {
log().child(extra).info('worker is below capacity, marking as available');
this.#logger.child(extra).info('worker is below capacity, marking as available');
}
}

Expand Down
30 changes: 15 additions & 15 deletions plugins/openai/src/voice_assistant/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
// import { log } from '@livekit/agents';
import { AudioByteStream } from '@livekit/agents';
import { findMicroTrackId } from '@livekit/agents';
import { log } from '@livekit/agents';
Expand Down Expand Up @@ -69,11 +68,12 @@ export class VoiceAssistant {
private localTrackSid: string | null = null;
private localSource: AudioSource | null = null;
private pendingMessages: Map<string, string> = new Map();
private logger = log();

start(room: Room, participant: RemoteParticipant | string | null = null): Promise<void> {
return new Promise(async (resolve, reject) => {
if (this.ws !== null) {
log().warn('VoiceAssistant already started');
this.logger.warn('VoiceAssistant already started');
resolve();
return;
}
Expand Down Expand Up @@ -115,7 +115,7 @@ export class VoiceAssistant {
options.source = TrackSource.SOURCE_MICROPHONE;
this.agentPublication = (await room.localParticipant?.publishTrack(track, options)) || null;
if (!this.agentPublication) {
log().error('Failed to publish track');
this.logger.error('Failed to publish track');
reject(new Error('Failed to publish track'));
return;
}
Expand Down Expand Up @@ -153,15 +153,15 @@ export class VoiceAssistant {

private sendClientCommand(command: Record<string, unknown>): void {
if (!this.connected || !this.ws) {
log().error('WebSocket is not connected');
this.logger.error('WebSocket is not connected');
return;
}

if (command.event !== proto.ClientEvent.ADD_USER_AUDIO) {
const truncatedDataPartial = command['data']
? { data: (command['data'] as string).slice(0, 30) + '…' }
: {};
log().debug(`-> ${JSON.stringify({ ...command, ...truncatedDataPartial })}`);
this.logger.debug(`-> ${JSON.stringify({ ...command, ...truncatedDataPartial })}`);
}
this.ws.send(JSON.stringify(command));
}
Expand All @@ -170,7 +170,7 @@ export class VoiceAssistant {
const truncatedDataPartial = event['data']
? { data: (event['data'] as string).slice(0, 30) + '…' }
: {};
log().debug(`<- ${JSON.stringify({ ...event, ...truncatedDataPartial })}`);
this.logger.debug(`<- ${JSON.stringify({ ...event, ...truncatedDataPartial })}`);
switch (event.event) {
case proto.ServerEvent.START_SESSION:
break;
Expand All @@ -194,7 +194,7 @@ export class VoiceAssistant {
this.handleInputTranscribed(event);
break;
default:
log().warn(`Unknown server event: ${JSON.stringify(event)}`);
this.logger.warn(`Unknown server event: ${JSON.stringify(event)}`);
}
}

Expand Down Expand Up @@ -232,7 +232,7 @@ export class VoiceAssistant {
if (participantIdentity && trackSid) {
this.publishTranscription(participantIdentity, trackSid, newText, false, itemId);
} else {
log().error('Participant or track not set');
this.logger.error('Participant or track not set');
}
}
break;
Expand All @@ -259,7 +259,7 @@ export class VoiceAssistant {
if (participantIdentity && trackSid) {
this.publishTranscription(participantIdentity, trackSid, text, true, itemId);
} else {
log().error('Participant or track not set');
this.logger.error('Participant or track not set');
}
}
}
Expand All @@ -268,15 +268,15 @@ export class VoiceAssistant {
const itemId = event.item_id as string;
const transcription = event.transcript as string;
if (!itemId || !transcription) {
log().error('Item ID or transcription not set');
this.logger.error('Item ID or transcription not set');
return;
}
const participantIdentity = this.linkedParticipant?.identity;
const trackSid = this.subscribedTrack?.sid;
if (participantIdentity && trackSid) {
this.publishTranscription(participantIdentity, trackSid, transcription, true, itemId);
} else {
log().error('Participant or track not set');
this.logger.error('Participant or track not set');
}
}

Expand All @@ -287,19 +287,19 @@ export class VoiceAssistant {
if (participantIdentity && trackSid && itemId) {
this.publishTranscription(participantIdentity, trackSid, '', false, itemId);
} else {
log().error('Participant or track or itemId not set');
this.logger.error('Participant or track or itemId not set');
}
}

private linkParticipant(participantIdentity: string): void {
if (!this.room) {
log().error('Room is not set');
this.logger.error('Room is not set');
return;
}

this.linkedParticipant = this.room.remoteParticipants.get(participantIdentity) || null;
if (!this.linkedParticipant) {
log().error(`Participant with identity ${participantIdentity} not found`);
this.logger.error(`Participant with identity ${participantIdentity} not found`);
return;
}
this.subscribeToMicrophone();
Expand All @@ -325,7 +325,7 @@ export class VoiceAssistant {
};

if (!this.linkedParticipant) {
log().error('Participant is not set');
this.logger.error('Participant is not set');
return;
}

Expand Down

0 comments on commit a5870d7

Please sign in to comment.