Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new eventStream API for async generators #336

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

sergiodxa
Copy link
Owner

@sergiodxa sergiodxa commented Apr 10, 2024

The eventStream response helpers is based a lot on how useEffect works, you pass a function and that function returns another one that's used to cleanup any side-effect.

This is great for event emitters like API where you have a function used to subscribe to events and another to unsubscribe, e.g.

return eventStream(request.signal, send => {
  emitter.addEventListener("event", handler)
  return () => emitter.removeEventListener("event", handler);

  function handler(event: Event) {
    send({ data: event });
  }
}

Or another common API is for cases where you get the unsubscribe function, e.g.

return eventStream(request.signal, (send) =>
  emitter.subscribe((event) => {
    send({ data: event });
  }),
);

Both are great, but if you want to iterate an AsyncIterator, you will need to create a function just to have the async code inside it.

return eventStream(request.signal, (send) => {
  run()
  return () => {}; // empty cleanup

  async function run() {
    for await (const chunk of stream) {
      if (request.signal.aborted) return;
      send({ data: chunk });
    }
  }
});

This PR introduce an alternative API that will help with this my splitting the handle and cleanup functions.

let stream = new Stream(); // get your stream here

return eventStream(request.signal, {
  // This function will be called when the connection is closed, it can be an
  // async function or a sync function, depends on your needs.
  cleanup() {
    return stream.close()
  },

  // This function will be called when the connection is open, here you can send
  // data to the browser or close the connection
  async handle(send, close) {
    // iterate the async stream of data
    for await (const chunk of stream) {
      // stop the iteration if the connection is closed
      if (request.signal.aborted) return;
      send({ data: chunk }); // send each chunk
    }

    return close(); // close the connection when the stream ends
  }
});

A more real-world example usage for this is using the OpenAI SDK.

import { client } from "~/services/openai.server";

export async function loader({ request }: LoaderFunctionArgs) {
  let stream = await client.chat.completions.create({
    model: "gpt-4",
    messages: [{ role: "user", content: "Say this is a test" }],
    stream: true,
  });

  return eventStream(controller.signal, {
    cleanup() {
      stream.controller.abort();
    },
    async handle(send, close) {
      for await (const chunk of stream) {
        send({ data: chunk.choices[0]?.delta?.content || "" });
      }

      return close();
    },
  });
}

@sergiodxa sergiodxa added the enhancement New feature or request label Apr 10, 2024
@sergiodxa sergiodxa self-assigned this Apr 10, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant