diff --git a/src/db_mgr.cc b/src/db_mgr.cc index 68184f0..ded1f96 100644 --- a/src/db_mgr.cc +++ b/src/db_mgr.cc @@ -95,14 +95,28 @@ void DBMgr::initInternal(const GlobalConfig& config) { for (size_t ii=0; ii= config.numDedicatedFlusherForAsyncReqs); - std::string t_name = - handle_async_reqs - ? "flusher_ded_" + std::to_string(ii) - : "flusher_" + std::to_string(ii); - Flusher* flusher = new Flusher(t_name, config); - flusher->handleAsyncReqs = handle_async_reqs; + Flusher::FlusherType flusher_type = Flusher::FlusherType::GENERIC; + if (dedicated_async_flusher) { + if (ii < config.numDedicatedFlusherForAsyncReqs) { + flusher_type = Flusher::FlusherType::FLUSH_ON_DEMAND; + } else { + flusher_type = Flusher::FlusherType::FLUSH_ON_CONDITION; + } + } + std::string t_name; + switch (flusher_type) { + case Flusher::FlusherType::FLUSH_ON_DEMAND: + t_name = "flusher_od_" + std::to_string(ii); + break; + case Flusher::FlusherType::FLUSH_ON_CONDITION: + t_name = "flusher_oc_" + std::to_string(ii); + break; + default: + t_name = "flusher_gen_" + std::to_string(ii); + break; + } + + Flusher* flusher = new Flusher(t_name, config, flusher_type); wMgr->addWorker(flusher); flusher->run(); } diff --git a/src/flusher.cc b/src/flusher.cc index 2afbb55..81ffc0d 100644 --- a/src/flusher.cc +++ b/src/flusher.cc @@ -80,14 +80,13 @@ size_t FlusherQueue::size() const { return queue.size(); } - -Flusher::Flusher(const std::string& _w_name, - const GlobalConfig& _config) +Flusher::Flusher(const std::string& w_name, + const GlobalConfig& g_config, + FlusherType f_type) : lastCheckedFileIndex(0xffff) // Any big number to start from 0. -{ - workerName = _w_name; - gConfig = _config; - handleAsyncReqs = true; + , type(f_type) { + workerName = w_name; + gConfig = g_config; FlusherOptions options; options.sleepDuration_ms = gConfig.flusherSleepDuration_ms; options.worker = this; @@ -107,7 +106,7 @@ void Flusher::work(WorkerOptions* opt_base) { DB* target_db = nullptr; FlusherQueueElem* elem = nullptr; - if (handleAsyncReqs) { + if (type != FlusherType::FLUSH_ON_CONDITION) { elem = dbm->flusherQueue()->pop(); } @@ -127,7 +126,7 @@ void Flusher::work(WorkerOptions* opt_base) { } if (cursor) skiplist_release_node(cursor); - } else if (!handleAsyncReqs) { + } else if (type != FlusherType::FLUSH_ON_DEMAND) { // Otherwise: check DB map, only when it is not the dedicated flusher. std::lock_guard l(dbm->dbMapLock); @@ -242,7 +241,8 @@ void Flusher::work(WorkerOptions* opt_base) { target_db->p->decBgTask(); } - if ( dbm->flusherQueue()->size() && + if ( type != FlusherType::FLUSH_ON_CONDITION && + dbm->flusherQueue()->size() && !delayed_task ) { doNotSleepNextTime = true; } diff --git a/src/flusher.h b/src/flusher.h index e710b60..e269641 100644 --- a/src/flusher.h +++ b/src/flusher.h @@ -73,17 +73,20 @@ class FlusherQueue { class Flusher : public WorkerBase { public: + enum FlusherType { GENERIC = 0x0, FLUSH_ON_DEMAND = 0x1, FLUSH_ON_CONDITION = 0x2 }; + struct FlusherOptions : public WorkerOptions { }; - Flusher(const std::string& _w_name, - const GlobalConfig& _config); + Flusher(const std::string& w_name, + const GlobalConfig& g_config, + FlusherType f_type = FlusherType::GENERIC); ~Flusher(); void work(WorkerOptions* opt_base); GlobalConfig gConfig; size_t lastCheckedFileIndex; - bool handleAsyncReqs; + FlusherType type; }; diff --git a/src/jungle.cc b/src/jungle.cc index 749dcba..3c06b7d 100644 --- a/src/jungle.cc +++ b/src/jungle.cc @@ -502,10 +502,12 @@ Status DB::flushLogsAsync(const FlushOptions& options, db_mgr->flusherQueue()->push(elem); // If dedicated workers are enabled, invoke those workers only. - const std::string WORKER_PREFIX = + bool dedicated_async_flusher = db_mgr->getGlobalConfig()->numDedicatedFlusherForAsyncReqs - ? "flusher_ded" - : "flusher"; + && db_mgr->getGlobalConfig()->numFlusherThreads + > db_mgr->getGlobalConfig()->numDedicatedFlusherForAsyncReqs; + const std::string WORKER_PREFIX = + dedicated_async_flusher ? "flusher_od" : "flusher_gen_"; if (options.execDelayUs) { // Delay is given.