@@ -35,8 +35,8 @@ using namespace vectorized;
3535
3636Status ParallelScannerBuilder::build_scanners (std::list<ScannerSPtr>& scanners) {
3737 RETURN_IF_ERROR (_load ());
38- if (_scan_parallelism_by_segment ) {
39- return _build_scanners_by_segment (scanners);
38+ if (_scan_parallelism_by_per_segment ) {
39+ return _build_scanners_by_per_segment (scanners);
4040 } else if (_is_dup_mow_key) {
4141 // Default strategy for DUP/MOW tables: split by rowids within segments
4242 return _build_scanners_by_rowid (scanners);
@@ -170,7 +170,7 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>&
170170// This guarantees the number of scanners equals the number of segments across all rowsets
171171// for the involved tablets. It preserves delete predicates and key ranges, and clones
172172// RowsetReader per scanner to avoid sharing between scanners.
173- Status ParallelScannerBuilder::_build_scanners_by_segment (std::list<ScannerSPtr>& scanners) {
173+ Status ParallelScannerBuilder::_build_scanners_by_per_segment (std::list<ScannerSPtr>& scanners) {
174174 DCHECK_GE (_rows_per_scanner, _min_rows_per_scanner);
175175
176176 for (auto && [tablet, version] : _tablets) {
@@ -182,79 +182,34 @@ Status ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>
182182 ExecEnv::GetInstance ()->storage_engine ().to_cloud ().tablet_hotspot ().count (*tablet);
183183 }
184184
185- // Collect segments into scanners based on rows count instead of one scanner per segment
186- TabletReadSource partitial_read_source;
187- int64_t rows_collected = 0 ;
188-
185+ // For each RowSet split in the read source, split by segment id and build
186+ // one scanner per segment. Keep delete predicates shared.
189187 for (auto & rs_split : entire_read_source.rs_splits ) {
190188 auto reader = rs_split.rs_reader ;
191189 auto rowset = reader->rowset ();
192190 const auto rowset_id = rowset->rowset_id ();
193-
194191 const auto & segments_rows = _all_segments_rows[rowset_id];
195192 if (segments_rows.empty () || rowset->num_rows () == 0 ) {
196193 continue ;
197194 }
198195
199- int64_t segment_start = 0 ;
200- auto split = RowSetSplits (reader->clone ());
201-
202- for (size_t i = 0 ; i < segments_rows.size (); ++i) {
203- const size_t rows_of_segment = segments_rows[i];
196+ // Build scanners for [i, i+1) segment range, without row-range slicing.
197+ for (int64_t i = 0 ; i < rowset->num_segments (); ++i) {
198+ RowSetSplits split (reader->clone ());
199+ split.segment_offsets .first = i;
200+ split.segment_offsets .second = i + 1 ;
201+ // No row-ranges slicing; scan whole segment i.
202+ DCHECK_GE (split.segment_offsets .second , split.segment_offsets .first + 1 );
204203
205- // Check if adding this segment would exceed rows_per_scanner
206- // 0.9: try to avoid splitting the segments into excessively small parts.
207- if (rows_collected > 0 && (rows_collected + rows_of_segment > _rows_per_scanner &&
208- rows_collected < _rows_per_scanner * 9 / 10 )) {
209- // Create a new scanner with collected segments
210- split.segment_offsets .first = segment_start;
211- split.segment_offsets .second =
212- i; // Range is [segment_start, i), including all segments from segment_start to i-1
213-
214- DCHECK_GT (split.segment_offsets .second , split.segment_offsets .first );
215-
216- partitial_read_source.rs_splits .emplace_back (std::move (split));
217-
218- scanners.emplace_back (_build_scanner (
219- tablet, version, _key_ranges,
220- {.rs_splits = std::move (partitial_read_source.rs_splits ),
221- .delete_predicates = entire_read_source.delete_predicates ,
222- .delete_bitmap = entire_read_source.delete_bitmap }));
223-
224- // Reset for next scanner
225- partitial_read_source = {};
226- split = RowSetSplits (reader->clone ());
227- segment_start = i;
228- rows_collected = 0 ;
229- }
230-
231- // Add current segment to the current scanner
232- rows_collected += rows_of_segment;
233- }
234-
235- // Add remaining segments in this rowset to a scanner
236- if (rows_collected > 0 ) {
237- split.segment_offsets .first = segment_start;
238- split.segment_offsets .second = segments_rows.size ();
239- DCHECK_GT (split.segment_offsets .second , split.segment_offsets .first );
204+ TabletReadSource partitial_read_source;
240205 partitial_read_source.rs_splits .emplace_back (std::move (split));
241- }
242- }
243206
244- // Add remaining segments across all rowsets to a scanner
245- if (rows_collected > 0 ) {
246- DCHECK_GT (partitial_read_source.rs_splits .size (), 0 );
247- #ifndef NDEBUG
248- for (auto & split : partitial_read_source.rs_splits ) {
249- DCHECK (split.rs_reader != nullptr );
250- DCHECK_LT (split.segment_offsets .first , split.segment_offsets .second );
207+ scanners.emplace_back (
208+ _build_scanner (tablet, version, _key_ranges,
209+ {.rs_splits = std::move (partitial_read_source.rs_splits ),
210+ .delete_predicates = entire_read_source.delete_predicates ,
211+ .delete_bitmap = entire_read_source.delete_bitmap }));
251212 }
252- #endif
253- scanners.emplace_back (
254- _build_scanner (tablet, version, _key_ranges,
255- {.rs_splits = std::move (partitial_read_source.rs_splits ),
256- .delete_predicates = entire_read_source.delete_predicates ,
257- .delete_bitmap = entire_read_source.delete_bitmap }));
258213 }
259214 }
260215
0 commit comments