Skip to content

Commit

Permalink
[opt](load) optimize the performance of row distribution (apache#25546)
Browse files Browse the repository at this point in the history
For non-pipeline non-sinkv2:
before: 14s
now: 6s-
For pipeline + sinkv2:
before: 230ms *48 instances
now: 38ms *48 instances
  • Loading branch information
zclllyybb authored Nov 7, 2023
1 parent fa7a38b commit 16644ef
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 211 deletions.
64 changes: 5 additions & 59 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#include "exec/tablet_info.h"

#include <butil/fast_rand.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Types_types.h>
Expand All @@ -26,6 +25,7 @@
#include <stddef.h>

#include <algorithm>
#include <memory>
#include <ostream>
#include <tuple>

Expand Down Expand Up @@ -324,49 +324,20 @@ Status VOlapTablePartitionParam::init() {
}
}

_partitions_map.reset(
new std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>(
VOlapTablePartKeyComparator(_partition_slot_locs, _transformed_slot_locs)));
_partitions_map = std::make_unique<
std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>>(
VOlapTablePartKeyComparator(_partition_slot_locs, _transformed_slot_locs));
if (_t_param.__isset.distributed_columns) {
for (auto& col : _t_param.distributed_columns) {
RETURN_IF_ERROR(find_slot_locs(col, _distributed_slot_locs, "distributed"));
}
}
if (_distributed_slot_locs.empty()) {
_compute_tablet_index = [](BlockRow* key,
const VOlapTablePartition& partition) -> uint32_t {
if (partition.load_tablet_idx == -1) {
// load_to_single_tablet = false, just do random
return butil::fast_rand() % partition.num_buckets;
}
// load_to_single_tablet = ture, do round-robin
return partition.load_tablet_idx % partition.num_buckets;
};
} else {
_compute_tablet_index = [this](BlockRow* key,
const VOlapTablePartition& partition) -> uint32_t {
uint32_t hash_val = 0;
for (int i = 0; i < _distributed_slot_locs.size(); ++i) {
auto slot_desc = _slots[_distributed_slot_locs[i]];
auto& column = key->first->get_by_position(_distributed_slot_locs[i]).column;
auto val = column->get_data_at(key->second);
if (val.data != nullptr) {
hash_val = RawValue::zlib_crc32(val.data, val.size, slot_desc->type().type,
hash_val);
} else {
hash_val = HashUtil::zlib_crc_hash_null(hash_val);
}
}
return hash_val % partition.num_buckets;
};
}

// for both auto/non-auto partition table.
_is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED;

// initial partitions
for (int i = 0; i < _t_param.partitions.size(); ++i) {
const TOlapTablePartition& t_part = _t_param.partitions[i];
for (const auto& t_part : _t_param.partitions) {
VOlapTablePartition* part = nullptr;
RETURN_IF_ERROR(generate_partition_from(t_part, part));
_partitions.emplace_back(part);
Expand All @@ -385,26 +356,6 @@ Status VOlapTablePartitionParam::init() {
return Status::OK();
}

bool VOlapTablePartitionParam::find_partition(BlockRow* block_row,
const VOlapTablePartition** partition) const {
// block_row is gave by inserting process. So try to use transformed index.
auto it =
_is_in_partition
? _partitions_map->find(std::tuple {block_row->first, block_row->second, true})
: _partitions_map->upper_bound(
std::tuple {block_row->first, block_row->second, true});
// for list partition it might result in default partition
if (_is_in_partition) {
*partition = (it != _partitions_map->end()) ? it->second : _default_partition;
it = _partitions_map->end();
}
if (it != _partitions_map->end() &&
_part_contains(it->second, std::tuple {block_row->first, block_row->second, true})) {
*partition = it->second;
}
return (*partition != nullptr);
}

bool VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part,
BlockRowWithIndicator key) const {
// start_key.second == -1 means only single partition
Expand All @@ -413,11 +364,6 @@ bool VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part,
!comparator(key, std::tuple {part->start_key.first, part->start_key.second, false});
}

uint32_t VOlapTablePartitionParam::find_tablet(BlockRow* block_row,
const VOlapTablePartition& partition) const {
return _compute_tablet_index(block_row, partition);
}

Status VOlapTablePartitionParam::_create_partition_keys(const std::vector<TExprNode>& t_exprs,
BlockRow* part_key) {
for (int i = 0; i < t_exprs.size(); i++) {
Expand Down
78 changes: 74 additions & 4 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <butil/fast_rand.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/descriptors.pb.h>

Expand All @@ -33,6 +34,8 @@

#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/descriptors.h"
#include "runtime/raw_value.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
Expand Down Expand Up @@ -162,9 +165,78 @@ class VOlapTablePartitionParam {
int64_t version() const { return _t_param.version; }

// return true if we found this block_row in partition
bool find_partition(BlockRow* block_row, const VOlapTablePartition** partition) const;
//TODO: use virtual function to refactor it
ALWAYS_INLINE bool find_partition(vectorized::Block* block, int row,
VOlapTablePartition*& partition) const {
auto it = _is_in_partition ? _partitions_map->find(std::tuple {block, row, true})
: _partitions_map->upper_bound(std::tuple {block, row, true});
// for list partition it might result in default partition
if (_is_in_partition) {
partition = (it != _partitions_map->end()) ? it->second : _default_partition;
it = _partitions_map->end();
}
if (it != _partitions_map->end() &&
_part_contains(it->second, std::tuple {block, row, true})) {
partition = it->second;
}
return (partition != nullptr);
}

ALWAYS_INLINE void find_tablets(
vectorized::Block* block, const std::vector<uint32_t>& indexes,
const std::vector<VOlapTablePartition*>& partitions,
std::vector<uint32_t>& tablet_indexes /*result*/,
/*TODO: check if flat hash map will be better*/
std::map<int64_t, int64_t>* partition_tablets_buffer = nullptr) const {
std::function<uint32_t(vectorized::Block*, uint32_t, const VOlapTablePartition&)>
compute_function;
if (!_distributed_slot_locs.empty()) {
//TODO: refactor by saving the hash values. then we can calculate in columnwise.
compute_function = [this](vectorized::Block* block, uint32_t row,
const VOlapTablePartition& partition) -> uint32_t {
uint32_t hash_val = 0;
for (unsigned short _distributed_slot_loc : _distributed_slot_locs) {
auto* slot_desc = _slots[_distributed_slot_loc];
auto& column = block->get_by_position(_distributed_slot_loc).column;
auto val = column->get_data_at(row);
if (val.data != nullptr) {
hash_val = RawValue::zlib_crc32(val.data, val.size, slot_desc->type().type,
hash_val);
} else {
hash_val = HashUtil::zlib_crc_hash_null(hash_val);
}
}
return hash_val % partition.num_buckets;
};
} else { // random distribution
compute_function = [](vectorized::Block* block, uint32_t row,
const VOlapTablePartition& partition) -> uint32_t {
if (partition.load_tablet_idx == -1) {
// load_to_single_tablet = false, just do random
return butil::fast_rand() % partition.num_buckets;
}
// load_to_single_tablet = ture, do round-robin
return partition.load_tablet_idx % partition.num_buckets;
};
}

uint32_t find_tablet(BlockRow* block_row, const VOlapTablePartition& partition) const;
if (partition_tablets_buffer == nullptr) {
for (auto index : indexes) {
tablet_indexes[index] = compute_function(block, index, *partitions[index]);
}
} else { // use buffer
for (auto index : indexes) {
auto& partition_id = partitions[index]->id;
if (auto it = partition_tablets_buffer->find(partition_id);
it != partition_tablets_buffer->end()) {
tablet_indexes[index] = it->second; // tablet
}
// compute and save in buffer
(*partition_tablets_buffer)[partition_id] = tablet_indexes[index] =
compute_function(block, index, *partitions[index]);
}
}
}

const std::vector<VOlapTablePartition*>& get_partitions() const { return _partitions; }

Expand Down Expand Up @@ -193,8 +265,6 @@ class VOlapTablePartitionParam {

Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos);

std::function<uint32_t(BlockRow*, const VOlapTablePartition&)> _compute_tablet_index;

// check if this partition contain this key
bool _part_contains(VOlapTablePartition* part, BlockRowWithIndicator key) const;

Expand Down
76 changes: 40 additions & 36 deletions be/src/vec/sink/vtablet_finder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,24 @@
#include "vec/functions/simple_function_factory.h"

namespace doris::vectorized {
Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int rows,
std::vector<VOlapTablePartition*>& partitions,
std::vector<uint32_t>& tablet_index, bool& stop_processing,
std::vector<bool>& skip, std::vector<int64_t>* miss_rows) {
for (int index = 0; index < rows; index++) {
_vpartition->find_partition(block, index, partitions[index]);
}

std::vector<uint32_t> qualified_rows;
qualified_rows.reserve(rows);

Status OlapTabletFinder::find_tablet(RuntimeState* state, Block* block, int row_index,
const VOlapTablePartition** partition, uint32_t& tablet_index,
bool& stop_processing, bool& is_continue,
bool* missing_partition) {
Status status = Status::OK();
*partition = nullptr;
tablet_index = 0;
BlockRow block_row;
block_row = {block, row_index};
if (!_vpartition->find_partition(&block_row, partition)) {
if (missing_partition != nullptr) { // auto partition table
*missing_partition = true;
return status;
} else {
for (int row_index = 0; row_index < rows; row_index++) {
if (partitions[row_index] == nullptr) [[unlikely]] {
if (miss_rows != nullptr) { // auto partition table
miss_rows->push_back(row_index); // already reserve memory outside
skip[row_index] = true;
continue;
}
RETURN_IF_ERROR(state->append_error_msg_to_file(
[]() -> std::string { return ""; },
[&]() -> std::string {
Expand All @@ -70,33 +73,34 @@ Status OlapTabletFinder::find_tablet(RuntimeState* state, Block* block, int row_
if (stop_processing) {
return Status::EndOfFile("Encountered unqualified data, stop processing");
}
is_continue = true;
return status;
skip[row_index] = true;
continue;
}
}
if (!(*partition)->is_mutable) {
_num_immutable_partition_filtered_rows++;
is_continue = true;
return status;
}
if ((*partition)->num_buckets <= 0) {
std::stringstream ss;
ss << "num_buckets must be greater than 0, num_buckets=" << (*partition)->num_buckets;
return Status::InternalError(ss.str());
}
_partition_ids.emplace((*partition)->id);
if (_find_tablet_mode != FindTabletMode::FIND_TABLET_EVERY_ROW) {
if (_partition_to_tablet_map.find((*partition)->id) == _partition_to_tablet_map.end()) {
tablet_index = _vpartition->find_tablet(&block_row, **partition);
_partition_to_tablet_map.emplace((*partition)->id, tablet_index);
} else {
tablet_index = _partition_to_tablet_map[(*partition)->id];
if (!partitions[row_index]->is_mutable) [[unlikely]] {
_num_immutable_partition_filtered_rows++;
skip[row_index] = true;
continue;
}
if (partitions[row_index]->num_buckets <= 0) [[unlikely]] {
std::stringstream ss;
ss << "num_buckets must be greater than 0, num_buckets="
<< partitions[row_index]->num_buckets;
return Status::InternalError(ss.str());
}

_partition_ids.emplace(partitions[row_index]->id);

qualified_rows.push_back(row_index);
}

if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) {
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index);
} else {
tablet_index = _vpartition->find_tablet(&block_row, **partition);
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index,
&_partition_to_tablet_map);
}

return status;
return Status::OK();
}

} // namespace doris::vectorized
13 changes: 8 additions & 5 deletions be/src/vec/sink/vtablet_finder.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
#pragma once

#include <map>
#include <unordered_set>

#include "common/status.h"
#include "exec/tablet_info.h"
#include "util/bitmap.h"
#include "vec/common/hash_table/phmap_fwd_decl.h"
#include "vec/core/block.h"

namespace doris::vectorized {
Expand All @@ -39,9 +41,10 @@ class OlapTabletFinder {
OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode)
: _vpartition(vpartition), _find_tablet_mode(mode), _filter_bitmap(1024) {};

Status find_tablet(RuntimeState* state, vectorized::Block* block, int row_index,
const VOlapTablePartition** partition, uint32_t& tablet_index,
bool& filtered, bool& is_continue, bool* missing_partition = nullptr);
Status find_tablets(RuntimeState* state, vectorized::Block* block, int rows,
std::vector<VOlapTablePartition*>& partitions,
std::vector<uint32_t>& tablet_index, bool& filtered,
std::vector<bool>& is_continue, std::vector<int64_t>* miss_rows = nullptr);

bool is_find_tablet_every_sink() {
return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK;
Expand All @@ -55,7 +58,7 @@ class OlapTabletFinder {

bool is_single_tablet() { return _partition_to_tablet_map.size() == 1; }

const std::set<int64_t>& partition_ids() { return _partition_ids; }
const vectorized::flat_hash_set<int64_t>& partition_ids() { return _partition_ids; }

int64_t num_filtered_rows() const { return _num_filtered_rows; }

Expand All @@ -69,7 +72,7 @@ class OlapTabletFinder {
VOlapTablePartitionParam* _vpartition;
FindTabletMode _find_tablet_mode;
std::map<int64_t, int64_t> _partition_to_tablet_map;
std::set<int64_t> _partition_ids;
vectorized::flat_hash_set<int64_t> _partition_ids;

int64_t _num_filtered_rows = 0;
int64_t _num_immutable_partition_filtered_rows = 0;
Expand Down
Loading

0 comments on commit 16644ef

Please sign in to comment.