Skip to content

Commit f471ead

Browse files
committed
update
1 parent dbf3850 commit f471ead

File tree

7 files changed

+64
-44
lines changed

7 files changed

+64
-44
lines changed

be/src/pipeline/exec/file_scan_operator.cpp

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,21 +32,39 @@
3232
namespace doris::pipeline {
3333
#include "common/compile_check_begin.h"
3434

35-
int FileScanLocalState::max_scanners_concurrency(RuntimeState* state) {
35+
int FileScanLocalState::max_scanners_concurrency(RuntimeState* state) const {
36+
// For select * from table limit 10; should just use one thread.
37+
if (should_run_serial()) {
38+
return 1;
39+
}
3640
/*
3741
* The max concurrency of file scanners for each FileScanLocalState is determined by:
3842
* 1. User specified max_file_scanners_concurrency which is set through session variable.
39-
* 2. Default: std::max((std::max(512, CpuInfo::num_cores() * 10)), config::doris_scanner_thread_pool_thread_num) * 2 / operator parallelism
43+
* 2. Default: std::max(512, CpuInfo::num_cores() * 10)
4044
*
4145
* If this is a serial operator, the max concurrency should multiply by the number of parallel instances of the operator.
4246
*/
4347
return (state->max_file_scanners_concurrency()
4448
? state->max_file_scanners_concurrency()
45-
: state->get_query_ctx()->get_remote_scan_scheduler()->get_max_threads() * 2 /
46-
_parent->parallelism(state)) *
49+
: std::max(512, CpuInfo::num_cores() * 10)) *
4750
(state->query_parallel_instance_num() / _parent->parallelism(state));
4851
}
4952

53+
int FileScanLocalState::min_scanners_concurrency(RuntimeState* state) const {
54+
/*
55+
* The min concurrency of scanners for each ScanLocalStateBase is determined by:
56+
* 1. User specified min_scan_concurrency_of_scan_scheduler which is set through session variable.
57+
* 2. Default: 2 * state->get_query_ctx()->get_remote_scan_scheduler()->get_max_threads()
58+
*/
59+
return state->min_scan_concurrency_of_scan_scheduler()
60+
? state->min_scan_concurrency_of_scan_scheduler()
61+
: 2 * state->get_query_ctx()->get_remote_scan_scheduler()->get_max_threads();
62+
}
63+
64+
vectorized::SimplifiedScanScheduler* FileScanLocalState::scan_scheduler(RuntimeState* state) const {
65+
return state->get_query_ctx()->get_remote_scan_scheduler();
66+
}
67+
5068
Status FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* scanners) {
5169
if (_split_source->num_scan_ranges() == 0) {
5270
_eos = true;

be/src/pipeline/exec/file_scan_operator.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {
5454
const std::vector<TScanRangeParams>& scan_ranges) override;
5555
int parent_id() { return _parent->node_id(); }
5656
std::string name_suffix() const override;
57-
int max_scanners_concurrency(RuntimeState* state) override;
57+
int max_scanners_concurrency(RuntimeState* state) const override;
58+
int min_scanners_concurrency(RuntimeState* state) const override;
59+
vectorized::SimplifiedScanScheduler* scan_scheduler(RuntimeState* state) const override;
5860

5961
private:
6062
friend class vectorized::FileScanner;

be/src/pipeline/exec/scan_operator.cpp

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,21 +72,39 @@ bool ScanLocalState<Derived>::should_run_serial() const {
7272
return _parent->cast<typename Derived::Parent>()._should_run_serial;
7373
}
7474

75-
int ScanLocalStateBase::max_scanners_concurrency(RuntimeState* state) {
75+
int ScanLocalStateBase::max_scanners_concurrency(RuntimeState* state) const {
76+
// For select * from table limit 10; should just use one thread.
77+
if (should_run_serial()) {
78+
return 1;
79+
}
7680
/*
7781
* The max concurrency of scanners for each ScanLocalStateBase is determined by:
7882
* 1. User specified max_scanners_concurrency which is set through session variable.
79-
* 2. Default: config::doris_scanner_thread_pool_thread_num / operator parallelism
83+
* 2. Default: std::max(48, CpuInfo::num_cores() * 2))
8084
*
8185
* If this is a serial operator, the max concurrency should multiply by the number of parallel instances of the operator.
8286
*/
8387
return (state->max_scanners_concurrency()
8488
? state->max_scanners_concurrency()
85-
: _state->get_query_ctx()->get_scan_scheduler()->get_max_threads() * 2 /
86-
_parent->parallelism(state)) *
89+
: std::max(48, CpuInfo::num_cores() * 2)) *
8790
(state->query_parallel_instance_num() / _parent->parallelism(state));
8891
}
8992

93+
int ScanLocalStateBase::min_scanners_concurrency(RuntimeState* state) const {
94+
/*
95+
* The min concurrency of scanners for each ScanLocalStateBase is determined by:
96+
* 1. User specified min_scan_concurrency_of_scan_scheduler which is set through session variable.
97+
* 2. Default: 2 * state->get_query_ctx()->get_scan_scheduler()->get_max_threads()
98+
*/
99+
return state->min_scan_concurrency_of_scan_scheduler()
100+
? state->min_scan_concurrency_of_scan_scheduler()
101+
: 2 * state->get_query_ctx()->get_scan_scheduler()->get_max_threads();
102+
}
103+
104+
vectorized::SimplifiedScanScheduler* ScanLocalStateBase::scan_scheduler(RuntimeState* state) const {
105+
return state->get_query_ctx()->get_scan_scheduler();
106+
}
107+
90108
template <typename Derived>
91109
Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) {
92110
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));

be/src/pipeline/exec/scan_operator.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ class ScanLocalStateBase : public PipelineXLocalState<> {
9090
// Related pr:
9191
// https://github.com/apache/doris/pull/42460
9292
// https://github.com/apache/doris/pull/44635
93-
[[nodiscard]] virtual int max_scanners_concurrency(RuntimeState* state);
93+
[[nodiscard]] virtual int max_scanners_concurrency(RuntimeState* state) const;
94+
[[nodiscard]] virtual int min_scanners_concurrency(RuntimeState* state) const;
95+
[[nodiscard]] virtual vectorized::SimplifiedScanScheduler* scan_scheduler(RuntimeState* state) const;
9496

9597
[[nodiscard]] std::string get_name() { return _parent->get_name(); }
9698

be/src/vec/exec/scan/scanner_context.cpp

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,16 @@ ScannerContext::ScannerContext(
7373
limit(limit_),
7474
_scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
7575
_all_scanners(scanners.begin(), scanners.end()),
76-
_min_scan_concurrency_of_scan_scheduler(_state->min_scan_concurrency_of_scan_scheduler()),
77-
_min_scan_concurrency(_state->min_scan_concurrency_of_scanner()) {
7876
#ifndef BE_TEST
79-
_max_scan_concurrency =
80-
std::min(local_state->max_scanners_concurrency(state), cast_set<int>(scanners.size()));
77+
_scanner_scheduler(local_state->scan_scheduler(state)),
78+
_min_scan_concurrency_of_scan_scheduler(local_state->min_scanners_concurrency(state)),
79+
_max_scan_concurrency(std::min(local_state->max_scanners_concurrency(state), cast_set<int>(scanners.size()))),
8180
#else
82-
_max_scan_concurrency = num_parallel_instances;
81+
_scanner_scheduler(state->get_query_ctx()->get_scan_scheduler()),
82+
_min_scan_concurrency_of_scan_scheduler(0),
83+
_max_scan_concurrency(num_parallel_instances),
8384
#endif
85+
_min_scan_concurrency(local_state->should_run_serial() ? 1 : std::min(_state->min_scan_concurrency_of_scanner(), _max_scan_concurrency)) {
8486
DCHECK(_state != nullptr);
8587
DCHECK(_output_row_descriptor == nullptr ||
8688
_output_row_descriptor->tuple_descriptors().size() == 1);
@@ -117,16 +119,8 @@ Status ScannerContext::init() {
117119
auto scanner = _all_scanners.front().lock();
118120
DCHECK(scanner != nullptr);
119121

120-
// TODO: Maybe need refactor.
121-
// A query could have remote scan task and local scan task at the same time.
122-
// So we need to compute the _scanner_scheduler in each scan operator instead of query context.
123-
if (scanner->_scanner->get_storage_type() == TabletStorageType::STORAGE_TYPE_LOCAL) {
124-
_scanner_scheduler = _state->get_query_ctx()->get_scan_scheduler();
125-
} else {
126-
_scanner_scheduler = _state->get_query_ctx()->get_remote_scan_scheduler();
127-
}
128-
if (auto* task_executor_scheduler =
129-
dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
122+
if (const auto* task_executor_scheduler =
123+
dynamic_cast<const TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
130124
std::shared_ptr<TaskExecutor> task_executor = task_executor_scheduler->task_executor();
131125
vectorized::TaskId task_id(fmt::format("{}-{}", print_id(_state->query_id()), ctx_id));
132126
_task_handle = DORIS_TRY(task_executor->create_task(
@@ -143,11 +137,6 @@ Status ScannerContext::init() {
143137
// Provide more memory for wide tables, increase proportionally by multiples of 300
144138
_max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
145139

146-
if (_min_scan_concurrency_of_scan_scheduler == 0) {
147-
// _scanner_scheduler->get_max_threads() is setted by workload group.
148-
_min_scan_concurrency_of_scan_scheduler = 2 * _scanner_scheduler->get_max_threads();
149-
}
150-
151140
if (_all_scanners.empty()) {
152141
_is_finished = true;
153142
_set_scanner_done();
@@ -175,15 +164,6 @@ Status ScannerContext::init() {
175164
}
176165
}
177166

178-
// For select * from table limit 10; should just use one thread.
179-
if (_local_state->should_run_serial()) {
180-
_max_scan_concurrency = 1;
181-
_min_scan_concurrency = 1;
182-
}
183-
184-
// Avoid corner case.
185-
_min_scan_concurrency = std::min(_min_scan_concurrency, _max_scan_concurrency);
186-
187167
COUNTER_SET(_local_state->_max_scan_concurrency, (int64_t)_max_scan_concurrency);
188168
COUNTER_SET(_local_state->_min_scan_concurrency, (int64_t)_min_scan_concurrency);
189169

be/src/vec/exec/scan/scanner_context.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
169169

170170
RuntimeState* state() { return _state; }
171171

172-
SimplifiedScanScheduler* get_scan_scheduler() { return _scanner_scheduler; }
172+
SimplifiedScanScheduler* get_scan_scheduler() const { return _scanner_scheduler; }
173173

174174
void stop_scanners(RuntimeState* state);
175175

@@ -234,7 +234,6 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
234234

235235
int64_t _max_bytes_in_queue = 0;
236236
doris::vectorized::ScannerScheduler* _scanner_scheduler_global = nullptr;
237-
SimplifiedScanScheduler* _scanner_scheduler = nullptr;
238237
// Using stack so that we can resubmit scanner in a LIFO order, maybe more cache friendly
239238
std::stack<std::shared_ptr<ScanTask>> _pending_scanners;
240239
// Scanner that is submitted to the scheduler.
@@ -256,13 +255,14 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
256255

257256
// adaptive scan concurrency related
258257

259-
int32_t _min_scan_concurrency_of_scan_scheduler = 0;
260-
int32_t _min_scan_concurrency = 1;
258+
SimplifiedScanScheduler* _scanner_scheduler = nullptr;
259+
const int32_t _min_scan_concurrency_of_scan_scheduler = 0;
261260
// The overall target of our system is to make full utilization of the resources.
262261
// At the same time, we dont want too many tasks are queued by scheduler, that is not necessary.
263262
// Each scan operator can submit _max_scan_concurrency scanner to scheduelr if scheduler has enough resource.
264263
// So that for a single query, we can make sure it could make full utilization of the resource.
265264
int32_t _max_scan_concurrency = 0;
265+
const int32_t _min_scan_concurrency = 1;
266266

267267
std::shared_ptr<ScanTask> _pull_next_scan_task(std::shared_ptr<ScanTask> current_scan_task,
268268
int32_t current_concurrency);

be/src/vec/exec/scan/scanner_scheduler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
9090
scanner_delegate->_scanner->start_wait_worker_timer();
9191
TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
9292
auto sumbit_task = [&]() {
93-
SimplifiedScanScheduler* scan_sched = ctx->get_scan_scheduler();
93+
auto* scan_sched = ctx->get_scan_scheduler();
9494
auto work_func = [scanner_ref = scan_task, ctx]() {
9595
auto status = [&] {
9696
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));

0 commit comments

Comments
 (0)