Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Set terminate Option for user #985

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/generators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ std::string CurrentModulePath() {
}
#endif

// Guard used at the top of generator/state entry points: raises an error when the
// session has been flagged as terminated, and does nothing otherwise.
//
// \param is_session_terminated Current value of the session's terminate flag.
// \throws std::runtime_error when is_session_terminated is true.
void ThrowErrorIfSessionTerminated(bool is_session_terminated) {
  if (is_session_terminated) {
    throw std::runtime_error("Session in Terminated state, exiting!");
  }
}

namespace Generators {

#if USE_CUDA
Expand Down Expand Up @@ -284,6 +289,7 @@ Generator::Generator(const Model& model, const GeneratorParams& params) : model_
}

void Generator::ComputeLogits() {
ThrowErrorIfSessionTerminated(state_->session_terminated);
if (computed_logits_)
throw std::runtime_error("ComputeLogits called again without calling GenerateNextToken first");

Expand All @@ -302,6 +308,7 @@ void Generator::ComputeLogits() {
}

bool Generator::IsDone() const {
ThrowErrorIfSessionTerminated(state_->session_terminated);
if (computed_logits_)
throw std::runtime_error("IsDone() can't be called in the middle of processing logits");

Expand All @@ -313,7 +320,12 @@ bool Generator::IsDone() const {
return is_done;
}

// Reports the terminate flag stored on this generator's state. This only reads the
// flag; unlike the other entry points it never throws because of termination.
bool Generator::IsSessionTerminated() const {
  const bool terminate_flag = state_->session_terminated;
  return terminate_flag;
}

void Generator::GenerateNextToken() {
ThrowErrorIfSessionTerminated(state_->session_terminated);
if (!computed_logits_)
throw std::runtime_error("Must call ComputeLogits before GenerateNextToken");
computed_logits_ = false;
Expand Down
3 changes: 3 additions & 0 deletions src/generators.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ using cudaStream_t = void*;
#include "logging.h"
#include "tensor.h"

// Throws std::runtime_error when is_session_terminated is true; otherwise does nothing.
void ThrowErrorIfSessionTerminated(bool is_session_terminated);

namespace Generators {
struct Model;
struct State;
Expand Down Expand Up @@ -107,6 +109,7 @@ struct Generator : LeakChecked<Generator> {
Generator(const Model& model, const GeneratorParams& params);

bool IsDone() const;
// Returns the session_terminated flag held by this generator's State.
bool IsSessionTerminated() const;
void ComputeLogits();
void GenerateNextToken();

Expand Down
12 changes: 12 additions & 0 deletions src/models/model.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,18 @@ void State::Run(OrtSession& session, int new_batch_size) {
}
Copy link
Member

@RyanUnderhill RyanUnderhill Oct 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In session.Run(...) above on line 58, we don't do anything on termination?

The issue is that there's a potential error race. If the user calls 'SetTerminate()' and a non-termination error is then thrown from session.Run(), the user will not know that the error was unrelated to termination (unless they look at the error string).

Can we detect termination from session.Run? If so, we should rename session_terminated to session_terminate_set and have a second variable for "session_terminated" for if we actually hit the termination case (or if we call ThrowErrorIfSessionTerminated, as that will catch the non session.Run terminate cases).

This way we separate requesting termination from hitting termination, and IsTerminated() should only return true if we hit termination. This does mean that IsTerminated() will return false after SetTerminate() is called, and won't return true until a function is called that checks for termination. This could be a problem.

@baijumeswani can review my thinking also.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an interesting question. I believe the condition you are describing is rare. If the session is already terminated, the error may well be different precisely because the session is in the terminated state. Also, if the error occurs for some other reason, it should be reproduced in a later run, when processing happens in a non-terminated state, and it can be caught at that time.

}

// Flags this state as terminated and forwards the terminate request to run_options_.
// The flag is written first, then the run options are updated.
// NOTE(review): presumably run_options_->SetTerminate() causes an in-flight session Run()
// that uses these options to abort — confirm against the ONNX Runtime RunOptions docs.
void State::SetTerminate() {
session_terminated = true;
run_options_->SetTerminate();
}

// Clears the terminated flag on this state and forwards the request to run_options_,
// undoing a previous SetTerminate(). The flag is cleared first, then the run options.
void State::UnsetTerminate() {
session_terminated = false;
run_options_->UnsetTerminate();
}

OrtValue* State::GetInput(const char* name) {
ThrowErrorIfSessionTerminated(session_terminated);
for (size_t i = 0; i < input_names_.size(); i++) {
if (std::strcmp(input_names_[i], name) == 0) {
return inputs_[i];
Expand All @@ -75,6 +86,7 @@ OrtValue* State::GetInput(const char* name) {
}

OrtValue* State::GetOutput(const char* name) {
ThrowErrorIfSessionTerminated(session_terminated);
for (size_t i = 0; i < output_names_.size(); i++) {
if (std::strcmp(output_names_[i], name) == 0) {
return outputs_[i];
Expand Down
3 changes: 3 additions & 0 deletions src/models/model.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ struct State {
virtual const CapturedGraphInfo* GetCapturedGraphInfo() const { return nullptr; }
virtual void Finalize() {}

// Request / clear termination for this state.
void SetTerminate();
void UnsetTerminate();
// Set to true by SetTerminate() and back to false by UnsetTerminate(); value-initialized to false.
// NOTE(review): the SetTerminate C API test writes this flag from one thread while another
// thread reads it inside the generation loop. A plain bool is not synchronized — consider
// std::atomic<bool>.
mutable bool session_terminated{};
OrtValue* GetInput(const char* name);

virtual OrtValue* GetOutput(const char* name);
Expand Down
12 changes: 12 additions & 0 deletions src/ort_genai.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ struct OgaGenerator : OgaAbstract {
return OgaGenerator_IsDone(this);
}

// C++ convenience wrapper: returns the value reported by the C API call
// OgaGenerator_IsSessionTerminated for this generator.
bool IsSessionTerminated() const {
  const bool terminated = OgaGenerator_IsSessionTerminated(this);
  return terminated;
}

void ComputeLogits() {
OgaCheckResult(OgaGenerator_ComputeLogits(this));
}
Expand All @@ -244,6 +248,14 @@ struct OgaGenerator : OgaAbstract {
OgaCheckResult(OgaGenerator_GenerateNextToken(this));
}

// Requests termination through OgaGenerator_SetTerminate; the returned OgaResult is handed
// to OgaCheckResult (presumably throwing on failure — confirm against OgaCheckResult).
void SetTerminate() {
OgaCheckResult(OgaGenerator_SetTerminate(this));
}

// Clears a previous terminate request through OgaGenerator_UnsetTerminate; the returned
// OgaResult is handed to OgaCheckResult (presumably throwing on failure — confirm).
void UnsetTerminate() {
OgaCheckResult(OgaGenerator_UnsetTerminate(this));
}

size_t GetSequenceCount(size_t index) const {
return OgaGenerator_GetSequenceCount(this, index);
}
Expand Down
20 changes: 20 additions & 0 deletions src/ort_genai_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ bool OGA_API_CALL OgaGenerator_IsDone(const OgaGenerator* generator) {
return reinterpret_cast<const Generators::Generator*>(generator)->IsDone();
}

// C API entry point: views the opaque handle as a Generators::Generator and returns
// its IsSessionTerminated() result.
bool OGA_API_CALL OgaGenerator_IsSessionTerminated(const OgaGenerator* generator) {
  const auto* impl = reinterpret_cast<const Generators::Generator*>(generator);
  return impl->IsSessionTerminated();
}

OgaResult* OGA_API_CALL OgaGenerator_ComputeLogits(OgaGenerator* generator) {
OGA_TRY
reinterpret_cast<Generators::Generator*>(generator)->ComputeLogits();
Expand All @@ -265,6 +269,22 @@ OgaResult* OGA_API_CALL OgaGenerator_GenerateNextToken(OgaGenerator* generator)
OGA_CATCH
}

// C API entry point: requests termination on the generator's State.
// Returns nullptr on success. Exceptions are handled by the OGA_TRY / OGA_CATCH wrapper
// (presumably converted into a non-null OgaResult — confirm against the macro definitions).
OgaResult* OGA_API_CALL OgaGenerator_SetTerminate(OgaGenerator* oga_generator) {
  OGA_TRY
  // This call mutates the generator's state, so view the handle as non-const, matching
  // the other mutating entry points such as OgaGenerator_ComputeLogits.
  auto& generator = *reinterpret_cast<Generators::Generator*>(oga_generator);
  generator.state_->SetTerminate();
  return nullptr;
  OGA_CATCH
}

// C API entry point: clears a previous terminate request on the generator's State.
// Returns nullptr on success. Exceptions are handled by the OGA_TRY / OGA_CATCH wrapper
// (presumably converted into a non-null OgaResult — confirm against the macro definitions).
OgaResult* OGA_API_CALL OgaGenerator_UnsetTerminate(OgaGenerator* oga_generator) {
  OGA_TRY
  // This call mutates the generator's state, so view the handle as non-const, matching
  // the other mutating entry points such as OgaGenerator_ComputeLogits.
  auto& generator = *reinterpret_cast<Generators::Generator*>(oga_generator);
  generator.state_->UnsetTerminate();
  return nullptr;
  OGA_CATCH
}

OgaResult* OGA_API_CALL OgaGenerator_GetOutput(const OgaGenerator* oga_generator, const char* name, OgaTensor** out) {
OGA_TRY
auto& generator = *reinterpret_cast<const Generators::Generator*>(oga_generator);
Expand Down
3 changes: 3 additions & 0 deletions src/ort_genai_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ OGA_EXPORT void OGA_API_CALL OgaDestroyGenerator(OgaGenerator* generator);
* \return True if the generator has finished generating all the sequences, false otherwise.
*/
OGA_EXPORT bool OGA_API_CALL OgaGenerator_IsDone(const OgaGenerator* generator);

/*
 * \brief Returns the generator's session-terminated flag.
 * \param[in] generator The generator to query.
 * \return True once OgaGenerator_SetTerminate has been called and not yet undone by
 *         OgaGenerator_UnsetTerminate, false otherwise.
 */
OGA_EXPORT bool OGA_API_CALL OgaGenerator_IsSessionTerminated(const OgaGenerator* generator);

/*
* \brief Computes the logits from the model based on the input ids and the past state. The computed logits are stored in the generator.
Expand All @@ -254,6 +255,8 @@ OGA_EXPORT bool OGA_API_CALL OgaGenerator_IsDone(const OgaGenerator* generator);
*/
OGA_EXPORT OgaResult* OGA_API_CALL OgaGenerator_ComputeLogits(OgaGenerator* generator);
OGA_EXPORT OgaResult* OGA_API_CALL OgaGenerator_GenerateNextToken(OgaGenerator* generator);

/*
 * \brief Requests termination of the generator's session. After this call
 *        OgaGenerator_IsSessionTerminated returns true.
 * \param[in] generator The generator whose session should be terminated.
 * \return OgaResult containing the error message if the call failed, nullptr on success.
 */
OGA_EXPORT OgaResult* OGA_API_CALL OgaGenerator_SetTerminate(OgaGenerator* generator);

/*
 * \brief Clears a previous terminate request. After this call
 *        OgaGenerator_IsSessionTerminated returns false.
 * \param[in] generator The generator whose terminate request should be cleared.
 * \return OgaResult containing the error message if the call failed, nullptr on success.
 */
OGA_EXPORT OgaResult* OGA_API_CALL OgaGenerator_UnsetTerminate(OgaGenerator* generator);

/*
* \brief Returns a copy of the model output identified by the given name as an OgaTensor on CPU. The buffer is owned by returned OgaTensor
Expand Down
63 changes: 63 additions & 0 deletions test/c_api_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <models/model.h>
#include <iostream>
#include <ort_genai.h>
#include <thread>
#include <vector>
#ifndef MODEL_PATH
#define MODEL_PATH "../../test/test_models/"
#endif
Expand Down Expand Up @@ -283,6 +285,67 @@ TEST(CAPITests, GetOutputCAPI) {

#if TEST_PHI2

// Drives the generator until IsDone() reports completion. Any std::exception raised along
// the way is caught and logged to stdout instead of being propagated.
// tokenizer_stream is accepted (and owned for the duration of the call) but not used.
void Generate_Output(OgaGenerator* generator, std::unique_ptr<OgaTokenizerStream> tokenizer_stream) {
  try {
    for (;;) {
      if (generator->IsDone()) {
        break;
      }
      generator->ComputeLogits();
      generator->GenerateNextToken();
    }
  } catch (const std::exception& e) {
    std::cout << "Session Terminated: " << e.what() << std::endl;
  }
}
#endif

// Runs generation on one thread while a second thread requests termination, then checks
// that the terminated flag is reported as set and that UnsetTerminate() clears it again.
TEST(CAPITests, SetTerminate) {
#if TEST_PHI2

  // Worker: requests termination on the shared generator.
  auto request_terminate = [](OgaGenerator* gen) {
    gen->SetTerminate();
  };

  // Worker: generates tokens until done; any exception (e.g. raised because the session
  // was terminated) is logged rather than propagated out of the thread.
  auto run_generation = [](OgaGenerator* gen, std::unique_ptr<OgaTokenizerStream> tokenizer_stream) {
    try {
      for (;;) {
        if (gen->IsDone()) {
          break;
        }
        gen->ComputeLogits();
        gen->GenerateNextToken();
      }
    } catch (const std::exception& e) {
      std::cout << "Session Terminated: " << e.what() << std::endl;
    }
  };

  auto model = OgaModel::Create(PHI2_PATH);
  auto tokenizer = OgaTokenizer::Create(*model);
  auto tokenizer_stream = OgaTokenizerStream::Create(*tokenizer);

  const char* prompt = "She sells sea shells by the sea shore.";
  auto sequences = OgaSequences::Create();
  tokenizer->Encode(prompt, *sequences);
  auto params = OgaGeneratorParams::Create(*model);
  params->SetInputSequences(*sequences);
  params->SetSearchOption("max_length", 40);

  auto generator = OgaGenerator::Create(*model, *params);
  EXPECT_EQ(generator->IsSessionTerminated(), false);

  // Start the generation worker first, then the worker that requests termination.
  std::vector<std::thread> workers;
  workers.emplace_back(run_generation, generator.get(), std::move(tokenizer_stream));
  workers.emplace_back(request_terminate, generator.get());

  for (std::thread& worker : workers) {
    std::cout << "Waiting for threads completion" << std::endl;
    worker.join();  // Wait for each thread to finish
  }

  EXPECT_EQ(generator->IsSessionTerminated(), true);
  generator->UnsetTerminate();
  EXPECT_EQ(generator->IsSessionTerminated(), false);
#endif
}

#if TEST_PHI2

struct Phi2Test {
Phi2Test() {
model_ = OgaModel::Create(PHI2_PATH);
Expand Down
Loading