diff --git a/src/apps/web/WebHttpsocket.cpp b/src/apps/web/WebHttpsocket.cpp index 1cb4bdd0..8ef9cba5 100644 --- a/src/apps/web/WebHttpsocket.cpp +++ b/src/apps/web/WebHttpsocket.cpp @@ -11,12 +11,10 @@ void WebServer::runHttpsocket(ThreadPool::Thread &thr) { uWS::Group *hubGroup; flat_hash_map connIdToConnection; uint64_t nextConnectionId = 1; + uint64_t requestCounter = 0; flat_hash_map receivingRequests; - std::vector tpReaderLock(tpReader.numThreads, false); - std::queue pendingReaderMessages; - { int extensionOptions = 0; @@ -49,20 +47,8 @@ void WebServer::runHttpsocket(ThreadPool::Thread &thr) { c->pendingRequests.insert(res); if (req.method == uWS::HttpMethod::METHOD_GET) { - auto m = MsgWebReader{MsgWebReader::Request{std::move(req), MAX_U64}}; - bool didDispatch = false; - - for (uint64_t i = 0; i < tpReader.numThreads; i++) { - if (tpReaderLock[i] == false) { - tpReaderLock[i] = true; - std::get(m.msg).lockedThreadId = i; - tpReader.dispatch(i, std::move(m)); - didDispatch = true; - break; - } - } - - if (!didDispatch) pendingReaderMessages.emplace(std::move(m)); + auto m = MsgWebReader{MsgWebReader::Request{std::move(req)}}; + tpReader.dispatch(requestCounter++, std::move(m)); } else if (req.method == uWS::HttpMethod::METHOD_POST) { if (remainingBytes) { receivingRequests.emplace(res, std::move(req)); @@ -87,20 +73,6 @@ void WebServer::runHttpsocket(ThreadPool::Thread &thr) { }); - auto unlockThread = [&](uint64_t lockedThreadId){ - if (lockedThreadId == MAX_U64) return; - - if (tpReaderLock.at(lockedThreadId) == false) throw herr("tried to unlock already unlocked reader lock!"); - - if (pendingReaderMessages.empty()) { - tpReaderLock[lockedThreadId] = false; - } else { - std::get(pendingReaderMessages.front().msg).lockedThreadId = lockedThreadId; - tpReader.dispatch(lockedThreadId, std::move(pendingReaderMessages.front())); - pendingReaderMessages.pop(); - } - }; - std::function asyncCb = [&]{ auto newMsgs = thr.inbox.pop_all_no_wait(); @@ -118,10 +90,6 @@ void WebServer::runHttpsocket(ThreadPool::Thread &thr) { c.pendingRequests.erase(msg->res); msg->res->end(msg->payload.data(), msg->payload.size()); - - unlockThread(msg->lockedThreadId); - } else if (auto msg = std::get_if(&newMsg.msg)) { - unlockThread(msg->lockedThreadId); } } }; diff --git a/src/apps/web/WebReader.cpp b/src/apps/web/WebReader.cpp index 525d8b48..192386ed 100644 --- a/src/apps/web/WebReader.cpp +++ b/src/apps/web/WebReader.cpp @@ -367,7 +367,7 @@ void WebServer::runReader(ThreadPool::Thread &thr) { try { HTTPResponse resp = generateReadResponse(txn, decomp, msg->req); std::string payload = resp.encode(msg->req.acceptGzip); - sendHttpResponseAndUnlock(msg->lockedThreadId, msg->req, payload); + sendHttpResponseRaw(msg->req, payload); } catch (std::exception &e) { HTTPResponse res; res.code = "500 Server Error"; @@ -375,7 +375,7 @@ void WebServer::runReader(ThreadPool::Thread &thr) { std::string payload = res.encode(false); - sendHttpResponseAndUnlock(msg->lockedThreadId, msg->req, payload); + sendHttpResponseRaw(msg->req, payload); LE << "500 server error: " << e.what(); } } diff --git a/src/apps/web/WebServer.h b/src/apps/web/WebServer.h index 04485fff..ef9eac27 100644 --- a/src/apps/web/WebServer.h +++ b/src/apps/web/WebServer.h @@ -39,14 +39,9 @@ struct MsgHttpsocket : NonCopyable { uint64_t connId; uWS::HttpResponse *res; std::string payload; - uint64_t lockedThreadId; }; - struct Unlock { - uint64_t lockedThreadId; - }; - - using Var = std::variant; + using Var = std::variant; Var msg; MsgHttpsocket(Var &&msg_) : msg(std::move(msg_)) {} }; @@ -54,7 +49,6 @@ struct MsgHttpsocket : NonCopyable { struct MsgWebReader : NonCopyable { struct Request { HTTPRequest req; - uint64_t lockedThreadId; }; using Var = std::variant; @@ -96,14 +90,9 @@ struct WebServer { // Utils - void unlockThread(uint64_t lockedThreadId) { - tpHttpsocket.dispatch(0, MsgHttpsocket{MsgHttpsocket::Unlock{lockedThreadId}}); - hubTrigger->send(); - } - // Moves from payload! - void sendHttpResponseAndUnlock(uint64_t lockedThreadId, const HTTPRequest &req, std::string &payload) { - tpHttpsocket.dispatch(0, MsgHttpsocket{MsgHttpsocket::Send{req.connId, req.res, std::move(payload), lockedThreadId}}); + void sendHttpResponseRaw(const HTTPRequest &req, std::string &payload) { + tpHttpsocket.dispatch(0, MsgHttpsocket{MsgHttpsocket::Send{req.connId, req.res, std::move(payload)}}); hubTrigger->send(); } @@ -111,10 +100,10 @@ struct WebServer { HTTPResponse res; res.code = code; res.contentType = contentType; - res.body = std::string(body); // FIXME: copy + res.body = std::string(body); // FIXME: don't copy std::string payload = res.encode(false); - sendHttpResponseAndUnlock(MAX_U64, req, payload); + sendHttpResponseRaw(req, payload); } };