Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug to dedicated flusher #174

Merged
merged 3 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions src/db_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,28 @@ void DBMgr::initInternal(const GlobalConfig& config) {
for (size_t ii=0; ii<config.numFlusherThreads; ++ii) {
// If dedicated flusher is enabled, only the first
// `numDedicatedFlusherForAsyncReqs` flushers will handle async reqs.
bool handle_async_reqs =
!(dedicated_async_flusher && ii >= config.numDedicatedFlusherForAsyncReqs);
std::string t_name =
handle_async_reqs
? "flusher_ded_" + std::to_string(ii)
: "flusher_" + std::to_string(ii);
Flusher* flusher = new Flusher(t_name, config);
flusher->handleAsyncReqs = handle_async_reqs;
Flusher::FlusherType flusher_type = Flusher::FlusherType::GENERIC;
if (dedicated_async_flusher) {
if (ii < config.numDedicatedFlusherForAsyncReqs) {
flusher_type = Flusher::FlusherType::FLUSH_ON_DEMAND;
} else {
flusher_type = Flusher::FlusherType::FLUSH_ON_CONDITION;
}
}
std::string t_name;
switch (flusher_type) {
case Flusher::FlusherType::FLUSH_ON_DEMAND:
t_name = "flusher_od_" + std::to_string(ii);
break;
case Flusher::FlusherType::FLUSH_ON_CONDITION:
t_name = "flusher_oc_" + std::to_string(ii);
break;
default:
t_name = "flusher_gen_" + std::to_string(ii);
break;
}

Flusher* flusher = new Flusher(t_name, config, flusher_type);
wMgr->addWorker(flusher);
flusher->run();
}
Expand Down
20 changes: 10 additions & 10 deletions src/flusher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,13 @@ size_t FlusherQueue::size() const {
return queue.size();
}


Flusher::Flusher(const std::string& _w_name,
const GlobalConfig& _config)
Flusher::Flusher(const std::string& w_name,
const GlobalConfig& g_config,
FlusherType f_type)
: lastCheckedFileIndex(0xffff) // Any big number to start from 0.
{
workerName = _w_name;
gConfig = _config;
handleAsyncReqs = true;
, type(f_type) {
workerName = w_name;
gConfig = g_config;
FlusherOptions options;
options.sleepDuration_ms = gConfig.flusherSleepDuration_ms;
options.worker = this;
Expand All @@ -107,7 +106,7 @@ void Flusher::work(WorkerOptions* opt_base) {
DB* target_db = nullptr;

FlusherQueueElem* elem = nullptr;
if (handleAsyncReqs) {
if (type != FlusherType::FLUSH_ON_CONDITION) {
elem = dbm->flusherQueue()->pop();
}

Expand All @@ -127,7 +126,7 @@ void Flusher::work(WorkerOptions* opt_base) {
}
if (cursor) skiplist_release_node(cursor);

} else if (!handleAsyncReqs) {
} else if (type != FlusherType::FLUSH_ON_DEMAND) {
// Otherwise: check DB map, only when it is not the dedicated flusher.
std::lock_guard<std::mutex> l(dbm->dbMapLock);

Expand Down Expand Up @@ -242,7 +241,8 @@ void Flusher::work(WorkerOptions* opt_base) {
target_db->p->decBgTask();
}

if ( dbm->flusherQueue()->size() &&
if ( type != FlusherType::FLUSH_ON_CONDITION &&
dbm->flusherQueue()->size() &&
!delayed_task ) {
doNotSleepNextTime = true;
}
Expand Down
9 changes: 6 additions & 3 deletions src/flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,20 @@ class FlusherQueue {

class Flusher : public WorkerBase {
public:
enum FlusherType { GENERIC = 0x0, FLUSH_ON_DEMAND = 0x1, FLUSH_ON_CONDITION = 0x2 };

struct FlusherOptions : public WorkerOptions {
};

Flusher(const std::string& _w_name,
const GlobalConfig& _config);
Flusher(const std::string& w_name,
const GlobalConfig& g_config,
FlusherType f_type = FlusherType::GENERIC);
~Flusher();
void work(WorkerOptions* opt_base);

GlobalConfig gConfig;
size_t lastCheckedFileIndex;
bool handleAsyncReqs;
FlusherType type;
};


Expand Down
8 changes: 5 additions & 3 deletions src/jungle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,12 @@ Status DB::flushLogsAsync(const FlushOptions& options,
db_mgr->flusherQueue()->push(elem);

// If dedicated workers are enabled, invoke those workers only.
const std::string WORKER_PREFIX =
bool dedicated_async_flusher =
db_mgr->getGlobalConfig()->numDedicatedFlusherForAsyncReqs
? "flusher_ded"
: "flusher";
&& db_mgr->getGlobalConfig()->numFlusherThreads
> db_mgr->getGlobalConfig()->numDedicatedFlusherForAsyncReqs;
const std::string WORKER_PREFIX =
dedicated_async_flusher ? "flusher_od" : "flusher_gen_";

if (options.execDelayUs) {
// Delay is given.
Expand Down
Loading