Skip to content

Commit

Permalink
Fix bug to dedicated flusher (#174)
Browse files Browse the repository at this point in the history
* [Update PR] thread name

* Fix bug to dedicated flusher

* [Update PR] Renaming

---------

Co-authored-by: Jung-Sang Ahn <[email protected]>
Co-authored-by: Zexi Liu <[email protected]>
  • Loading branch information
3 people authored Sep 10, 2024
1 parent 8c68b37 commit fb1f0a7
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 24 deletions.
30 changes: 22 additions & 8 deletions src/db_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,28 @@ void DBMgr::initInternal(const GlobalConfig& config) {
for (size_t ii=0; ii<config.numFlusherThreads; ++ii) {
// If dedicated flusher is enabled, only the first
// `numDedicatedFlusherForAsyncReqs` flushers will handle async reqs.
bool handle_async_reqs =
!(dedicated_async_flusher && ii >= config.numDedicatedFlusherForAsyncReqs);
std::string t_name =
handle_async_reqs
? "flusher_ded_" + std::to_string(ii)
: "flusher_" + std::to_string(ii);
Flusher* flusher = new Flusher(t_name, config);
flusher->handleAsyncReqs = handle_async_reqs;
Flusher::FlusherType flusher_type = Flusher::FlusherType::GENERIC;
if (dedicated_async_flusher) {
if (ii < config.numDedicatedFlusherForAsyncReqs) {
flusher_type = Flusher::FlusherType::FLUSH_ON_DEMAND;
} else {
flusher_type = Flusher::FlusherType::FLUSH_ON_CONDITION;
}
}
std::string t_name;
switch (flusher_type) {
case Flusher::FlusherType::FLUSH_ON_DEMAND:
t_name = "flusher_od_" + std::to_string(ii);
break;
case Flusher::FlusherType::FLUSH_ON_CONDITION:
t_name = "flusher_oc_" + std::to_string(ii);
break;
default:
t_name = "flusher_gen_" + std::to_string(ii);
break;
}

Flusher* flusher = new Flusher(t_name, config, flusher_type);
wMgr->addWorker(flusher);
flusher->run();
}
Expand Down
20 changes: 10 additions & 10 deletions src/flusher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,13 @@ size_t FlusherQueue::size() const {
return queue.size();
}


Flusher::Flusher(const std::string& _w_name,
const GlobalConfig& _config)
Flusher::Flusher(const std::string& w_name,
const GlobalConfig& g_config,
FlusherType f_type)
: lastCheckedFileIndex(0xffff) // Any big number to start from 0.
{
workerName = _w_name;
gConfig = _config;
handleAsyncReqs = true;
, type(f_type) {
workerName = w_name;
gConfig = g_config;
FlusherOptions options;
options.sleepDuration_ms = gConfig.flusherSleepDuration_ms;
options.worker = this;
Expand All @@ -107,7 +106,7 @@ void Flusher::work(WorkerOptions* opt_base) {
DB* target_db = nullptr;

FlusherQueueElem* elem = nullptr;
if (handleAsyncReqs) {
if (type != FlusherType::FLUSH_ON_CONDITION) {
elem = dbm->flusherQueue()->pop();
}

Expand All @@ -127,7 +126,7 @@ void Flusher::work(WorkerOptions* opt_base) {
}
if (cursor) skiplist_release_node(cursor);

} else if (!handleAsyncReqs) {
} else if (type != FlusherType::FLUSH_ON_DEMAND) {
// Otherwise: check DB map, only when it is not the dedicated flusher.
std::lock_guard<std::mutex> l(dbm->dbMapLock);

Expand Down Expand Up @@ -242,7 +241,8 @@ void Flusher::work(WorkerOptions* opt_base) {
target_db->p->decBgTask();
}

if ( dbm->flusherQueue()->size() &&
if ( type != FlusherType::FLUSH_ON_CONDITION &&
dbm->flusherQueue()->size() &&
!delayed_task ) {
doNotSleepNextTime = true;
}
Expand Down
9 changes: 6 additions & 3 deletions src/flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,20 @@ class FlusherQueue {

class Flusher : public WorkerBase {
public:
enum FlusherType { GENERIC = 0x0, FLUSH_ON_DEMAND = 0x1, FLUSH_ON_CONDITION = 0x2 };

struct FlusherOptions : public WorkerOptions {
};

Flusher(const std::string& _w_name,
const GlobalConfig& _config);
Flusher(const std::string& w_name,
const GlobalConfig& g_config,
FlusherType f_type = FlusherType::GENERIC);
~Flusher();
void work(WorkerOptions* opt_base);

GlobalConfig gConfig;
size_t lastCheckedFileIndex;
bool handleAsyncReqs;
FlusherType type;
};


Expand Down
8 changes: 5 additions & 3 deletions src/jungle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,12 @@ Status DB::flushLogsAsync(const FlushOptions& options,
db_mgr->flusherQueue()->push(elem);

// If dedicated workers are enabled, invoke those workers only.
const std::string WORKER_PREFIX =
bool dedicated_async_flusher =
db_mgr->getGlobalConfig()->numDedicatedFlusherForAsyncReqs
? "flusher_ded"
: "flusher";
&& db_mgr->getGlobalConfig()->numFlusherThreads
> db_mgr->getGlobalConfig()->numDedicatedFlusherForAsyncReqs;
const std::string WORKER_PREFIX =
dedicated_async_flusher ? "flusher_od" : "flusher_gen_";

if (options.execDelayUs) {
// Delay is given.
Expand Down

0 comments on commit fb1f0a7

Please sign in to comment.