Skip to content

Commit 6f1aaee

Browse files
committed
[Improvement](external) Increase scanner concurrency
1 parent 38d2710 commit 6f1aaee

File tree

11 files changed

+163
-110
lines changed

11 files changed

+163
-110
lines changed

be/src/pipeline/exec/file_scan_operator.cpp

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,39 @@
3131

3232
namespace doris::pipeline {
3333
#include "common/compile_check_begin.h"
34+
35+
int FileScanLocalState::max_scanners_concurrency(RuntimeState* state) const {
36+
// For select * from table limit 10; should just use one thread.
37+
if (should_run_serial()) {
38+
return 1;
39+
}
40+
/*
41+
* The max concurrency of file scanners for each FileScanLocalState is determined by:
42+
* 1. User specified max_file_scanners_concurrency which is set through session variable.
43+
* 2. Default: std::max(512, CpuInfo::num_cores() * 10)
44+
*
45+
* If this is a serial operator, the max concurrency should multiply by the number of parallel instances of the operator.
46+
*/
47+
return (state->max_file_scanners_concurrency() ? state->max_file_scanners_concurrency()
48+
: std::max(512, CpuInfo::num_cores() * 10)) *
49+
(state->query_parallel_instance_num() / _parent->parallelism(state));
50+
}
51+
52+
int FileScanLocalState::min_scanners_concurrency(RuntimeState* state) const {
53+
/*
54+
* The min concurrency of scanners for each ScanLocalStateBase is determined by:
55+
* 1. User specified min_scan_concurrency_of_scan_scheduler which is set through session variable.
56+
* 2. Default: 2 * state->get_query_ctx()->get_remote_scan_scheduler()->get_max_threads()
57+
*/
58+
return state->min_scan_concurrency_of_scan_scheduler()
59+
? state->min_scan_concurrency_of_scan_scheduler()
60+
: 2 * state->get_query_ctx()->get_remote_scan_scheduler()->get_max_threads();
61+
}
62+
63+
vectorized::ScannerScheduler* FileScanLocalState::scan_scheduler(RuntimeState* state) const {
64+
return state->get_query_ctx()->get_remote_scan_scheduler();
65+
}
66+
3467
Status FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* scanners) {
3568
if (_split_source->num_scan_ranges() == 0) {
3669
_eos = true;
@@ -44,9 +77,9 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
4477

4578
auto& p = _parent->cast<FileScanOperatorX>();
4679
// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
47-
uint32_t shard_num = std::min(vectorized::ScannerScheduler::get_remote_scan_thread_num() /
48-
p.query_parallel_instance_num(),
49-
_max_scanners);
80+
uint32_t shard_num = std::min(
81+
vectorized::ScannerScheduler::get_remote_scan_thread_num() / p.parallelism(state()),
82+
_max_scanners);
5083
shard_num = std::max(shard_num, 1U);
5184
_kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
5285
for (int i = 0; i < _max_scanners; ++i) {
@@ -85,15 +118,15 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
85118
auto split_source = scan_range.split_source;
86119
RuntimeProfile::Counter* get_split_timer = ADD_TIMER(custom_profile(), "GetSplitTime");
87120

88-
_max_scanners = calc_max_scanners(p.query_parallel_instance_num());
121+
_max_scanners = calc_max_scanners(p.parallelism(state));
89122
_split_source = std::make_shared<vectorized::RemoteSplitSourceConnector>(
90123
state, get_split_timer, split_source.split_source_id, split_source.num_splits,
91124
_max_scanners);
92125
}
93126
}
94127

95128
if (!p._batch_split_mode) {
96-
_max_scanners = calc_max_scanners(p.query_parallel_instance_num());
129+
_max_scanners = calc_max_scanners(p.parallelism(state));
97130
if (_split_source == nullptr) {
98131
_split_source = std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges,
99132
_max_scanners);

be/src/pipeline/exec/file_scan_operator.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {
5454
const std::vector<TScanRangeParams>& scan_ranges) override;
5555
int parent_id() { return _parent->node_id(); }
5656
std::string name_suffix() const override;
57+
int max_scanners_concurrency(RuntimeState* state) const override;
58+
int min_scanners_concurrency(RuntimeState* state) const override;
59+
vectorized::ScannerScheduler* scan_scheduler(RuntimeState* state) const override;
5760

5861
private:
5962
friend class vectorized::FileScanner;
@@ -83,8 +86,8 @@ class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
8386
bool is_file_scan_operator() const override { return true; }
8487

8588
// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
86-
int query_parallel_instance_num() const override {
87-
return _batch_split_mode ? 1 : _query_parallel_instance_num;
89+
int parallelism(RuntimeState* state) const override {
90+
return _batch_split_mode ? 1 : ScanOperatorX<FileScanLocalState>::parallelism(state);
8891
}
8992

9093
private:

be/src/pipeline/exec/operator.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ class OperatorBase {
113113
[[nodiscard]] virtual Status terminate(RuntimeState* state) = 0;
114114
[[nodiscard]] virtual Status close(RuntimeState* state);
115115
[[nodiscard]] virtual int node_id() const = 0;
116+
[[nodiscard]] virtual int parallelism(RuntimeState* state) const {
117+
return _is_serial_operator ? 1 : state->query_parallel_instance_num();
118+
}
116119

117120
[[nodiscard]] virtual Status set_child(OperatorPtr child) {
118121
if (_child && child != nullptr) {

be/src/pipeline/exec/scan_operator.cpp

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,38 @@ bool ScanLocalState<Derived>::should_run_serial() const {
7272
return _parent->cast<typename Derived::Parent>()._should_run_serial;
7373
}
7474

75+
int ScanLocalStateBase::max_scanners_concurrency(RuntimeState* state) const {
76+
// For select * from table limit 10; should just use one thread.
77+
if (should_run_serial()) {
78+
return 1;
79+
}
80+
/*
81+
* The max concurrency of scanners for each ScanLocalStateBase is determined by:
82+
* 1. User specified max_scanners_concurrency which is set through session variable.
83+
* 2. Default: std::max(48, CpuInfo::num_cores() * 2))
84+
*
85+
* If this is a serial operator, the max concurrency should multiply by the number of parallel instances of the operator.
86+
*/
87+
return (state->max_scanners_concurrency() ? state->max_scanners_concurrency()
88+
: std::max(48, CpuInfo::num_cores() * 2)) *
89+
(state->query_parallel_instance_num() / _parent->parallelism(state));
90+
}
91+
92+
int ScanLocalStateBase::min_scanners_concurrency(RuntimeState* state) const {
93+
/*
94+
* The min concurrency of scanners for each ScanLocalStateBase is determined by:
95+
* 1. User specified min_scan_concurrency_of_scan_scheduler which is set through session variable.
96+
* 2. Default: 2 * state->get_query_ctx()->get_scan_scheduler()->get_max_threads()
97+
*/
98+
return state->min_scan_concurrency_of_scan_scheduler()
99+
? state->min_scan_concurrency_of_scan_scheduler()
100+
: 2 * state->get_query_ctx()->get_scan_scheduler()->get_max_threads();
101+
}
102+
103+
vectorized::ScannerScheduler* ScanLocalStateBase::scan_scheduler(RuntimeState* state) const {
104+
return state->get_query_ctx()->get_scan_scheduler();
105+
}
106+
75107
template <typename Derived>
76108
Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) {
77109
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
@@ -1052,19 +1084,14 @@ template <typename Derived>
10521084
Status ScanLocalState<Derived>::_start_scanners(
10531085
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners) {
10541086
auto& p = _parent->cast<typename Derived::Parent>();
1055-
// If scan operator is serial operator(like topn), its real parallelism is 1.
1056-
// Otherwise, its real parallelism is query_parallel_instance_num.
1057-
// query_parallel_instance_num of olap table is usually equal to session var parallel_pipeline_task_num.
1058-
// for file scan operator, its real parallelism will be 1 if it is in batch mode.
1059-
// Related pr:
1060-
// https://github.com/apache/doris/pull/42460
1061-
// https://github.com/apache/doris/pull/44635
1062-
const int parallism_of_scan_operator =
1063-
p.is_serial_operator() ? 1 : p.query_parallel_instance_num();
1064-
1065-
_scanner_ctx = vectorized::ScannerContext::create_shared(
1066-
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
1067-
_scan_dependency, parallism_of_scan_operator);
1087+
_scanner_ctx = vectorized::ScannerContext::create_shared(state(), this, p._output_tuple_desc,
1088+
p.output_row_descriptor(), scanners,
1089+
p.limit(), _scan_dependency
1090+
#ifdef BE_TEST
1091+
,
1092+
max_scanners_concurrency(state())
1093+
#endif
1094+
);
10681095
return Status::OK();
10691096
}
10701097

@@ -1273,8 +1300,6 @@ Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState*
12731300
}
12741301
}
12751302

1276-
_query_parallel_instance_num = state->query_parallel_instance_num();
1277-
12781303
return Status::OK();
12791304
}
12801305

be/src/pipeline/exec/scan_operator.h

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,16 @@ class ScanLocalStateBase : public PipelineXLocalState<> {
8383
virtual TPushAggOp::type get_push_down_agg_type() = 0;
8484

8585
virtual int64_t get_push_down_count() = 0;
86+
// If scan operator is serial operator(like topn), its real parallelism is 1.
87+
// Otherwise, its real parallelism is query_parallel_instance_num.
88+
// query_parallel_instance_num of olap table is usually equal to session var parallel_pipeline_task_num.
89+
// for file scan operator, its real parallelism will be 1 if it is in batch mode.
90+
// Related pr:
91+
// https://github.com/apache/doris/pull/42460
92+
// https://github.com/apache/doris/pull/44635
93+
[[nodiscard]] virtual int max_scanners_concurrency(RuntimeState* state) const;
94+
[[nodiscard]] virtual int min_scanners_concurrency(RuntimeState* state) const;
95+
[[nodiscard]] virtual vectorized::ScannerScheduler* scan_scheduler(RuntimeState* state) const;
8696

8797
[[nodiscard]] std::string get_name() { return _parent->get_name(); }
8898

@@ -363,10 +373,6 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
363373

364374
[[nodiscard]] virtual bool is_file_scan_operator() const { return false; }
365375

366-
[[nodiscard]] virtual int query_parallel_instance_num() const {
367-
return _query_parallel_instance_num;
368-
}
369-
370376
[[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state) override;
371377

372378
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
@@ -443,8 +449,6 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
443449
int64_t _push_down_count = -1;
444450
const int _parallel_tasks = 0;
445451

446-
int _query_parallel_instance_num = 0;
447-
448452
std::vector<int> _topn_filter_source_node_ids;
449453
};
450454

be/src/runtime/runtime_state.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,15 @@ class RuntimeState {
142142
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
143143
: _query_options.query_timeout;
144144
}
145-
int num_scanner_threads() const {
146-
return _query_options.__isset.num_scanner_threads ? _query_options.num_scanner_threads : 0;
145+
int max_scanners_concurrency() const {
146+
return _query_options.__isset.max_scanners_concurrency
147+
? _query_options.max_scanners_concurrency
148+
: 0;
149+
}
150+
int max_file_scanners_concurrency() const {
151+
return _query_options.__isset.max_file_scanners_concurrency
152+
? _query_options.max_file_scanners_concurrency
153+
: max_scanners_concurrency();
147154
}
148155
int min_scan_concurrency_of_scan_scheduler() const {
149156
return _query_options.__isset.min_scan_scheduler_concurrency

be/src/vec/exec/scan/scanner_context.cpp

Lines changed: 20 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,12 @@ ScannerContext::ScannerContext(
5656
RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
5757
const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor,
5858
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners, int64_t limit_,
59-
std::shared_ptr<pipeline::Dependency> dependency, int parallism_of_scan_operator)
59+
std::shared_ptr<pipeline::Dependency> dependency
60+
#ifdef BE_TEST
61+
,
62+
int num_parallel_instances
63+
#endif
64+
)
6065
: HasTaskExecutionCtx(state),
6166
_state(state),
6267
_local_state(local_state),
@@ -67,9 +72,20 @@ ScannerContext::ScannerContext(
6772
_batch_size(state->batch_size()),
6873
limit(limit_),
6974
_all_scanners(scanners.begin(), scanners.end()),
70-
_parallism_of_scan_operator(parallism_of_scan_operator),
71-
_min_scan_concurrency_of_scan_scheduler(_state->min_scan_concurrency_of_scan_scheduler()),
72-
_min_scan_concurrency(_state->min_scan_concurrency_of_scanner()) {
75+
#ifndef BE_TEST
76+
_scanner_scheduler(local_state->scan_scheduler(state)),
77+
_min_scan_concurrency_of_scan_scheduler(local_state->min_scanners_concurrency(state)),
78+
_max_scan_concurrency(std::min(local_state->max_scanners_concurrency(state),
79+
cast_set<int>(scanners.size()))),
80+
#else
81+
_scanner_scheduler(state->get_query_ctx()->get_scan_scheduler()),
82+
_min_scan_concurrency_of_scan_scheduler(0),
83+
_max_scan_concurrency(num_parallel_instances),
84+
#endif
85+
_min_scan_concurrency(local_state->should_run_serial()
86+
? 1
87+
: std::min(_state->min_scan_concurrency_of_scanner(),
88+
_max_scan_concurrency)) {
7389
DCHECK(_state != nullptr);
7490
DCHECK(_output_row_descriptor == nullptr ||
7591
_output_row_descriptor->tuple_descriptors().size() == 1);
@@ -106,14 +122,6 @@ Status ScannerContext::init() {
106122
auto scanner = _all_scanners.front().lock();
107123
DCHECK(scanner != nullptr);
108124

109-
// TODO: Maybe need refactor.
110-
// A query could have remote scan task and local scan task at the same time.
111-
// So we need to compute the _scanner_scheduler in each scan operator instead of query context.
112-
if (scanner->_scanner->get_storage_type() == TabletStorageType::STORAGE_TYPE_LOCAL) {
113-
_scanner_scheduler = _state->get_query_ctx()->get_scan_scheduler();
114-
} else {
115-
_scanner_scheduler = _state->get_query_ctx()->get_remote_scan_scheduler();
116-
}
117125
if (auto* task_executor_scheduler =
118126
dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
119127
std::shared_ptr<TaskExecutor> task_executor = task_executor_scheduler->task_executor();
@@ -132,43 +140,11 @@ Status ScannerContext::init() {
132140
// Provide more memory for wide tables, increase proportionally by multiples of 300
133141
_max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
134142

135-
if (_min_scan_concurrency_of_scan_scheduler == 0) {
136-
// _scanner_scheduler->get_max_threads() is setted by workload group.
137-
_min_scan_concurrency_of_scan_scheduler = 2 * _scanner_scheduler->get_max_threads();
138-
}
139-
140143
if (_all_scanners.empty()) {
141144
_is_finished = true;
142145
_set_scanner_done();
143146
}
144147

145-
// The overall target of our system is to make full utilization of the resources.
146-
// At the same time, we dont want too many tasks are queued by scheduler, that is not necessary.
147-
// Each scan operator can submit _max_scan_concurrency scanner to scheduelr if scheduler has enough resource.
148-
// So that for a single query, we can make sure it could make full utilization of the resource.
149-
_max_scan_concurrency = _state->num_scanner_threads();
150-
if (_max_scan_concurrency == 0) {
151-
// Why this is safe:
152-
/*
153-
1. If num cpu cores is less than or equal to 24:
154-
_max_concurrency_of_scan_scheduler will be 96. _parallism_of_scan_operator will be 1 or C/2.
155-
so _max_scan_concurrency will be 96 or (96 * 2 / C).
156-
For a single scan node, most scanner it can submit will be 96 or (96 * 2 / C) * (C / 2) which is 96 too.
157-
So a single scan node could make full utilization of the resource without sumbiting all its tasks.
158-
2. If num cpu cores greater than 24:
159-
_max_concurrency_of_scan_scheduler will be 4 * C. _parallism_of_scan_operator will be 1 or C/2.
160-
so _max_scan_concurrency will be 4 * C or (4 * C * 2 / C).
161-
For a single scan node, most scanner it can submit will be 4 * C or (4 * C * 2 / C) * (C / 2) which is 4 * C too.
162-
163-
So, in all situations, when there is only one scan node, it could make full utilization of the resource.
164-
*/
165-
_max_scan_concurrency =
166-
_min_scan_concurrency_of_scan_scheduler / _parallism_of_scan_operator;
167-
_max_scan_concurrency = _max_scan_concurrency == 0 ? 1 : _max_scan_concurrency;
168-
}
169-
170-
_max_scan_concurrency = std::min(_max_scan_concurrency, (int32_t)_pending_scanners.size());
171-
172148
// when user not specify scan_thread_num, so we can try downgrade _max_thread_num.
173149
// becaue we found in a table with 5k columns, column reader may ocuppy too much memory.
174150
// you can refer https://github.com/apache/doris/issues/35340 for details.
@@ -191,15 +167,6 @@ Status ScannerContext::init() {
191167
}
192168
}
193169

194-
// For select * from table limit 10; should just use one thread.
195-
if (_local_state->should_run_serial()) {
196-
_max_scan_concurrency = 1;
197-
_min_scan_concurrency = 1;
198-
}
199-
200-
// Avoid corner case.
201-
_min_scan_concurrency = std::min(_min_scan_concurrency, _max_scan_concurrency);
202-
203170
COUNTER_SET(_local_state->_max_scan_concurrency, (int64_t)_max_scan_concurrency);
204171
COUNTER_SET(_local_state->_min_scan_concurrency, (int64_t)_min_scan_concurrency);
205172

0 commit comments

Comments
 (0)