Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add PKExpireReset(),PKFieldScan()interface and rigorous test case #3

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions include/blackwidow/blackwidow.h
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,19 @@ class BlackWidow {
int64_t PKExpireScan(const DataType& dtype, int64_t cursor,
int32_t min_ttl, int32_t max_ttl,
int64_t count, std::vector<std::string>* keys);
// Iterate over a collection of elements, obtaining the item which timeout
// conforms to the inequality (min_ttl < item_ttl < max_ttl)
// and reset item_ttl to reset_ttl
Status PKExpireReset(const DataType& dtype, int32_t min_ttl, int32_t max_ttl, int32_t reset_ttl);

// Iterate over a collection of elements, obtaing the item which has
// specified field. The number of the key's field should between min_num and
// max_num exclusively
// return an updated cursor that the user need to use as cursor argument
// in the next call
int64_t PKFieldScan(const DataType& dtype, int64_t cursor,
int32_t min_num, int32_t max_num,
int64_t count, std::vector<std::string>* keys);

// Iterate over a collection of elements by specified range
// return a next_key that the user need to use as the key_start argument
Expand Down
205 changes: 195 additions & 10 deletions src/blackwidow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1084,8 +1084,16 @@ int64_t BlackWidow::PKExpireScan(const DataType& dtype, int64_t cursor,
start_key.erase(start_key.begin());
switch (key_type) {
case 'k':
is_finish = strings_db_->PKExpireScan(start_key, curtime + min_ttl,
curtime + max_ttl, keys, &leftover_visits, &next_key);
if (min_ttl == INT_MIN) {
is_finish = strings_db_->PKExpireScan(start_key, curtime - 1, curtime + max_ttl,
keys, &leftover_visits, &next_key);
} else if (max_ttl == INT_MAX) {
is_finish = strings_db_->PKExpireScan(start_key, curtime + min_ttl, max_ttl,
keys, &leftover_visits, &next_key);
} else {
is_finish = strings_db_->PKExpireScan(start_key, curtime + min_ttl, curtime + max_ttl,
keys, &leftover_visits, &next_key);
}
if (!leftover_visits && !is_finish) {
cursor_ret = cursor + step_length;
StoreCursorStartKey(dtype, cursor_ret, std::string("k") + next_key);
Expand All @@ -1102,8 +1110,16 @@ int64_t BlackWidow::PKExpireScan(const DataType& dtype, int64_t cursor,
}
start_key = "";
case 'h':
is_finish = hashes_db_->PKExpireScan(start_key, curtime + min_ttl,
curtime + max_ttl, keys, &leftover_visits, &next_key);
if (min_ttl == INT_MIN) {
is_finish = hashes_db_->PKExpireScan(start_key, curtime - 1, curtime + max_ttl,
keys, &leftover_visits, &next_key);
} else if (max_ttl == INT_MAX) {
is_finish = hashes_db_->PKExpireScan(start_key, curtime + min_ttl, max_ttl,
keys, &leftover_visits, &next_key);
} else {
is_finish = hashes_db_->PKExpireScan(start_key, curtime + min_ttl, curtime + max_ttl,
keys, &leftover_visits, &next_key);
}
if (!leftover_visits && !is_finish) {
cursor_ret = cursor + step_length;
StoreCursorStartKey(dtype, cursor_ret, std::string("h") + next_key);
Expand All @@ -1120,8 +1136,16 @@ int64_t BlackWidow::PKExpireScan(const DataType& dtype, int64_t cursor,
}
start_key = "";
case 's':
is_finish = sets_db_->PKExpireScan(start_key, curtime + min_ttl,
curtime + max_ttl, keys, &leftover_visits, &next_key);
if (min_ttl == INT_MIN) {
is_finish = sets_db_->PKExpireScan(start_key, curtime - 1, curtime + max_ttl,
keys, &leftover_visits, &next_key);
} else if (max_ttl == INT_MAX) {
is_finish = sets_db_->PKExpireScan(start_key, curtime + min_ttl, max_ttl,
keys, &leftover_visits, &next_key);
} else {
is_finish = sets_db_->PKExpireScan(start_key, curtime + min_ttl, curtime + max_ttl,
keys, &leftover_visits, &next_key);
}
if (!leftover_visits && !is_finish) {
cursor_ret = cursor + step_length;
StoreCursorStartKey(dtype, cursor_ret, std::string("s") + next_key);
Expand All @@ -1138,8 +1162,16 @@ int64_t BlackWidow::PKExpireScan(const DataType& dtype, int64_t cursor,
}
start_key = "";
case 'l':
is_finish = lists_db_->PKExpireScan(start_key, curtime + min_ttl,
curtime + max_ttl, keys, &leftover_visits, &next_key);
if (min_ttl == INT_MIN) {
is_finish = lists_db_->PKExpireScan(start_key, curtime - 1, curtime + max_ttl,
keys, &leftover_visits, &next_key);
} else if (max_ttl == INT_MAX) {
is_finish = lists_db_->PKExpireScan(start_key, curtime + min_ttl, max_ttl,
keys, &leftover_visits, &next_key);
} else {
is_finish = lists_db_->PKExpireScan(start_key, curtime + min_ttl, curtime + max_ttl,
keys, &leftover_visits, &next_key);
}
if (!leftover_visits && !is_finish) {
cursor_ret = cursor + step_length;
StoreCursorStartKey(dtype, cursor_ret, std::string("l") + next_key);
Expand All @@ -1156,8 +1188,16 @@ int64_t BlackWidow::PKExpireScan(const DataType& dtype, int64_t cursor,
}
start_key = "";
case 'z':
is_finish = zsets_db_->PKExpireScan(start_key, curtime + min_ttl,
curtime + max_ttl, keys, &leftover_visits, &next_key);
if (min_ttl == INT_MIN) {
is_finish = zsets_db_->PKExpireScan(start_key, curtime - 1, curtime + max_ttl,
keys, &leftover_visits, &next_key);
} else if (max_ttl == INT_MAX) {
is_finish = zsets_db_->PKExpireScan(start_key, curtime + min_ttl, max_ttl,
keys, &leftover_visits, &next_key);
} else {
is_finish = zsets_db_->PKExpireScan(start_key, curtime + min_ttl, curtime + max_ttl,
keys, &leftover_visits, &next_key);
}
if (!leftover_visits && !is_finish) {
cursor_ret = cursor + step_length;
StoreCursorStartKey(dtype, cursor_ret, std::string("z") + next_key);
Expand All @@ -1170,6 +1210,151 @@ int64_t BlackWidow::PKExpireScan(const DataType& dtype, int64_t cursor,
return cursor_ret;
}

Status BlackWidow::PKExpireReset(const DataType& data_type, int32_t min_ttl,
int32_t max_ttl, int32_t reset_ttl) {
int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
int min_timestamp, max_timestamp, reset_timestamp;
if (min_ttl == INT_MIN) {
min_timestamp = curtime;
} else {
min_timestamp = curtime + min_ttl;
}
if (max_ttl == INT_MAX) {
max_timestamp = max_ttl;
} else {
max_timestamp = curtime + max_ttl;
}
reset_timestamp = curtime + reset_ttl;
Status s;
switch (data_type) {
case kAll:
s = strings_db_->PKExpireReset(min_timestamp, max_timestamp, reset_timestamp);
if (!s.ok()) return s;
s = hashes_db_->PKExpireReset(min_timestamp, max_timestamp, reset_timestamp);
if (!s.ok()) return s;
s = lists_db_->PKExpireReset(min_timestamp, max_timestamp, reset_timestamp);
if (!s.ok()) return s;
s = sets_db_->PKExpireReset(min_timestamp, max_timestamp, reset_timestamp);
if (!s.ok()) return s;
s = zsets_db_->PKExpireReset(min_timestamp, max_timestamp, reset_timestamp);
if (!s.ok()) return s;
break;

case kStrings:
s = strings_db_->PKExpireReset(min_timestamp, max_timestamp, reset_timestamp);
break;

case kHashes:
s = hashes_db_->PKExpireReset(min_timestamp, max_timestamp, reset_timestamp);
break;

case kLists:
s = lists_db_->PKExpireReset(min_timestamp, max_timestamp, reset_timestamp);
break;

case kSets:
s = sets_db_->PKExpireReset(min_timestamp, max_timestamp, reset_timestamp);
break;

case kZSets:
s = zsets_db_->PKExpireReset(min_timestamp, max_timestamp, reset_timestamp);
break;
}
return s;
}

int64_t BlackWidow::PKFieldScan(const DataType& dtype, int64_t cursor,
int32_t min_num, int32_t max_num,
int64_t count, std::vector<std::string>* keys) {
keys->clear();
bool is_finish;
int64_t leftover_visits = count;
int64_t step_length = count, cursor_ret = 0;
std::string start_key, next_key;
if (cursor < 0) {
return cursor_ret;
} else {
Status s = GetStartKey(dtype, cursor, &start_key);
if (s.IsNotFound()) {
//If want to scan all the databases, we start with the Hash database
start_key = std::string(1, dtype == DataType::kAll
? DataTypeTag[kHashes] : DataTypeTag[dtype]);
cursor = 0;
}
}

char key_type = start_key.at(0);
start_key.erase(start_key.begin());
switch (key_type) {
case 'h':
is_finish = hashes_db_->PKFieldScan(start_key, min_num, max_num,
&leftover_visits, &next_key, keys);
if (!leftover_visits && !is_finish) {
cursor_ret = cursor + step_length;
StoreCursorStartKey(dtype, cursor_ret, std::string("h") + next_key);
break;
} else if (is_finish) {
if (DataType::kHashes == dtype) {
cursor_ret = 0;
break;
} else if (!leftover_visits) {
cursor_ret = cursor + step_length;
StoreCursorStartKey(dtype, cursor_ret, std::string("s"));
break;
}
}
start_key = "";
case 's':
is_finish = sets_db_->PKFieldScan(start_key, min_num, max_num,
&leftover_visits, &next_key, keys);
if (!leftover_visits && !is_finish) {
cursor_ret = cursor + step_length;
StoreCursorStartKey(dtype, cursor_ret, std::string("s") + next_key);
break;
} else if (is_finish) {
if (DataType::kSets == dtype) {
cursor_ret = 0;
break;
} else if (!leftover_visits) {
cursor_ret = cursor + step_length;
StoreCursorStartKey(dtype, cursor_ret, std::string("l"));
break;
}
}
start_key = "";
case 'l':
is_finish = lists_db_->PKFieldScan(start_key, min_num, max_num,
&leftover_visits, &next_key, keys);
if (!leftover_visits && !is_finish) {
cursor_ret = cursor + step_length;
StoreCursorStartKey(dtype, cursor_ret, std::string("l") + next_key);
break;
} else if (is_finish) {
if (DataType::kLists == dtype) {
cursor_ret = 0;
break;
} else if (!leftover_visits) {
cursor_ret = cursor + step_length;
StoreCursorStartKey(dtype, cursor_ret, std::string("z"));
break;
}
}
start_key = "";
case 'z':
is_finish = zsets_db_->PKFieldScan(start_key, min_num, max_num,
&leftover_visits, &next_key, keys);
if (!leftover_visits && !is_finish) {
cursor_ret = cursor + step_length;
StoreCursorStartKey(dtype, cursor_ret, std::string("z") + next_key);
break;
} else if (is_finish) {
cursor_ret = 0;
break;
}
}
return cursor_ret;
}


Status BlackWidow::PKScanRange(const DataType& data_type,
Expand Down
3 changes: 3 additions & 0 deletions src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class Redis {
std::vector<std::string>* keys,
int64_t* leftover_visits,
std::string* next_key) = 0;
virtual Status PKExpireReset(const int32_t min_timestamp,
const int32_t max_timestamp,
const int32_t reset_timestamp) = 0;
virtual Status Expireat(const Slice& key,
int32_t timestamp) = 0;
virtual Status Persist(const Slice& key) = 0;
Expand Down
86 changes: 86 additions & 0 deletions src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "src/base_filter.h"
#include "src/scope_record_lock.h"
#include "src/scope_snapshot.h"
#include "slash/include/slash_string.h"

namespace blackwidow {

Expand Down Expand Up @@ -1316,6 +1317,10 @@ bool RedisHashes::PKExpireScan(const std::string& start_key,
iterator_options.snapshot = snapshot;
iterator_options.fill_cache = false;

int64_t curtime;
rocksdb::Env::Default()->GetCurrentTime(&curtime);
char temp_ttl[20];

rocksdb::Iterator* it = db_->NewIterator(iterator_options, handles_[0]);
it->Seek(start_key);
while (it->Valid() && (*leftover_visits) > 0) {
Expand All @@ -1328,6 +1333,12 @@ bool RedisHashes::PKExpireScan(const std::string& start_key,
if (min_timestamp < parsed_hashes_meta_value.timestamp()
&& parsed_hashes_meta_value.timestamp() < max_timestamp) {
keys->push_back(it->key().ToString());
int len = slash::ll2string(temp_ttl, sizeof temp_ttl, parsed_hashes_meta_value.timestamp() - curtime);
if (!len) {
keys->push_back("0");
} else {
keys->push_back(std::string(temp_ttl, len));
}
}
(*leftover_visits)--;
it->Next();
Expand All @@ -1344,6 +1355,81 @@ bool RedisHashes::PKExpireScan(const std::string& start_key,
return is_finish;
}

Status RedisHashes::PKExpireReset(const int32_t min_timestamp,
const int32_t max_timestamp,
const int32_t reset_timestamp) {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
iterator_options.snapshot = snapshot;
iterator_options.fill_cache = false;

rocksdb::WriteBatch batch;
rocksdb::Iterator* it = db_->NewIterator(iterator_options, handles_[0]);

std::string meta_value;
it->SeekToFirst();
while (it->Valid()) {
meta_value = it->value().ToString();
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
if (parsed_hashes_meta_value.IsStale()
|| parsed_hashes_meta_value.count() == 0) {
it->Next();
continue;
} else {
if (min_timestamp < parsed_hashes_meta_value.timestamp()
&& parsed_hashes_meta_value.timestamp() < max_timestamp) {
parsed_hashes_meta_value.set_timestamp(reset_timestamp);
batch.Put(handles_[0], it->key(), meta_value);
}
it->Next();
}
}
return db_->Write(default_write_options_, &batch);
}

bool RedisHashes::PKFieldScan(const std::string& start_key,
int32_t min_num, int32_t max_num,
int64_t *leftover_visits, std::string* next_key,
std::vector<std::string>* keys) {
bool is_finish = true;
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
iterator_options.snapshot = snapshot;
iterator_options.fill_cache = false;

rocksdb::Iterator* it = db_->NewIterator(iterator_options, handles_[0]);
it->Seek(start_key);
char temp_count[20];

while (it->Valid() && (*leftover_visits) > 0) {
ParsedHashesMetaValue parsed_hashes_meta_value(it->value());
if (!parsed_hashes_meta_value.IsStale()
&& parsed_hashes_meta_value.count() > min_num
&& parsed_hashes_meta_value.count() < max_num) {
keys->push_back(it->key().ToString());
int len = slash::ll2string(temp_count, sizeof temp_count,
parsed_hashes_meta_value.count());
if (!len) {
keys->push_back("0");
} else {
keys->push_back(std::string(temp_count, len));
}
(*leftover_visits)--;
}
it->Next();
}
if (it->Valid()) {
is_finish = false;
*next_key = it->key().ToString();
} else {
*next_key = "";
}
delete it;
return is_finish;
}

Status RedisHashes::Expireat(const Slice& key, int32_t timestamp) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
Expand Down
Loading