Skip to content

Commit

Permalink
Merge branch 'master' into bugfix-111
Browse files Browse the repository at this point in the history
  • Loading branch information
liugddx authored Sep 28, 2024
2 parents 53b29bd + 700b2c4 commit 5d84c24
Show file tree
Hide file tree
Showing 808 changed files with 97,172 additions and 1,738 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ thirdparty/doris-thirdparty*.tar.xz
docker/thirdparties/docker-compose/mysql/data
docker/thirdparties/docker-compose/hive/scripts/tpch1.db/
docker/thirdparties/docker-compose/hive/scripts/paimon1
docker/thirdparties/docker-compose/hive/scripts/tvf_data

fe_plugins/output
fe_plugins/**/.factorypath
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ Status CloudBaseCompaction::modify_rowsets() {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(),
_stats.merged_rows, _stats.filtered_rows, initiator, output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudBaseCompaction, tablet_id={}, range=[{}-{}]",
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(),
_stats.merged_rows, _stats.filtered_rows, initiator, output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]",
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ DEFINE_mInt32(doris_scan_range_max_mb, "1024");
DEFINE_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
// single read execute fragment max run time millseconds
DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000");
// (Advanced) Maximum size of per-query receive-side buffer
DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ DECLARE_mInt32(doris_scan_range_max_mb);
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
// single read execute fragment max run time millseconds
DECLARE_mInt32(doris_scanner_max_run_time_ms);
// (Advanced) Maximum size of per-query receive-side buffer
DECLARE_mInt32(exchg_node_buffer_size_bytes);
DECLARE_mInt32(exchg_buffer_queue_capacity_factor);
Expand Down
25 changes: 6 additions & 19 deletions be/src/io/fs/s3_file_bufferpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include "io/cache/file_cache_common.h"
#include "io/fs/s3_common.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "util/slice.h"
Expand Down Expand Up @@ -78,19 +77,17 @@ Slice FileBuffer::get_slice() const {
}

FileBuffer::FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder,
size_t offset, OperationState state,
std::shared_ptr<MemTrackerLimiter> mem_tracker)
size_t offset, OperationState state)
: _type(type),
_alloc_holder(std::move(alloc_holder)),
_offset(offset),
_size(0),
_state(std::move(state)),
_inner_data(std::make_unique<FileBuffer::PartData>()),
_capacity(_inner_data->size()),
_mem_tracker(std::move(mem_tracker)) {}
_capacity(_inner_data->size()) {}

FileBuffer::~FileBuffer() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
_inner_data.reset();
}

Expand Down Expand Up @@ -243,31 +240,21 @@ FileBufferBuilder& FileBufferBuilder::set_allocate_file_blocks_holder(
}

Status FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) {
auto mem_tracker = ExecEnv::GetInstance()->s3_file_buffer_tracker();
auto* thread_ctx = doris::thread_context(true);
if (thread_ctx != nullptr) {
// if thread local mem tracker is set, use it instead.
auto curr_tracker = thread_ctx->thread_mem_tracker_mgr->limiter_mem_tracker();
if (curr_tracker != ExecEnv::GetInstance()->orphan_mem_tracker()) {
mem_tracker = std::move(curr_tracker);
}
}
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
OperationState state(_sync_after_complete_task, _is_cancelled);

if (_type == BufferType::UPLOAD) {
RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<UploadFileBuffer>(
std::move(_upload_cb), std::move(state), _offset,
std::move(_alloc_holder_cb), std::move(mem_tracker)));
std::move(_alloc_holder_cb)));
return Status::OK();
}
if (_type == BufferType::DOWNLOAD) {
RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<DownloadFileBuffer>(
std::move(_download),
std::move(_write_to_local_file_cache),
std::move(_write_to_use_buffer), std::move(state),
_offset, std::move(_alloc_holder_cb),
std::move(mem_tracker)));
_offset, std::move(_alloc_holder_cb)));
return Status::OK();
}
// should never come here
Expand Down
14 changes: 5 additions & 9 deletions be/src/io/fs/s3_file_bufferpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

#include "common/status.h"
#include "io/cache/file_block.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/crc32c.h"
#include "util/slice.h"
#include "util/threadpool.h"
Expand Down Expand Up @@ -78,7 +77,7 @@ struct OperationState {

struct FileBuffer {
FileBuffer(BufferType type, std::function<FileBlocksHolderPtr()> alloc_holder, size_t offset,
OperationState state, std::shared_ptr<MemTrackerLimiter> mem_tracker);
OperationState state);
virtual ~FileBuffer();
/**
* submit the correspoding task to async executor
Expand Down Expand Up @@ -128,16 +127,14 @@ struct FileBuffer {
struct PartData;
std::unique_ptr<PartData> _inner_data;
size_t _capacity;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};

struct DownloadFileBuffer final : public FileBuffer {
DownloadFileBuffer(std::function<Status(Slice&)> download,
std::function<void(FileBlocksHolderPtr, Slice)> write_to_cache,
std::function<void(Slice, size_t)> write_to_use_buffer, OperationState state,
size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder,
std::shared_ptr<MemTrackerLimiter> mem_tracker)
: FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state, std::move(mem_tracker)),
size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder)
: FileBuffer(BufferType::DOWNLOAD, alloc_holder, offset, state),
_download(std::move(download)),
_write_to_local_file_cache(std::move(write_to_cache)),
_write_to_use_buffer(std::move(write_to_use_buffer)) {}
Expand All @@ -156,9 +153,8 @@ struct DownloadFileBuffer final : public FileBuffer {

struct UploadFileBuffer final : public FileBuffer {
UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb, OperationState state,
size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder,
std::shared_ptr<MemTrackerLimiter> mem_tracker)
: FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state, std::move(mem_tracker)),
size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder)
: FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state),
_upload_to_remote(std::move(upload_cb)) {}
~UploadFileBuffer() override = default;
Status append_data(const Slice& s) override;
Expand Down
14 changes: 8 additions & 6 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ Compaction::Compaction(BaseTabletSPtr tablet, const std::string& label)
_tablet(std::move(tablet)),
_is_vertical(config::enable_vertical_compaction),
_allow_delete_in_cumu_compaction(config::enable_delete_when_cumu_compaction) {
;
init_profile(label);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
_rowid_conversion = std::make_unique<RowIdConversion>();
}

Compaction::~Compaction() {
Expand All @@ -130,6 +131,7 @@ Compaction::~Compaction() {
_input_rowsets.clear();
_output_rowset.reset();
_cur_tablet_schema.reset();
_rowid_conversion.reset();
}

void Compaction::init_profile(const std::string& label) {
Expand Down Expand Up @@ -176,7 +178,7 @@ Status Compaction::merge_input_rowsets() {
// the row ID conversion matrix needs to be used for inverted index compaction.
if (!ctx.skip_inverted_index.empty() || (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write())) {
_stats.rowid_conversion = &_rowid_conversion;
_stats.rowid_conversion = _rowid_conversion.get();
}

int64_t way_num = merge_way_num();
Expand Down Expand Up @@ -493,7 +495,7 @@ Status Compaction::do_inverted_index_compaction() {
std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map;
// Convert the delete bitmap of the input rowsets to output rowset.
_tablet->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, cur_max_version + 1, &missed_rows, &location_map,
_input_rowsets, *_rowid_conversion, 0, cur_max_version + 1, &missed_rows, &location_map,
_tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap);

if (!_allow_delete_in_cumu_compaction) {
Expand Down Expand Up @@ -947,7 +949,7 @@ Status CompactionMixin::modify_rowsets() {
// TODO(LiaoXin): check if there are duplicate keys
std::size_t missed_rows_size = 0;
tablet()->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, version.second + 1, missed_rows.get(),
_input_rowsets, *_rowid_conversion, 0, version.second + 1, missed_rows.get(),
location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
&output_rowset_delete_bitmap);
if (missed_rows) {
Expand Down Expand Up @@ -1022,7 +1024,7 @@ Status CompactionMixin::modify_rowsets() {
}
DeleteBitmap txn_output_delete_bitmap(_tablet->tablet_id());
tablet()->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, UINT64_MAX, missed_rows.get(),
_input_rowsets, *_rowid_conversion, 0, UINT64_MAX, missed_rows.get(),
location_map.get(), *it.delete_bitmap.get(), &txn_output_delete_bitmap);
if (config::enable_merge_on_write_correctness_check) {
RowsetIdUnorderedSet rowsetids;
Expand All @@ -1042,7 +1044,7 @@ Status CompactionMixin::modify_rowsets() {
// Convert the delete bitmap of the input rowsets to output rowset for
// incremental data.
tablet()->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, version.second, UINT64_MAX,
_input_rowsets, *_rowid_conversion, version.second, UINT64_MAX,
missed_rows.get(), location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
&output_rowset_delete_bitmap);

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class Compaction {
Version _output_version;

int64_t _newest_write_timestamp {-1};
RowIdConversion _rowid_conversion;
std::unique_ptr<RowIdConversion> _rowid_conversion = nullptr;
TabletSchemaSPtr _cur_tablet_schema;

std::unique_ptr<RuntimeProfile> _profile;
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,10 @@ Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
}
if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow &&
!_tablet_schema->cluster_key_idxes().empty()) {
if (_is_partial_update) {
return Status::InternalError(
"Partial update for mow with cluster keys is not supported");
}
RETURN_IF_ERROR(_sort_by_cluster_keys());
}
g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes();
Expand Down
29 changes: 26 additions & 3 deletions be/src/olap/rowid_conversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "olap/olap_common.h"
#include "olap/utils.h"
#include "runtime/thread_context.h"

namespace doris {

Expand All @@ -33,17 +34,24 @@ namespace doris {
class RowIdConversion {
public:
RowIdConversion() = default;
~RowIdConversion() = default;
~RowIdConversion() { RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used); }

// resize segment rowid map to its rows num
void init_segment_map(const RowsetId& src_rowset_id, const std::vector<uint32_t>& num_rows) {
size_t delta_std_pair_cap = 0;
for (size_t i = 0; i < num_rows.size(); i++) {
uint32_t id = _segments_rowid_map.size();
_segment_to_id_map.emplace(std::pair<RowsetId, uint32_t> {src_rowset_id, i}, id);
_id_to_segment_map.emplace_back(src_rowset_id, i);
_segments_rowid_map.emplace_back(std::vector<std::pair<uint32_t, uint32_t>>(
num_rows[i], std::pair<uint32_t, uint32_t>(UINT32_MAX, UINT32_MAX)));
std::vector<std::pair<uint32_t, uint32_t>> vec(
num_rows[i], std::pair<uint32_t, uint32_t>(UINT32_MAX, UINT32_MAX));
delta_std_pair_cap += vec.capacity();
_segments_rowid_map.emplace_back(std::move(vec));
}
//NOTE: manually count _segments_rowid_map's memory here, because _segments_rowid_map could be used by indexCompaction.
// indexCompaction is a thridparty code, it's too complex to modify it.
// refer compact_column.
track_mem_usage(delta_std_pair_cap);
}

// set dst rowset id
Expand Down Expand Up @@ -109,12 +117,27 @@ class RowIdConversion {
return _segment_to_id_map.at(segment);
}

private:
void track_mem_usage(size_t delta_std_pair_cap) {
_std_pair_cap += delta_std_pair_cap;

size_t new_size =
_std_pair_cap * sizeof(std::pair<uint32_t, uint32_t>) +
_segments_rowid_map.capacity() * sizeof(std::vector<std::pair<uint32_t, uint32_t>>);

RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used);
CONSUME_THREAD_MEM_TRACKER(new_size);
_seg_rowid_map_mem_used = new_size;
}

private:
// the first level vector: index indicates src segment.
// the second level vector: index indicates row id of source segment,
// value indicates row id of destination segment.
// <UINT32_MAX, UINT32_MAX> indicates current row not exist.
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> _segments_rowid_map;
size_t _seg_rowid_map_mem_used {0};
size_t _std_pair_cap {0};

// Map source segment to 0 to n
std::map<std::pair<RowsetId, uint32_t>, uint32_t> _segment_to_id_map;
Expand Down
19 changes: 19 additions & 0 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,11 @@ Status BetaRowsetReader::next_block(vectorized::Block* block) {
return Status::Error<END_OF_FILE>("BetaRowsetReader is empty");
}

RuntimeState* runtime_state = nullptr;
if (_read_context != nullptr) {
runtime_state = _read_context->runtime_state;
}

do {
auto s = _iterator->next_batch(block);
if (!s.ok()) {
Expand All @@ -359,6 +364,10 @@ Status BetaRowsetReader::next_block(vectorized::Block* block) {
}
return s;
}

if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] {
return runtime_state->cancel_reason();
}
} while (block->empty());

return Status::OK();
Expand All @@ -367,6 +376,12 @@ Status BetaRowsetReader::next_block(vectorized::Block* block) {
Status BetaRowsetReader::next_block_view(vectorized::BlockView* block_view) {
SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
RETURN_IF_ERROR(_init_iterator_once());

RuntimeState* runtime_state = nullptr;
if (_read_context != nullptr) {
runtime_state = _read_context->runtime_state;
}

do {
auto s = _iterator->next_block_view(block_view);
if (!s.ok()) {
Expand All @@ -375,6 +390,10 @@ Status BetaRowsetReader::next_block_view(vectorized::BlockView* block_view) {
}
return s;
}

if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] {
return runtime_state->cancel_reason();
}
} while (block_view->empty());

return Status::OK();
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,14 @@ Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
return Status::OK();
}

Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context));
if (_segcompaction_worker) {
_segcompaction_worker->init_mem_tracker(rowset_writer_context.txn_id);
}
return Status::OK();
}

Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
int32_t segment_id) {
DCHECK(_rowset_meta->is_local());
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {

Status build(RowsetSharedPtr& rowset) override;

Status init(const RowsetWriterContext& rowset_writer_context) override;

Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
TabletSchemaSPtr flush_schema) override;

Expand Down Expand Up @@ -318,7 +320,7 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {
// already been segment compacted
std::atomic<int32_t> _num_segcompacted {0}; // index for segment compaction

std::shared_ptr<SegcompactionWorker> _segcompaction_worker;
std::shared_ptr<SegcompactionWorker> _segcompaction_worker = nullptr;

// ensure only one inflight segcompaction task for each rowset
std::atomic<bool> _is_doing_segcompaction {false};
Expand Down
Loading

0 comments on commit 5d84c24

Please sign in to comment.