Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,23 @@ DEFINE_mInt32(unused_rowset_monitor_interval, "30");
DEFINE_mInt32(quering_rowsets_evict_interval, "30");
DEFINE_String(storage_root_path, "${DORIS_HOME}/storage");
DEFINE_mString(broken_storage_path, "");
DEFINE_Int32(min_active_scan_threads, "-1");
DEFINE_Int32(min_active_file_scan_threads, "-1");

DEFINE_Validator(min_active_scan_threads, [](const int config) -> bool {
if (config == -1) {
CpuInfo::init();
min_active_scan_threads = CpuInfo::num_cores() * 2;
}
return true;
});
DEFINE_Validator(min_active_file_scan_threads, [](const int config) -> bool {
if (config == -1) {
CpuInfo::init();
min_active_file_scan_threads = CpuInfo::num_cores() * 8;
}
return true;
});

// Config is used to check incompatible old format hdr_ format
// whether doris uses strict way. When config is true, process will log fatal
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ DECLARE_mInt32(unused_rowset_monitor_interval);
DECLARE_mInt32(quering_rowsets_evict_interval);
DECLARE_String(storage_root_path);
DECLARE_mString(broken_storage_path);
DECLARE_Int32(min_active_scan_threads);
DECLARE_Int32(min_active_file_scan_threads);

// Config is used to check incompatible old format hdr_ format
// whether doris uses strict way. When config is true, process will log fatal
Expand Down
49 changes: 44 additions & 5 deletions be/src/pipeline/exec/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,44 @@

namespace doris::pipeline {
#include "common/compile_check_begin.h"

int FileScanLocalState::max_scanners_concurrency(RuntimeState* state) const {
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
return 1;
}
/*
* The max concurrency of file scanners for each FileScanLocalState is determined by:
* 1. User specified max_file_scanners_concurrency which is set through session variable.
* 2. Default: 16
*
* If this is a serial operator, the max concurrency should multiply by the number of parallel instances of the operator.
*/
return (state->max_file_scanners_concurrency() > 0 ? state->max_file_scanners_concurrency()
: 16) *
(state->query_parallel_instance_num() / _parent->parallelism(state));
}

int FileScanLocalState::min_scanners_concurrency(RuntimeState* state) const {
if (should_run_serial()) {
return 1;
}
/*
* The min concurrency of scanners for each FileScanLocalState is determined by:
* 1. User specified min_file_scanners_concurrency which is set through session variable.
* 2. Default: 1
*
* If this is a serial operator, the max concurrency should multiply by the number of parallel instances of the operator.
*/
return (state->min_file_scanners_concurrency() > 0 ? state->min_file_scanners_concurrency()
: 1) *
(state->query_parallel_instance_num() / _parent->parallelism(state));
}

vectorized::ScannerScheduler* FileScanLocalState::scan_scheduler(RuntimeState* state) const {
return state->get_query_ctx()->get_remote_scan_scheduler();
}

Status FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* scanners) {
if (_split_source->num_scan_ranges() == 0) {
_eos = true;
Expand All @@ -44,9 +82,9 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc

auto& p = _parent->cast<FileScanOperatorX>();
// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
uint32_t shard_num = std::min(vectorized::ScannerScheduler::get_remote_scan_thread_num() /
p.query_parallel_instance_num(),
_max_scanners);
uint32_t shard_num = std::min(
vectorized::ScannerScheduler::get_remote_scan_thread_num() / p.parallelism(state()),
_max_scanners);
shard_num = std::max(shard_num, 1U);
_kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
for (int i = 0; i < _max_scanners; ++i) {
Expand Down Expand Up @@ -82,18 +120,19 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
auto scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range;
if (scan_range.__isset.split_source) {
p._batch_split_mode = true;
custom_profile()->add_info_string("BatchSplitMode", "true");
auto split_source = scan_range.split_source;
RuntimeProfile::Counter* get_split_timer = ADD_TIMER(custom_profile(), "GetSplitTime");

_max_scanners = calc_max_scanners(p.query_parallel_instance_num());
_max_scanners = calc_max_scanners(p.parallelism(state));
_split_source = std::make_shared<vectorized::RemoteSplitSourceConnector>(
state, get_split_timer, split_source.split_source_id, split_source.num_splits,
_max_scanners);
}
}

if (!p._batch_split_mode) {
_max_scanners = calc_max_scanners(p.query_parallel_instance_num());
_max_scanners = calc_max_scanners(p.parallelism(state));
if (_split_source == nullptr) {
_split_source = std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges,
_max_scanners);
Expand Down
7 changes: 5 additions & 2 deletions be/src/pipeline/exec/file_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {
const std::vector<TScanRangeParams>& scan_ranges) override;
int parent_id() { return _parent->node_id(); }
std::string name_suffix() const override;
int max_scanners_concurrency(RuntimeState* state) const override;
int min_scanners_concurrency(RuntimeState* state) const override;
vectorized::ScannerScheduler* scan_scheduler(RuntimeState* state) const override;

private:
friend class vectorized::FileScanner;
Expand Down Expand Up @@ -83,8 +86,8 @@ class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
bool is_file_scan_operator() const override { return true; }

// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
int query_parallel_instance_num() const override {
return _batch_split_mode ? 1 : _query_parallel_instance_num;
int parallelism(RuntimeState* state) const override {
return _batch_split_mode ? 1 : ScanOperatorX<FileScanLocalState>::parallelism(state);
}

private:
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ class OperatorBase {
[[nodiscard]] virtual Status terminate(RuntimeState* state) = 0;
[[nodiscard]] virtual Status close(RuntimeState* state);
[[nodiscard]] virtual int node_id() const = 0;
[[nodiscard]] virtual int parallelism(RuntimeState* state) const {
return _is_serial_operator ? 1 : state->query_parallel_instance_num();
}

[[nodiscard]] virtual Status set_child(OperatorPtr child) {
if (_child && child != nullptr) {
Expand Down
58 changes: 43 additions & 15 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,41 @@ bool ScanLocalState<Derived>::should_run_serial() const {
return _parent->cast<typename Derived::Parent>()._should_run_serial;
}

int ScanLocalStateBase::max_scanners_concurrency(RuntimeState* state) const {
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
return 1;
}
/*
* The max concurrency of scanners for each ScanLocalStateBase is determined by:
* 1. User specified max_scanners_concurrency which is set through session variable.
* 2. Default: 4
*
* If this is a serial operator, the max concurrency should multiply by the number of parallel instances of the operator.
*/
return (state->max_scanners_concurrency() > 0 ? state->max_scanners_concurrency() : 4) *
(state->query_parallel_instance_num() / _parent->parallelism(state));
}

int ScanLocalStateBase::min_scanners_concurrency(RuntimeState* state) const {
if (should_run_serial()) {
return 1;
}
/*
* The min concurrency of scanners for each ScanLocalStateBase is determined by:
* 1. User specified min_scanners_concurrency which is set through session variable.
* 2. Default: 1
*
* If this is a serial operator, the max concurrency should multiply by the number of parallel instances of the operator.
*/
return (state->min_scanners_concurrency() > 0 ? state->min_scanners_concurrency() : 1) *
(state->query_parallel_instance_num() / _parent->parallelism(state));
}

vectorized::ScannerScheduler* ScanLocalStateBase::scan_scheduler(RuntimeState* state) const {
return state->get_query_ctx()->get_scan_scheduler();
}

template <typename Derived>
Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
Expand Down Expand Up @@ -1052,19 +1087,14 @@ template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
// If scan operator is serial operator(like topn), its real parallelism is 1.
// Otherwise, its real parallelism is query_parallel_instance_num.
// query_parallel_instance_num of olap table is usually equal to session var parallel_pipeline_task_num.
// for file scan operator, its real parallelism will be 1 if it is in batch mode.
// Related pr:
// https://github.com/apache/doris/pull/42460
// https://github.com/apache/doris/pull/44635
const int parallism_of_scan_operator =
p.is_serial_operator() ? 1 : p.query_parallel_instance_num();

_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
_scan_dependency, parallism_of_scan_operator);
_scanner_ctx = vectorized::ScannerContext::create_shared(state(), this, p._output_tuple_desc,
p.output_row_descriptor(), scanners,
p.limit(), _scan_dependency
#ifdef BE_TEST
,
max_scanners_concurrency(state())
#endif
);
return Status::OK();
}

Expand Down Expand Up @@ -1273,8 +1303,6 @@ Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState*
}
}

_query_parallel_instance_num = state->query_parallel_instance_num();

return Status::OK();
}

Expand Down
16 changes: 10 additions & 6 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ class ScanLocalStateBase : public PipelineXLocalState<> {
virtual TPushAggOp::type get_push_down_agg_type() = 0;

virtual int64_t get_push_down_count() = 0;
// If scan operator is serial operator(like topn), its real parallelism is 1.
// Otherwise, its real parallelism is query_parallel_instance_num.
// query_parallel_instance_num of olap table is usually equal to session var parallel_pipeline_task_num.
// for file scan operator, its real parallelism will be 1 if it is in batch mode.
// Related pr:
// https://github.com/apache/doris/pull/42460
// https://github.com/apache/doris/pull/44635
[[nodiscard]] virtual int max_scanners_concurrency(RuntimeState* state) const;
[[nodiscard]] virtual int min_scanners_concurrency(RuntimeState* state) const;
[[nodiscard]] virtual vectorized::ScannerScheduler* scan_scheduler(RuntimeState* state) const;

[[nodiscard]] std::string get_name() { return _parent->get_name(); }

Expand Down Expand Up @@ -358,10 +368,6 @@ class ScanOperatorX : public OperatorX<LocalStateType> {

[[nodiscard]] virtual bool is_file_scan_operator() const { return false; }

[[nodiscard]] virtual int query_parallel_instance_num() const {
return _query_parallel_instance_num;
}

[[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state) override;

const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
Expand Down Expand Up @@ -438,8 +444,6 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
int64_t _push_down_count = -1;
const int _parallel_tasks = 0;

int _query_parallel_instance_num = 0;

std::vector<int> _topn_filter_source_node_ids;
};

Expand Down
26 changes: 17 additions & 9 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,26 @@ class RuntimeState {
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
: _query_options.query_timeout;
}
int num_scanner_threads() const {
return _query_options.__isset.num_scanner_threads ? _query_options.num_scanner_threads : 0;
}
int min_scan_concurrency_of_scan_scheduler() const {
return _query_options.__isset.min_scan_scheduler_concurrency
? _query_options.min_scan_scheduler_concurrency
int max_scanners_concurrency() const {
return _query_options.__isset.max_scanners_concurrency
? _query_options.max_scanners_concurrency
: 0;
}
int max_file_scanners_concurrency() const {
return _query_options.__isset.max_file_scanners_concurrency
? _query_options.max_file_scanners_concurrency
: max_scanners_concurrency();
}

int min_scanners_concurrency() const {
return _query_options.__isset.min_scanners_concurrency
? _query_options.min_scanners_concurrency
: 1;
}

int min_scan_concurrency_of_scanner() const {
return _query_options.__isset.min_scanner_concurrency
? _query_options.min_scanner_concurrency
int min_file_scanners_concurrency() const {
return _query_options.__isset.min_file_scanners_concurrency
? _query_options.min_file_scanners_concurrency
: 1;
}

Expand Down
15 changes: 9 additions & 6 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,8 @@ Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
}

Status ret = scan_scheduler->start(scan_thread_num, scan_thread_num,
config::doris_scanner_thread_pool_queue_size);
config::doris_scanner_thread_pool_queue_size,
config::min_active_scan_threads);
if (ret.ok()) {
_scan_task_sched = std::move(scan_scheduler);
} else {
Expand All @@ -581,9 +582,9 @@ Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
remote_scan_scheduler = std::make_unique<vectorized::ThreadPoolSimplifiedScanScheduler>(
"rs_" + wg_name, cg_cpu_ctl_ptr, wg_name);
}
Status ret =
remote_scan_scheduler->start(max_remote_scan_thread_num, min_remote_scan_thread_num,
remote_scan_thread_queue_size);
Status ret = remote_scan_scheduler->start(
max_remote_scan_thread_num, min_remote_scan_thread_num,
remote_scan_thread_queue_size, config::min_active_file_scan_threads);
if (ret.ok()) {
_remote_scan_task_sched = std::move(remote_scan_scheduler);
} else {
Expand Down Expand Up @@ -614,12 +615,14 @@ Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,

// 2 update thread pool
if (scan_thread_num > 0 && _scan_task_sched) {
_scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num);
_scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num,
config::min_active_scan_threads);
}

if (max_remote_scan_thread_num >= min_remote_scan_thread_num && _remote_scan_task_sched) {
_remote_scan_task_sched->reset_thread_num(max_remote_scan_thread_num,
min_remote_scan_thread_num);
min_remote_scan_thread_num,
config::min_active_file_scan_threads);
}

return upsert_ret;
Expand Down
Loading
Loading