Skip to content

Commit 1bb4669

Browse files
authored
[Improvement](external) Increase scanner concurrency (#58073)
### What problem does this PR solve? The concurrency of scanners are controlled by some variables which is not suitable for different cases. For external tables, we need more scanners than the internal tables. In this PR, the concurrency variable is splitted from the origin one. ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
1 parent ffffc97 commit 1bb4669

File tree

17 files changed

+268
-227
lines changed

17 files changed

+268
-227
lines changed

be/src/common/config.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,23 @@ DEFINE_mInt32(unused_rowset_monitor_interval, "30");
369369
DEFINE_mInt32(quering_rowsets_evict_interval, "30");
370370
DEFINE_String(storage_root_path, "${DORIS_HOME}/storage");
371371
DEFINE_mString(broken_storage_path, "");
372+
DEFINE_Int32(min_active_scan_threads, "-1");
373+
DEFINE_Int32(min_active_file_scan_threads, "-1");
374+
375+
DEFINE_Validator(min_active_scan_threads, [](const int config) -> bool {
376+
if (config == -1) {
377+
CpuInfo::init();
378+
min_active_scan_threads = CpuInfo::num_cores() * 2;
379+
}
380+
return true;
381+
});
382+
DEFINE_Validator(min_active_file_scan_threads, [](const int config) -> bool {
383+
if (config == -1) {
384+
CpuInfo::init();
385+
min_active_file_scan_threads = CpuInfo::num_cores() * 8;
386+
}
387+
return true;
388+
});
372389

373390
// Config is used to check incompatible old format hdr_ format
374391
// whether doris uses strict way. When config is true, process will log fatal

be/src/common/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,8 @@ DECLARE_mInt32(unused_rowset_monitor_interval);
398398
DECLARE_mInt32(quering_rowsets_evict_interval);
399399
DECLARE_String(storage_root_path);
400400
DECLARE_mString(broken_storage_path);
401+
DECLARE_Int32(min_active_scan_threads);
402+
DECLARE_Int32(min_active_file_scan_threads);
401403

402404
// Config is used to check incompatible old format hdr_ format
403405
// whether doris uses strict way. When config is true, process will log fatal

be/src/pipeline/exec/file_scan_operator.cpp

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,44 @@
3131

3232
namespace doris::pipeline {
3333
#include "common/compile_check_begin.h"
34+
35+
int FileScanLocalState::max_scanners_concurrency(RuntimeState* state) const {
36+
// For select * from table limit 10; should just use one thread.
37+
if (should_run_serial()) {
38+
return 1;
39+
}
40+
/*
41+
* The max concurrency of file scanners for each FileScanLocalState is determined by:
42+
* 1. User specified max_file_scanners_concurrency which is set through session variable.
43+
* 2. Default: 16
44+
*
45+
* If this is a serial operator, the max concurrency should multiply by the number of parallel instances of the operator.
46+
*/
47+
return (state->max_file_scanners_concurrency() > 0 ? state->max_file_scanners_concurrency()
48+
: 16) *
49+
(state->query_parallel_instance_num() / _parent->parallelism(state));
50+
}
51+
52+
int FileScanLocalState::min_scanners_concurrency(RuntimeState* state) const {
53+
if (should_run_serial()) {
54+
return 1;
55+
}
56+
/*
57+
* The min concurrency of scanners for each FileScanLocalState is determined by:
58+
* 1. User specified min_file_scanners_concurrency which is set through session variable.
59+
* 2. Default: 1
60+
*
61+
* If this is a serial operator, the max concurrency should multiply by the number of parallel instances of the operator.
62+
*/
63+
return (state->min_file_scanners_concurrency() > 0 ? state->min_file_scanners_concurrency()
64+
: 1) *
65+
(state->query_parallel_instance_num() / _parent->parallelism(state));
66+
}
67+
68+
vectorized::ScannerScheduler* FileScanLocalState::scan_scheduler(RuntimeState* state) const {
69+
return state->get_query_ctx()->get_remote_scan_scheduler();
70+
}
71+
3472
Status FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* scanners) {
3573
if (_split_source->num_scan_ranges() == 0) {
3674
_eos = true;
@@ -44,9 +82,9 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
4482

4583
auto& p = _parent->cast<FileScanOperatorX>();
4684
// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
47-
uint32_t shard_num = std::min(vectorized::ScannerScheduler::get_remote_scan_thread_num() /
48-
p.query_parallel_instance_num(),
49-
_max_scanners);
85+
uint32_t shard_num = std::min(
86+
vectorized::ScannerScheduler::get_remote_scan_thread_num() / p.parallelism(state()),
87+
_max_scanners);
5088
shard_num = std::max(shard_num, 1U);
5189
_kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
5290
for (int i = 0; i < _max_scanners; ++i) {
@@ -82,18 +120,19 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
82120
auto scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range;
83121
if (scan_range.__isset.split_source) {
84122
p._batch_split_mode = true;
123+
custom_profile()->add_info_string("BatchSplitMode", "true");
85124
auto split_source = scan_range.split_source;
86125
RuntimeProfile::Counter* get_split_timer = ADD_TIMER(custom_profile(), "GetSplitTime");
87126

88-
_max_scanners = calc_max_scanners(p.query_parallel_instance_num());
127+
_max_scanners = calc_max_scanners(p.parallelism(state));
89128
_split_source = std::make_shared<vectorized::RemoteSplitSourceConnector>(
90129
state, get_split_timer, split_source.split_source_id, split_source.num_splits,
91130
_max_scanners);
92131
}
93132
}
94133

95134
if (!p._batch_split_mode) {
96-
_max_scanners = calc_max_scanners(p.query_parallel_instance_num());
135+
_max_scanners = calc_max_scanners(p.parallelism(state));
97136
if (_split_source == nullptr) {
98137
_split_source = std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges,
99138
_max_scanners);

be/src/pipeline/exec/file_scan_operator.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {
5454
const std::vector<TScanRangeParams>& scan_ranges) override;
5555
int parent_id() { return _parent->node_id(); }
5656
std::string name_suffix() const override;
57+
int max_scanners_concurrency(RuntimeState* state) const override;
58+
int min_scanners_concurrency(RuntimeState* state) const override;
59+
vectorized::ScannerScheduler* scan_scheduler(RuntimeState* state) const override;
5760

5861
private:
5962
friend class vectorized::FileScanner;
@@ -83,8 +86,8 @@ class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
8386
bool is_file_scan_operator() const override { return true; }
8487

8588
// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
86-
int query_parallel_instance_num() const override {
87-
return _batch_split_mode ? 1 : _query_parallel_instance_num;
89+
int parallelism(RuntimeState* state) const override {
90+
return _batch_split_mode ? 1 : ScanOperatorX<FileScanLocalState>::parallelism(state);
8891
}
8992

9093
private:

be/src/pipeline/exec/operator.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ class OperatorBase {
113113
[[nodiscard]] virtual Status terminate(RuntimeState* state) = 0;
114114
[[nodiscard]] virtual Status close(RuntimeState* state);
115115
[[nodiscard]] virtual int node_id() const = 0;
116+
[[nodiscard]] virtual int parallelism(RuntimeState* state) const {
117+
return _is_serial_operator ? 1 : state->query_parallel_instance_num();
118+
}
116119

117120
[[nodiscard]] virtual Status set_child(OperatorPtr child) {
118121
if (_child && child != nullptr) {

be/src/pipeline/exec/scan_operator.cpp

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,41 @@ bool ScanLocalState<Derived>::should_run_serial() const {
7272
return _parent->cast<typename Derived::Parent>()._should_run_serial;
7373
}
7474

75+
int ScanLocalStateBase::max_scanners_concurrency(RuntimeState* state) const {
76+
// For select * from table limit 10; should just use one thread.
77+
if (should_run_serial()) {
78+
return 1;
79+
}
80+
/*
81+
* The max concurrency of scanners for each ScanLocalStateBase is determined by:
82+
* 1. User specified max_scanners_concurrency which is set through session variable.
83+
* 2. Default: 4
84+
*
85+
* If this is a serial operator, the max concurrency should multiply by the number of parallel instances of the operator.
86+
*/
87+
return (state->max_scanners_concurrency() > 0 ? state->max_scanners_concurrency() : 4) *
88+
(state->query_parallel_instance_num() / _parent->parallelism(state));
89+
}
90+
91+
int ScanLocalStateBase::min_scanners_concurrency(RuntimeState* state) const {
92+
if (should_run_serial()) {
93+
return 1;
94+
}
95+
/*
96+
* The min concurrency of scanners for each ScanLocalStateBase is determined by:
97+
* 1. User specified min_scanners_concurrency which is set through session variable.
98+
* 2. Default: 1
99+
*
100+
* If this is a serial operator, the max concurrency should multiply by the number of parallel instances of the operator.
101+
*/
102+
return (state->min_scanners_concurrency() > 0 ? state->min_scanners_concurrency() : 1) *
103+
(state->query_parallel_instance_num() / _parent->parallelism(state));
104+
}
105+
106+
vectorized::ScannerScheduler* ScanLocalStateBase::scan_scheduler(RuntimeState* state) const {
107+
return state->get_query_ctx()->get_scan_scheduler();
108+
}
109+
75110
template <typename Derived>
76111
Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) {
77112
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
@@ -1052,19 +1087,14 @@ template <typename Derived>
10521087
Status ScanLocalState<Derived>::_start_scanners(
10531088
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners) {
10541089
auto& p = _parent->cast<typename Derived::Parent>();
1055-
// If scan operator is serial operator(like topn), its real parallelism is 1.
1056-
// Otherwise, its real parallelism is query_parallel_instance_num.
1057-
// query_parallel_instance_num of olap table is usually equal to session var parallel_pipeline_task_num.
1058-
// for file scan operator, its real parallelism will be 1 if it is in batch mode.
1059-
// Related pr:
1060-
// https://github.com/apache/doris/pull/42460
1061-
// https://github.com/apache/doris/pull/44635
1062-
const int parallism_of_scan_operator =
1063-
p.is_serial_operator() ? 1 : p.query_parallel_instance_num();
1064-
1065-
_scanner_ctx = vectorized::ScannerContext::create_shared(
1066-
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
1067-
_scan_dependency, parallism_of_scan_operator);
1090+
_scanner_ctx = vectorized::ScannerContext::create_shared(state(), this, p._output_tuple_desc,
1091+
p.output_row_descriptor(), scanners,
1092+
p.limit(), _scan_dependency
1093+
#ifdef BE_TEST
1094+
,
1095+
max_scanners_concurrency(state())
1096+
#endif
1097+
);
10681098
return Status::OK();
10691099
}
10701100

@@ -1273,8 +1303,6 @@ Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState*
12731303
}
12741304
}
12751305

1276-
_query_parallel_instance_num = state->query_parallel_instance_num();
1277-
12781306
return Status::OK();
12791307
}
12801308

be/src/pipeline/exec/scan_operator.h

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,16 @@ class ScanLocalStateBase : public PipelineXLocalState<> {
8383
virtual TPushAggOp::type get_push_down_agg_type() = 0;
8484

8585
virtual int64_t get_push_down_count() = 0;
86+
// If scan operator is serial operator(like topn), its real parallelism is 1.
87+
// Otherwise, its real parallelism is query_parallel_instance_num.
88+
// query_parallel_instance_num of olap table is usually equal to session var parallel_pipeline_task_num.
89+
// for file scan operator, its real parallelism will be 1 if it is in batch mode.
90+
// Related pr:
91+
// https://github.com/apache/doris/pull/42460
92+
// https://github.com/apache/doris/pull/44635
93+
[[nodiscard]] virtual int max_scanners_concurrency(RuntimeState* state) const;
94+
[[nodiscard]] virtual int min_scanners_concurrency(RuntimeState* state) const;
95+
[[nodiscard]] virtual vectorized::ScannerScheduler* scan_scheduler(RuntimeState* state) const;
8696

8797
[[nodiscard]] std::string get_name() { return _parent->get_name(); }
8898

@@ -358,10 +368,6 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
358368

359369
[[nodiscard]] virtual bool is_file_scan_operator() const { return false; }
360370

361-
[[nodiscard]] virtual int query_parallel_instance_num() const {
362-
return _query_parallel_instance_num;
363-
}
364-
365371
[[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state) override;
366372

367373
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
@@ -438,8 +444,6 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
438444
int64_t _push_down_count = -1;
439445
const int _parallel_tasks = 0;
440446

441-
int _query_parallel_instance_num = 0;
442-
443447
std::vector<int> _topn_filter_source_node_ids;
444448
};
445449

be/src/runtime/runtime_state.h

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -142,18 +142,26 @@ class RuntimeState {
142142
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
143143
: _query_options.query_timeout;
144144
}
145-
int num_scanner_threads() const {
146-
return _query_options.__isset.num_scanner_threads ? _query_options.num_scanner_threads : 0;
147-
}
148-
int min_scan_concurrency_of_scan_scheduler() const {
149-
return _query_options.__isset.min_scan_scheduler_concurrency
150-
? _query_options.min_scan_scheduler_concurrency
145+
int max_scanners_concurrency() const {
146+
return _query_options.__isset.max_scanners_concurrency
147+
? _query_options.max_scanners_concurrency
151148
: 0;
152149
}
150+
int max_file_scanners_concurrency() const {
151+
return _query_options.__isset.max_file_scanners_concurrency
152+
? _query_options.max_file_scanners_concurrency
153+
: max_scanners_concurrency();
154+
}
155+
156+
int min_scanners_concurrency() const {
157+
return _query_options.__isset.min_scanners_concurrency
158+
? _query_options.min_scanners_concurrency
159+
: 1;
160+
}
153161

154-
int min_scan_concurrency_of_scanner() const {
155-
return _query_options.__isset.min_scanner_concurrency
156-
? _query_options.min_scanner_concurrency
162+
int min_file_scanners_concurrency() const {
163+
return _query_options.__isset.min_file_scanners_concurrency
164+
? _query_options.min_file_scanners_concurrency
157165
: 1;
158166
}
159167

be/src/runtime/workload_group/workload_group.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,8 @@ Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
560560
}
561561

562562
Status ret = scan_scheduler->start(scan_thread_num, scan_thread_num,
563-
config::doris_scanner_thread_pool_queue_size);
563+
config::doris_scanner_thread_pool_queue_size,
564+
config::min_active_scan_threads);
564565
if (ret.ok()) {
565566
_scan_task_sched = std::move(scan_scheduler);
566567
} else {
@@ -581,9 +582,9 @@ Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
581582
remote_scan_scheduler = std::make_unique<vectorized::ThreadPoolSimplifiedScanScheduler>(
582583
"rs_" + wg_name, cg_cpu_ctl_ptr, wg_name);
583584
}
584-
Status ret =
585-
remote_scan_scheduler->start(max_remote_scan_thread_num, min_remote_scan_thread_num,
586-
remote_scan_thread_queue_size);
585+
Status ret = remote_scan_scheduler->start(
586+
max_remote_scan_thread_num, min_remote_scan_thread_num,
587+
remote_scan_thread_queue_size, config::min_active_file_scan_threads);
587588
if (ret.ok()) {
588589
_remote_scan_task_sched = std::move(remote_scan_scheduler);
589590
} else {
@@ -614,12 +615,14 @@ Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
614615

615616
// 2 update thread pool
616617
if (scan_thread_num > 0 && _scan_task_sched) {
617-
_scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num);
618+
_scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num,
619+
config::min_active_scan_threads);
618620
}
619621

620622
if (max_remote_scan_thread_num >= min_remote_scan_thread_num && _remote_scan_task_sched) {
621623
_remote_scan_task_sched->reset_thread_num(max_remote_scan_thread_num,
622-
min_remote_scan_thread_num);
624+
min_remote_scan_thread_num,
625+
config::min_active_file_scan_threads);
623626
}
624627

625628
return upsert_ret;

0 commit comments

Comments
 (0)