Skip to content

Commit

Permalink
feat: use binlog to delete keys in HDel
Browse files Browse the repository at this point in the history
  • Loading branch information
longfar-ncy committed Jan 16, 2024
1 parent 750cbe7 commit 752ee1f
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 14 deletions.
12 changes: 9 additions & 3 deletions src/storage/include/storage/binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@ class Binlog {
public:
explicit Binlog(DataType type) : data_type_(type) {}

void AppendOperation(int8_t cfid, OperateType type, const Slice& key,
const std::optional<Slice>& value = std::nullopt) {
entries_.emplace_back(cfid, type, key, value);
// void AppendOperation(int8_t cfid, OperateType type, const Slice& key,
// const std::optional<Slice>& value = std::nullopt) {
// entries_.emplace_back(cfid, type, key, value);
// }
void AppendPutOperation(int8_t cfid, const Slice& key, const Slice& value) {
entries_.emplace_back(cfid, OperateType::kPut, key, value);
}
void AppendDeleteOperation(int8_t cfid, const Slice& key) {
entries_.emplace_back(cfid, OperateType::kDelete, key, std::nullopt);
}

auto Serialization() -> std::string;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class Redis {
auto CreateBinlog() -> Binlog { return Binlog{GetDataType()}; }
auto CreatePutWithoutMetaBinlog(const Slice& key, Slice&& value) -> Binlog {
auto log = CreateBinlog();
log.AppendOperation(-1, OperateType::kPut, key, std::make_optional<Slice>(std::move(value)));
log.AppendPutOperation(-1, key, value);
return log;
}
};
Expand Down
25 changes: 15 additions & 10 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ Status RedisHashes::HDel(const Slice& key, const std::vector<std::string>& field
ScopeRecordLock l(lock_mgr_, key);
ScopeSnapshot ss(db_, &snapshot);
read_options.snapshot = snapshot;
Binlog binlog = CreateBinlog();
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
Expand All @@ -226,7 +227,8 @@ Status RedisHashes::HDel(const Slice& key, const std::vector<std::string>& field
if (s.ok()) {
del_cnt++;
statistic++;
batch.Delete(handles_[1], hashes_data_key.Encode());
// batch.Delete(handles_[1], hashes_data_key.Encode());
binlog.AppendDeleteOperation(1, hashes_data_key.Encode());
} else if (s.IsNotFound()) {
continue;
} else {
Expand All @@ -238,15 +240,18 @@ Status RedisHashes::HDel(const Slice& key, const std::vector<std::string>& field
return Status::InvalidArgument("hash size overflow");
}
parsed_hashes_meta_value.ModifyCount(-del_cnt);
batch.Put(handles_[0], key, meta_value);
// batch.Put(handles_[0], key, meta_value);
binlog.AppendPutOperation(0, key, meta_value);
}
} else if (s.IsNotFound()) {
*ret = 0;
return Status::OK();
} else {
return s;
}
s = db_->Write(default_write_options_, &batch);
// s = db_->Write(default_write_options_, &batch);
auto future = storage_->GetLogQueue()->Produce(std::move(binlog));
s = future.get();
UpdateSpecificKeyStatistics(key.ToString(), statistic);
return s;
}
Expand Down Expand Up @@ -673,10 +678,10 @@ Status RedisHashes::HSet(const Slice& key, const Slice& field, const Slice& valu
version = parsed_hashes_meta_value.InitialMetaValue();
parsed_hashes_meta_value.set_count(1);
// batch.Put(handles_[0], key, meta_value);
binlog.AppendOperation(0, OperateType::kPut, key, std::move(meta_value));
binlog.AppendPutOperation(0, key, std::move(meta_value));
HashesDataKey data_key(key, version, field);
// batch.Put(handles_[1], data_key.Encode(), value);
binlog.AppendOperation(1, OperateType::kPut, data_key.Encode(), value);
binlog.AppendPutOperation(1, data_key.Encode(), value);
*res = 1;
} else {
version = parsed_hashes_meta_value.version();
Expand All @@ -689,7 +694,7 @@ Status RedisHashes::HSet(const Slice& key, const Slice& field, const Slice& valu
return Status::OK();
} else {
// batch.Put(handles_[1], hashes_data_key.Encode(), value);
binlog.AppendOperation(1, OperateType::kPut, hashes_data_key.Encode(), value);
binlog.AppendPutOperation(1, hashes_data_key.Encode(), value);
statistic++;
}
} else if (s.IsNotFound()) {
Expand All @@ -699,8 +704,8 @@ Status RedisHashes::HSet(const Slice& key, const Slice& field, const Slice& valu
parsed_hashes_meta_value.ModifyCount(1);
// batch.Put(handles_[0], key, meta_value);
// batch.Put(handles_[1], hashes_data_key.Encode(), value);
binlog.AppendOperation(0, OperateType::kPut, key, std::move(meta_value));
binlog.AppendOperation(1, OperateType::kPut, hashes_data_key.Encode(), value);
binlog.AppendPutOperation(0, key, std::move(meta_value));
binlog.AppendPutOperation(1, hashes_data_key.Encode(), value);
*res = 1;
} else {
return s;
Expand All @@ -711,10 +716,10 @@ Status RedisHashes::HSet(const Slice& key, const Slice& field, const Slice& valu
HashesMetaValue meta_value(Slice(meta_value_buf, sizeof(int32_t)));
version = meta_value.UpdateVersion();
// batch.Put(handles_[0], key, meta_value.Encode());
binlog.AppendOperation(0, OperateType::kPut, key, meta_value.Encode());
binlog.AppendPutOperation(0, key, meta_value.Encode());
HashesDataKey data_key(key, version, field);
// batch.Put(handles_[1], data_key.Encode(), value);
binlog.AppendOperation(1, OperateType::kPut, data_key.Encode(), value);
binlog.AppendPutOperation(1, data_key.Encode(), value);
*res = 1;
} else {
return s;
Expand Down
8 changes: 8 additions & 0 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1829,6 +1829,14 @@ auto Storage::DefaultWriteCallback(Binlog&& log) -> Status {
batch.Put(db->GetColumnFamilyHandle(entry.cf_idx_), entry.key_, *entry.value_);
}
break;
case OperateType::kDelete:
assert(!entry.value_.has_value());
if (entry.cf_idx_ == -1) {
batch.Delete(entry.key_);
} else {
batch.Delete(db->GetColumnFamilyHandle(entry.cf_idx_), entry.key_);
}
break;
default:
assert(0);
}
Expand Down

0 comments on commit 752ee1f

Please sign in to comment.