Unable to consistently save the conversation threads on messageDone when using AssistantResponse. #3234
Unanswered
gablabelle
asked this question in
Help
Replies: 1 comment
-
OK, so as a solution I had to implement my own modified version of `AssistantResponse`. Basically I'm exposing additional callbacks (`onAnnotation` and `onRunStreamCompleted`) and awaiting them before the stream closes:

import { AssistantMessage, DataMessage, formatStreamPart } from "ai";
import type { AssistantStream, OpenAI, Run } from "@repo/openai";
/**
Identifies the conversation context for a response. You pass the thread and
the latest message into the `AssistantResponse` wrapper below; they are also
forwarded to the client as the first stream part (`assistant_control_data`).
*/
type AssistantResponseSettings = {
  /**
  The thread ID that the response is associated with.
  */
  threadId: string;
  /**
  The ID of the latest message that the response is associated with.
  */
  messageId: string;
};
/**
The `process` parameter is a callback in which you can run the assistant on
threads, and send messages and data messages to the client.
*/
type AssistantResponseCallback = (options: {
  /**
  @deprecated use variable from outer scope instead.
  */
  threadId: string;
  /**
  @deprecated use variable from outer scope instead.
  */
  messageId: string;
  /**
  Forwards an assistant message (non-streaming) to the client.
  */
  sendMessage: (message: AssistantMessage) => void;
  /**
  Send a data message to the client. You can use this to provide information
  for rendering custom UIs while the assistant is processing the thread.
  */
  sendDataMessage: (message: DataMessage) => void;
  /**
  Forwards the assistant response stream to the client. Returns the `Run`
  object after it completes, or when it requires an action.
  */
  forwardStream: (stream: AssistantStream) => Promise<Run | undefined>;
  /**
  Registers a callback invoked once per file-citation annotation. It receives
  a running annotation index and the annotation (filename, file ID, character
  span) and returns the replacement text spliced into the streamed message.
  */
  onAnnotation: (
    fn: (
      index: number,
      annotations: MessageAnnotation
    ) => Promise<{ newText: string }>
  ) => void;
  /**
  Registers a callback that is awaited with the completed message before the
  response stream is closed — e.g. to persist the conversation thread without
  the API call ending first.
  */
  onRunStreamCompleted: (
    fn: (message: OpenAI.Beta.Threads.Messages.Message) => Promise<void>
  ) => void;
}) => Promise<void>;
/**
A file-citation annotation extracted from an assistant message.
*/
export interface MessageAnnotation {
  /** Filename of the cited file (populated from the retrieved file object). */
  text: string;
  /** ID of the cited file. */
  file_id: string;
  /** Start of the citation-marker span in the message text. */
  start_index: number;
  /** End of the citation-marker span in the message text. */
  end_index: number;
}
// Matches OpenAI citation markers such as 【12:0†source】 so they can be
// stripped from streamed text.
const referenceRegex = /【[^】]*】/g;
/**
The `AssistantResponse` allows you to send a stream of assistant update to `useAssistant`.
It is designed to facilitate streaming assistant responses to the `useAssistant` hook.
It receives an assistant thread and a current message, and can send messages and data messages to the client.
*/
export function AssistantResponseWithAnnotations(
{ threadId, messageId }: AssistantResponseSettings,
process: AssistantResponseCallback,
openAiClient: OpenAI
): Response {
let annotationIndex = 0;
let messageCompleted: OpenAI.Beta.Threads.Messages.Message | undefined;
let citedFiles: { [fileId: string]: OpenAI.Files.FileObject } = {};
const processedFiles = new Set<string>();
// Function to process annotations
const processAnnotations = async (
text: string,
annotations: any[],
annotationCallbackFn:
| ((
index: number,
annotation: MessageAnnotation
) => Promise<{
newText: string;
}>)
| undefined
): Promise<string> => {
let processedText = text;
// Sort annotations in reverse order (end to start)
annotations.sort((a, b) => b.start_index - a.start_index);
for (const annotation of annotations) {
if (
annotation.type === "file_citation" &&
annotation.file_citation?.file_id
) {
const fileId = annotation.file_citation.file_id;
// if (processedFiles.has(fileId)) {
// // Remove duplicate annotations
// processedText =
// processedText.slice(0, annotation.start_index) +
// processedText.slice(annotation.end_index);
// continue;
// }
const citedFile = citedFiles[fileId];
if (citedFile && annotationCallbackFn) {
const { newText } = await annotationCallbackFn(annotationIndex, {
text: citedFile.filename,
file_id: citedFile.id,
start_index: annotation.start_index,
end_index: annotation.end_index,
});
// Replace the original annotation marker with the new reference format
processedText =
processedText.slice(0, annotation.start_index) +
newText +
processedText.slice(annotation.end_index);
annotationIndex++;
// Mark this file as processed
processedFiles.add(fileId);
}
}
}
// Remove any remaining annotation markers
processedText = processedText.replace(/【[^】]*】/g, "");
return processedText;
};
const stream = new ReadableStream({
async start(controller) {
const textEncoder = new TextEncoder();
const sendMessage = (message: AssistantMessage) => {
controller.enqueue(
textEncoder.encode(formatStreamPart("assistant_message", message))
);
};
const sendDataMessage = (message: DataMessage) => {
controller.enqueue(
textEncoder.encode(formatStreamPart("data_message", message))
);
};
const sendError = (errorMessage: string) => {
controller.enqueue(
textEncoder.encode(formatStreamPart("error", errorMessage))
);
};
let annotationCallbackFn:
| undefined
| ((
index: number,
annotations: MessageAnnotation
) => Promise<{
newText: string;
}>);
let onRunStreamCallbackFn:
| ((message: OpenAI.Beta.Threads.Messages.Message) => Promise<void>)
| undefined;
const forwardStream = async (stream: AssistantStream) => {
let result: Run | undefined = undefined;
let fullMessage = "";
let lastSentLength = 0;
for await (const value of stream) {
switch (value.event) {
case "thread.message.created": {
controller.enqueue(
textEncoder.encode(
formatStreamPart("assistant_message", {
id: value.data.id,
role: "assistant",
content: [{ type: "text", text: { value: "" } }],
})
)
);
break;
}
case "thread.message.delta": {
const content = value.data.delta.content?.[0];
if (
content &&
content.type == "text" &&
content.text &&
content.text.value
) {
let text = content.text.value;
text = text.replace(referenceRegex, "");
if (
content.text.annotations &&
Array.isArray(content.text.annotations)
) {
// Populate citedFiles object
for (const annotation of content.text.annotations) {
if (
annotation.type === "file_citation" &&
annotation.file_citation?.file_id &&
!citedFiles[annotation.file_citation.file_id]
) {
try {
const file = await openAiClient.files.retrieve(
annotation.file_citation.file_id
);
citedFiles[annotation.file_citation.file_id] = file;
} catch (error) {
console.error("Error retrieving file:", error);
}
}
}
text = await processAnnotations(
text,
content.text.annotations,
annotationCallbackFn
);
}
fullMessage += text;
// Send only the new part of the message
const newContent = fullMessage.slice(lastSentLength);
if (newContent) {
controller.enqueue(
textEncoder.encode(formatStreamPart("text", newContent))
);
lastSentLength = fullMessage.length;
}
}
break;
}
case "thread.message.completed": {
// Store value.data.content so it can be used later
messageCompleted = value.data;
break;
}
case "thread.run.completed":
case "thread.run.requires_action": {
result = value.data;
break;
}
}
}
return result;
};
// send the threadId and messageId as the first message:
controller.enqueue(
textEncoder.encode(
formatStreamPart("assistant_control_data", {
threadId,
messageId,
})
)
);
try {
await process({
threadId,
messageId,
sendMessage,
sendDataMessage,
forwardStream,
onAnnotation: (fn) => {
annotationCallbackFn = fn;
},
onRunStreamCompleted: (fn) => {
onRunStreamCallbackFn = fn;
},
});
} catch (error) {
sendError((error as any).message ?? `${error}`);
} finally {
if (onRunStreamCallbackFn && messageCompleted) {
try {
// Apply annotations to messageCompleted before passing it to onRunStreamCallbackFn
if (messageCompleted.content[0].type === "text") {
let fullMessage = messageCompleted.content[0].text.value;
const annotations =
messageCompleted.content[0].text.annotations || [];
fullMessage = await processAnnotations(
fullMessage,
annotations,
annotationCallbackFn
);
messageCompleted.content[0].text.value = fullMessage;
}
await onRunStreamCallbackFn(messageCompleted);
} catch (callbackError) {
if (callbackError instanceof Error) {
console.error("Error in onRunStreamCallbackFn:", callbackError);
sendError(
`Error in onRunStreamCallbackFn: ${callbackError.message}`
);
} else {
sendError(`Error in onRunStreamCallbackFn: ${callbackError}`);
}
}
}
// Close the stream after ensuring the callback has been invoked
controller.close();
}
},
pull(controller) {},
cancel() {},
});
return new Response(stream, {
status: 200,
headers: {
"Content-Type": "text/plain; charset=utf-8",
},
});
} |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
Hello,
When using AssistantResponse from a Next.js API route handler, the API call ends before it could properly save the conversation.
Since they are non-blocking operations, whether I trigger `cacheAssistantsThread` in the `messageDone` callback or after `finalMessages` has run, sometimes it has time to finish and sometimes it doesn't. Any suggestions on what I could do to ensure `cacheAssistantsThread` has finished running before the API call ends? I'd like to keep the streaming if possible since it greatly improves the user experience.
And here is what `cacheAssistantsThread` does:

Beta Was this translation helpful? Give feedback.
All reactions