Skip to content

Commit 920b8ee

Browse files
committed
[Improvement](external) Increase scanner concurrency
1 parent 636978a commit 920b8ee

File tree

11 files changed

+80
-65
lines changed

11 files changed

+80
-65
lines changed

be/src/pipeline/exec/file_scan_operator.cpp

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,15 @@
3131

3232
namespace doris::pipeline {
3333
#include "common/compile_check_begin.h"
34+
35+
int FileScanLocalState::max_scanners_concurrency(RuntimeState* state) {
36+
return (state->max_file_scanners_concurrency()
37+
? state->max_file_scanners_concurrency()
38+
: state->get_query_ctx()->get_remote_scan_scheduler()->get_max_threads() * 2 /
39+
_parent->parallelism(state)) *
40+
_parent->parallelism(state);
41+
}
42+
3443
Status FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* scanners) {
3544
if (_split_source->num_scan_ranges() == 0) {
3645
_eos = true;
@@ -44,9 +53,9 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
4453

4554
auto& p = _parent->cast<FileScanOperatorX>();
4655
// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
47-
uint32_t shard_num = std::min(vectorized::ScannerScheduler::get_remote_scan_thread_num() /
48-
p.query_parallel_instance_num(),
49-
_max_scanners);
56+
uint32_t shard_num = std::min(
57+
vectorized::ScannerScheduler::get_remote_scan_thread_num() / p.parallelism(state()),
58+
_max_scanners);
5059
shard_num = std::max(shard_num, 1U);
5160
_kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
5261
for (int i = 0; i < _max_scanners; ++i) {
@@ -85,15 +94,15 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
8594
auto split_source = scan_range.split_source;
8695
RuntimeProfile::Counter* get_split_timer = ADD_TIMER(custom_profile(), "GetSplitTime");
8796

88-
_max_scanners = calc_max_scanners(p.query_parallel_instance_num());
97+
_max_scanners = calc_max_scanners(p.parallelism(state));
8998
_split_source = std::make_shared<vectorized::RemoteSplitSourceConnector>(
9099
state, get_split_timer, split_source.split_source_id, split_source.num_splits,
91100
_max_scanners);
92101
}
93102
}
94103

95104
if (!p._batch_split_mode) {
96-
_max_scanners = calc_max_scanners(p.query_parallel_instance_num());
105+
_max_scanners = calc_max_scanners(p.parallelism(state));
97106
if (_split_source == nullptr) {
98107
_split_source = std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges,
99108
_max_scanners);

be/src/pipeline/exec/file_scan_operator.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {
5454
const std::vector<TScanRangeParams>& scan_ranges) override;
5555
int parent_id() { return _parent->node_id(); }
5656
std::string name_suffix() const override;
57+
int max_scanners_concurrency(RuntimeState* state) override;
5758

5859
private:
5960
friend class vectorized::FileScanner;
@@ -83,8 +84,8 @@ class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
8384
bool is_file_scan_operator() const override { return true; }
8485

8586
// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
86-
int query_parallel_instance_num() const override {
87-
return _batch_split_mode ? 1 : _query_parallel_instance_num;
87+
int parallelism(RuntimeState* state) const override {
88+
return _batch_split_mode ? 1 : ScanOperatorX<FileScanLocalState>::parallelism(state);
8889
}
8990

9091
private:

be/src/pipeline/exec/operator.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ class OperatorBase {
113113
[[nodiscard]] virtual Status terminate(RuntimeState* state) = 0;
114114
[[nodiscard]] virtual Status close(RuntimeState* state);
115115
[[nodiscard]] virtual int node_id() const = 0;
116+
[[nodiscard]] virtual int parallelism(RuntimeState* state) const {
117+
return _is_serial_operator ? 1 : state->query_parallel_instance_num();
118+
}
116119

117120
[[nodiscard]] virtual Status set_child(OperatorPtr child) {
118121
if (_child && child != nullptr) {

be/src/pipeline/exec/scan_operator.cpp

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,14 @@ bool ScanLocalState<Derived>::should_run_serial() const {
7272
return _parent->cast<typename Derived::Parent>()._should_run_serial;
7373
}
7474

75+
int ScanLocalStateBase::max_scanners_concurrency(RuntimeState* state) {
76+
return (state->num_scanner_threads()
77+
? state->num_scanner_threads()
78+
: _state->get_query_ctx()->get_scan_scheduler()->get_max_threads() * 2 /
79+
_parent->parallelism(state)) *
80+
_parent->parallelism(state);
81+
}
82+
7583
template <typename Derived>
7684
Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) {
7785
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
@@ -1052,19 +1060,14 @@ template <typename Derived>
10521060
Status ScanLocalState<Derived>::_start_scanners(
10531061
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners) {
10541062
auto& p = _parent->cast<typename Derived::Parent>();
1055-
// If scan operator is serial operator(like topn), its real parallelism is 1.
1056-
// Otherwise, its real parallelism is query_parallel_instance_num.
1057-
// query_parallel_instance_num of olap table is usually equal to session var parallel_pipeline_task_num.
1058-
// for file scan operator, its real parallelism will be 1 if it is in batch mode.
1059-
// Related pr:
1060-
// https://github.com/apache/doris/pull/42460
1061-
// https://github.com/apache/doris/pull/44635
1062-
const int parallism_of_scan_operator =
1063-
p.is_serial_operator() ? 1 : p.query_parallel_instance_num();
1064-
1065-
_scanner_ctx = vectorized::ScannerContext::create_shared(
1066-
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
1067-
_scan_dependency, parallism_of_scan_operator);
1063+
_scanner_ctx = vectorized::ScannerContext::create_shared(state(), this, p._output_tuple_desc,
1064+
p.output_row_descriptor(), scanners,
1065+
p.limit(), _scan_dependency
1066+
#ifdef BE_TEST
1067+
,
1068+
max_scanners_concurrency(state())
1069+
#endif
1070+
);
10681071
return Status::OK();
10691072
}
10701073

@@ -1273,8 +1276,6 @@ Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState*
12731276
}
12741277
}
12751278

1276-
_query_parallel_instance_num = state->query_parallel_instance_num();
1277-
12781279
return Status::OK();
12791280
}
12801281

be/src/pipeline/exec/scan_operator.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,14 @@ class ScanLocalStateBase : public PipelineXLocalState<> {
8383
virtual TPushAggOp::type get_push_down_agg_type() = 0;
8484

8585
virtual int64_t get_push_down_count() = 0;
86+
// If scan operator is serial operator(like topn), its real parallelism is 1.
87+
// Otherwise, its real parallelism is query_parallel_instance_num.
88+
// query_parallel_instance_num of olap table is usually equal to session var parallel_pipeline_task_num.
89+
// for file scan operator, its real parallelism will be 1 if it is in batch mode.
90+
// Related pr:
91+
// https://github.com/apache/doris/pull/42460
92+
// https://github.com/apache/doris/pull/44635
93+
[[nodiscard]] virtual int max_scanners_concurrency(RuntimeState* state);
8694

8795
[[nodiscard]] std::string get_name() { return _parent->get_name(); }
8896

@@ -363,10 +371,6 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
363371

364372
[[nodiscard]] virtual bool is_file_scan_operator() const { return false; }
365373

366-
[[nodiscard]] virtual int query_parallel_instance_num() const {
367-
return _query_parallel_instance_num;
368-
}
369-
370374
[[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state) override;
371375

372376
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
@@ -443,8 +447,6 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
443447
int64_t _push_down_count = -1;
444448
const int _parallel_tasks = 0;
445449

446-
int _query_parallel_instance_num = 0;
447-
448450
std::vector<int> _topn_filter_source_node_ids;
449451
};
450452

be/src/runtime/runtime_state.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,11 @@ class RuntimeState {
145145
int num_scanner_threads() const {
146146
return _query_options.__isset.num_scanner_threads ? _query_options.num_scanner_threads : 0;
147147
}
148+
int max_file_scanners_concurrency() const {
149+
return _query_options.__isset.max_file_scanners_concurrency
150+
? _query_options.max_file_scanners_concurrency
151+
: num_scanner_threads();
152+
}
148153
int min_scan_concurrency_of_scan_scheduler() const {
149154
return _query_options.__isset.min_scan_scheduler_concurrency
150155
? _query_options.min_scan_scheduler_concurrency

be/src/vec/exec/scan/scanner_context.cpp

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,12 @@ ScannerContext::ScannerContext(
5656
RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
5757
const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor,
5858
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners, int64_t limit_,
59-
std::shared_ptr<pipeline::Dependency> dependency, int parallism_of_scan_operator)
59+
std::shared_ptr<pipeline::Dependency> dependency
60+
#ifdef BE_TEST
61+
,
62+
int num_parallel_instances
63+
#endif
64+
)
6065
: HasTaskExecutionCtx(state),
6166
_state(state),
6267
_local_state(local_state),
@@ -68,9 +73,14 @@ ScannerContext::ScannerContext(
6873
limit(limit_),
6974
_scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
7075
_all_scanners(scanners.begin(), scanners.end()),
71-
_parallism_of_scan_operator(parallism_of_scan_operator),
7276
_min_scan_concurrency_of_scan_scheduler(_state->min_scan_concurrency_of_scan_scheduler()),
7377
_min_scan_concurrency(_state->min_scan_concurrency_of_scanner()) {
78+
#ifndef BE_TEST
79+
_max_scan_concurrency =
80+
std::min(local_state->max_scanners_concurrency(state), cast_set<int>(scanners.size()));
81+
#else
82+
_max_scan_concurrency = num_parallel_instances;
83+
#endif
7484
DCHECK(_state != nullptr);
7585
DCHECK(_output_row_descriptor == nullptr ||
7686
_output_row_descriptor->tuple_descriptors().size() == 1);
@@ -143,33 +153,6 @@ Status ScannerContext::init() {
143153
_set_scanner_done();
144154
}
145155

146-
// The overall target of our system is to make full utilization of the resources.
147-
// At the same time, we dont want too many tasks are queued by scheduler, that is not necessary.
148-
// Each scan operator can submit _max_scan_concurrency scanner to scheduelr if scheduler has enough resource.
149-
// So that for a single query, we can make sure it could make full utilization of the resource.
150-
_max_scan_concurrency = _state->num_scanner_threads();
151-
if (_max_scan_concurrency == 0) {
152-
// Why this is safe:
153-
/*
154-
1. If num cpu cores is less than or equal to 24:
155-
_max_concurrency_of_scan_scheduler will be 96. _parallism_of_scan_operator will be 1 or C/2.
156-
so _max_scan_concurrency will be 96 or (96 * 2 / C).
157-
For a single scan node, most scanner it can submit will be 96 or (96 * 2 / C) * (C / 2) which is 96 too.
158-
So a single scan node could make full utilization of the resource without sumbiting all its tasks.
159-
2. If num cpu cores greater than 24:
160-
_max_concurrency_of_scan_scheduler will be 4 * C. _parallism_of_scan_operator will be 1 or C/2.
161-
so _max_scan_concurrency will be 4 * C or (4 * C * 2 / C).
162-
For a single scan node, most scanner it can submit will be 4 * C or (4 * C * 2 / C) * (C / 2) which is 4 * C too.
163-
164-
So, in all situations, when there is only one scan node, it could make full utilization of the resource.
165-
*/
166-
_max_scan_concurrency =
167-
_min_scan_concurrency_of_scan_scheduler / _parallism_of_scan_operator;
168-
_max_scan_concurrency = _max_scan_concurrency == 0 ? 1 : _max_scan_concurrency;
169-
}
170-
171-
_max_scan_concurrency = std::min(_max_scan_concurrency, (int32_t)_pending_scanners.size());
172-
173156
// when user not specify scan_thread_num, so we can try downgrade _max_thread_num.
174157
// becaue we found in a table with 5k columns, column reader may ocuppy too much memory.
175158
// you can refer https://github.com/apache/doris/issues/35340 for details.

be/src/vec/exec/scan/scanner_context.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,12 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
123123
const TupleDescriptor* output_tuple_desc,
124124
const RowDescriptor* output_row_descriptor,
125125
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
126-
int64_t limit_, std::shared_ptr<pipeline::Dependency> dependency,
127-
int num_parallel_instances);
126+
int64_t limit_, std::shared_ptr<pipeline::Dependency> dependency
127+
#ifdef BE_TEST
128+
,
129+
int num_parallel_instances
130+
#endif
131+
);
128132

129133
~ScannerContext() override;
130134
Status init();
@@ -206,7 +210,6 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
206210
/// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough memory to scale up
207211
/// 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num`
208212
void _set_scanner_done();
209-
Status _try_to_scale_up();
210213

211214
RuntimeState* _state = nullptr;
212215
pipeline::ScanLocalStateBase* _local_state = nullptr;
@@ -247,7 +250,6 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
247250
RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
248251
std::shared_ptr<ResourceContext> _resource_ctx;
249252
std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
250-
const int _parallism_of_scan_operator;
251253
std::shared_ptr<doris::vectorized::TaskHandle> _task_handle;
252254

253255
std::atomic<int64_t> _block_memory_usage = 0;
@@ -256,6 +258,10 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
256258

257259
int32_t _min_scan_concurrency_of_scan_scheduler = 0;
258260
int32_t _min_scan_concurrency = 1;
261+
// The overall target of our system is to make full utilization of the resources.
262+
// At the same time, we dont want too many tasks are queued by scheduler, that is not necessary.
263+
// Each scan operator can submit _max_scan_concurrency scanner to scheduelr if scheduler has enough resource.
264+
// So that for a single query, we can make sure it could make full utilization of the resource.
259265
int32_t _max_scan_concurrency = 0;
260266

261267
std::shared_ptr<ScanTask> _pull_next_scan_task(std::shared_ptr<ScanTask> current_scan_task,

be/test/scan/scanner_context_test.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,16 +176,13 @@ TEST_F(ScannerContextTest, test_init) {
176176
Status st = scanner_context->init();
177177
ASSERT_TRUE(st.ok());
178178
// actual max_scan_concurrency will be 2 since user specified num_scanner_threads is 2.
179-
ASSERT_EQ(scanner_context->_max_scan_concurrency, 2);
179+
ASSERT_EQ(scanner_context->_max_scan_concurrency, 1);
180180

181181
query_options.__set_num_scanner_threads(0);
182182
state->set_query_options(query_options);
183183

184184
st = scanner_context->init();
185185
ASSERT_TRUE(st.ok());
186-
187-
ASSERT_EQ(scanner_context->_max_scan_concurrency,
188-
scanner_context->_min_scan_concurrency_of_scan_scheduler / parallel_tasks);
189186
}
190187

191188
TEST_F(ScannerContextTest, test_serial_run) {

fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public class SessionVariable implements Serializable, Writable {
9494
public static final String LOCAL_EXCHANGE_FREE_BLOCKS_LIMIT = "local_exchange_free_blocks_limit";
9595
public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit";
9696
public static final String NUM_SCANNER_THREADS = "num_scanner_threads";
97+
public static final String MAX_FILE_SCANNERS_CONCURRENCY = "max_file_scanners_concurrency";
9798
public static final String MIN_SCANNER_CONCURRENCY = "min_scanner_concurrnency";
9899
public static final String MIN_SCAN_SCHEDULER_CONCURRENCY = "min_scan_scheduler_concurrency";
99100
public static final String QUERY_TIMEOUT = "query_timeout";
@@ -982,6 +983,11 @@ public static double getHotValueThreshold() {
982983
})
983984
public int numScannerThreads = 0;
984985

986+
@VariableMgr.VarAttr(name = MAX_FILE_SCANNERS_CONCURRENCY, needForward = true, description = {
987+
"FileScanNode 扫描数据的最大并发",
988+
"The max threads to read data of FileScanNode"})
989+
public int maxFileScannersConcurrency = 16;
990+
985991
@VariableMgr.VarAttr(name = LOCAL_EXCHANGE_FREE_BLOCKS_LIMIT)
986992
public int localExchangeFreeBlocksLimit = 4;
987993

@@ -4727,6 +4733,7 @@ public TQueryOptions toThrift() {
47274733
tResult.setLocalExchangeFreeBlocksLimit(localExchangeFreeBlocksLimit);
47284734
tResult.setScanQueueMemLimit(maxScanQueueMemByte);
47294735
tResult.setNumScannerThreads(numScannerThreads);
4736+
tResult.setMaxFileScannersConcurrency(maxFileScannersConcurrency);
47304737
tResult.setMaxColumnReaderNum(maxColumnReaderNum);
47314738
tResult.setParallelPrepareThreshold(parallelPrepareThreshold);
47324739

0 commit comments

Comments
 (0)