Skip to content

Commit

Permalink
get rid of thread locking: just do simple round robin
Browse files Browse the repository at this point in the history
  • Loading branch information
hoytech committed Aug 29, 2024
1 parent 5f6c080 commit ebaa1e6
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 53 deletions.
38 changes: 3 additions & 35 deletions src/apps/web/WebHttpsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ void WebServer::runHttpsocket(ThreadPool<MsgHttpsocket>::Thread &thr) {
uWS::Group<uWS::SERVER> *hubGroup;
flat_hash_map<uint64_t, Connection*> connIdToConnection;
uint64_t nextConnectionId = 1;
uint64_t requestCounter = 0;

flat_hash_map<uWS::HttpResponse *, HTTPRequest> receivingRequests;

std::vector<bool> tpReaderLock(tpReader.numThreads, false);
std::queue<MsgWebReader> pendingReaderMessages;


{
int extensionOptions = 0;
Expand Down Expand Up @@ -49,20 +47,8 @@ void WebServer::runHttpsocket(ThreadPool<MsgHttpsocket>::Thread &thr) {
c->pendingRequests.insert(res);

if (req.method == uWS::HttpMethod::METHOD_GET) {
auto m = MsgWebReader{MsgWebReader::Request{std::move(req), MAX_U64}};
bool didDispatch = false;

for (uint64_t i = 0; i < tpReader.numThreads; i++) {
if (tpReaderLock[i] == false) {
tpReaderLock[i] = true;
std::get<MsgWebReader::Request>(m.msg).lockedThreadId = i;
tpReader.dispatch(i, std::move(m));
didDispatch = true;
break;
}
}

if (!didDispatch) pendingReaderMessages.emplace(std::move(m));
auto m = MsgWebReader{MsgWebReader::Request{std::move(req)}};
tpReader.dispatch(requestCounter++, std::move(m));
} else if (req.method == uWS::HttpMethod::METHOD_POST) {
if (remainingBytes) {
receivingRequests.emplace(res, std::move(req));
Expand All @@ -87,20 +73,6 @@ void WebServer::runHttpsocket(ThreadPool<MsgHttpsocket>::Thread &thr) {
});


auto unlockThread = [&](uint64_t lockedThreadId){
if (lockedThreadId == MAX_U64) return;

if (tpReaderLock.at(lockedThreadId) == false) throw herr("tried to unlock already unlocked reader lock!");

if (pendingReaderMessages.empty()) {
tpReaderLock[lockedThreadId] = false;
} else {
std::get<MsgWebReader::Request>(pendingReaderMessages.front().msg).lockedThreadId = lockedThreadId;
tpReader.dispatch(lockedThreadId, std::move(pendingReaderMessages.front()));
pendingReaderMessages.pop();
}
};

std::function<void()> asyncCb = [&]{
auto newMsgs = thr.inbox.pop_all_no_wait();

Expand All @@ -118,10 +90,6 @@ void WebServer::runHttpsocket(ThreadPool<MsgHttpsocket>::Thread &thr) {
c.pendingRequests.erase(msg->res);

msg->res->end(msg->payload.data(), msg->payload.size());

unlockThread(msg->lockedThreadId);
} else if (auto msg = std::get_if<MsgHttpsocket::Unlock>(&newMsg.msg)) {
unlockThread(msg->lockedThreadId);
}
}
};
Expand Down
4 changes: 2 additions & 2 deletions src/apps/web/WebReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,15 +367,15 @@ void WebServer::runReader(ThreadPool<MsgWebReader>::Thread &thr) {
try {
HTTPResponse resp = generateReadResponse(txn, decomp, msg->req);
std::string payload = resp.encode(msg->req.acceptGzip);
sendHttpResponseAndUnlock(msg->lockedThreadId, msg->req, payload);
sendHttpResponseRaw(msg->req, payload);
} catch (std::exception &e) {
HTTPResponse res;
res.code = "500 Server Error";
res.body = "Server error";

std::string payload = res.encode(false);

sendHttpResponseAndUnlock(msg->lockedThreadId, msg->req, payload);
sendHttpResponseRaw(msg->req, payload);
LE << "500 server error: " << e.what();
}
}
Expand Down
21 changes: 5 additions & 16 deletions src/apps/web/WebServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,16 @@ struct MsgHttpsocket : NonCopyable {
uint64_t connId;
uWS::HttpResponse *res;
std::string payload;
uint64_t lockedThreadId;
};

struct Unlock {
uint64_t lockedThreadId;
};

using Var = std::variant<Send, Unlock>;
using Var = std::variant<Send>;
Var msg;
MsgHttpsocket(Var &&msg_) : msg(std::move(msg_)) {}
};

struct MsgWebReader : NonCopyable {
struct Request {
HTTPRequest req;
uint64_t lockedThreadId;
};

using Var = std::variant<Request>;
Expand Down Expand Up @@ -96,25 +90,20 @@ struct WebServer {

// Utils

void unlockThread(uint64_t lockedThreadId) {
tpHttpsocket.dispatch(0, MsgHttpsocket{MsgHttpsocket::Unlock{lockedThreadId}});
hubTrigger->send();
}

// Moves from payload!
void sendHttpResponseAndUnlock(uint64_t lockedThreadId, const HTTPRequest &req, std::string &payload) {
tpHttpsocket.dispatch(0, MsgHttpsocket{MsgHttpsocket::Send{req.connId, req.res, std::move(payload), lockedThreadId}});
void sendHttpResponseRaw(const HTTPRequest &req, std::string &payload) {
tpHttpsocket.dispatch(0, MsgHttpsocket{MsgHttpsocket::Send{req.connId, req.res, std::move(payload)}});
hubTrigger->send();
}

void sendHttpResponse(const HTTPRequest &req, std::string_view body, std::string_view code = "200 OK", std::string_view contentType = "text/html; charset=utf-8") {
HTTPResponse res;
res.code = code;
res.contentType = contentType;
res.body = std::string(body); // FIXME: copy
res.body = std::string(body); // FIXME: don't copy

std::string payload = res.encode(false);

sendHttpResponseAndUnlock(MAX_U64, req, payload);
sendHttpResponseRaw(req, payload);
}
};

0 comments on commit ebaa1e6

Please sign in to comment.