Skip to content

Commit 9b45113

Browse files
committed
[feat](seq mapping) uniq table supports multi-stream merging through sequence mapping
1 parent 5b06a3f commit 9b45113

37 files changed

+915
-37
lines changed

be/src/olap/compaction.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ Status Compaction::merge_input_rowsets() {
204204
{
205205
SCOPED_TIMER(_merge_rowsets_latency_timer);
206206
// 1. Merge segment files and write bkd inverted index
207-
if (_is_vertical) {
207+
if (_is_vertical && !_tablet->tablet_schema()->has_seq_map()) {
208208
if (!_tablet->tablet_schema()->cluster_key_uids().empty()) {
209209
RETURN_IF_ERROR(update_delete_bitmap());
210210
}

be/src/olap/memtable.cpp

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,36 @@ Status MemTable::insert(const vectorized::Block* input_block,
244244
return Status::OK();
245245
}
246246

247+
// Merges src_row into dst_row group-by-group, using the tablet schema's seq_map.
//
// Each seq_map entry is (sequence column index -> list of column ids in that group).
// For every entry the sequence cell at dst_row's position is compared with the one at
// src_row's position; a group is skipped when compare_at returns a positive value
// (presumably: dst's sequence value is strictly larger — confirm against
// IColumn::compare_at). Otherwise src_row's cells for the group's columns and for the
// sequence column itself are fed into dst_row's aggregate places.
//
// @param mutable_block block whose columns hold both rows
// @param src_row       row whose values are merged in
// @param dst_row       row whose aggregate places receive the values
void MemTable::_aggregate_two_row_with_sequence_map(vectorized::MutableBlock& mutable_block,
248+
RowInBlock* src_row, RowInBlock* dst_row) {
249+
const auto& seq_map = _tablet_schema->seq_map();
250+
for (const auto& it : seq_map) {
251+
auto sequence = it.first;
252+
// NOTE(review): the block is indexed by `sequence` here, before the
// `sequence < _num_columns` check further down — confirm that `sequence` is always
// a valid index into mutable_block's columns.
auto* sequence_col_ptr = mutable_block.mutable_columns()[sequence].get();
253+
auto res = sequence_col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos,
254+
*sequence_col_ptr, -1);
255+
// Positive result: leave this group of dst_row untouched.
if (res > 0) {
256+
continue;
257+
}
258+
// Feed src_row's cell of every column in the group into dst_row's aggregate place.
// Column ids at or beyond _num_columns are ignored.
for (auto cid : it.second) {
259+
if (cid < _num_columns) {
260+
auto* col_ptr = mutable_block.mutable_columns()[cid].get();
261+
_agg_functions[cid]->add(dst_row->agg_places(cid),
262+
const_cast<const doris::vectorized::IColumn**>(&col_ptr),
263+
src_row->_row_pos, _arena);
264+
}
265+
}
266+
// Apply the same update to the sequence column itself.
if (sequence < _num_columns) {
267+
_agg_functions[sequence]->add(
268+
dst_row->agg_places(sequence),
269+
const_cast<const doris::vectorized::IColumn**>(&sequence_col_ptr),
270+
src_row->_row_pos, _arena);
271+
// Presumably overwrites the sequence cell at dst_row's position with the cell at
// src_row's position, so that later comparisons against dst_row see the merged
// sequence value — TODO confirm against IColumn::replace_column_data.
sequence_col_ptr->replace_column_data(*sequence_col_ptr, src_row->_row_pos,
272+
dst_row->_row_pos);
273+
}
274+
}
275+
}
276+
247277
template <bool has_skip_bitmap_col>
248278
void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block,
249279
RowInBlock* src_row, RowInBlock* dst_row) {
@@ -504,7 +534,12 @@ void MemTable::_aggregate() {
504534
_init_row_for_agg(prev_row, mutable_block);
505535
}
506536
_stat.merged_rows++;
507-
_aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, prev_row);
537+
if (_tablet_schema->has_seq_map()) {
538+
_aggregate_two_row_with_sequence_map(mutable_block, cur_row, prev_row);
539+
} else {
540+
_aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row,
541+
prev_row);
542+
}
508543
} else {
509544
prev_row = cur_row;
510545
if (!temp_row_in_blocks.empty()) {

be/src/olap/memtable.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,9 @@ class MemTable {
210210
void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
211211
RowInBlock* row_in_skiplist);
212212

213+
// Sequence-map variant of _aggregate_two_row_in_block: _aggregate() calls this instead
// when the tablet schema has a seq_map. Merges new_row into row_in_skiplist.
void _aggregate_two_row_with_sequence_map(vectorized::MutableBlock& mutable_block,
214+
RowInBlock* new_row, RowInBlock* row_in_skiplist);
215+
213216
// Wrapped by to_block, which does the exception handling logic
214217
Status _to_block(std::unique_ptr<vectorized::Block>* res);
215218

be/src/olap/rowset/beta_rowset_reader.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
178178
}
179179

180180
if (_should_push_down_value_predicates()) {
181-
if (_read_context->value_predicates != nullptr) {
181+
if (_read_context->value_predicates != nullptr &&
182+
!read_context->tablet_schema->has_seq_map()) {
182183
_read_options.column_predicates.insert(_read_options.column_predicates.end(),
183184
_read_context->value_predicates->begin(),
184185
_read_context->value_predicates->end());

be/src/olap/schema.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class Schema {
8080
columns.push_back(std::make_shared<TabletColumn>(column));
8181
}
8282
_delete_sign_idx = tablet_schema->delete_sign_idx();
83-
if (tablet_schema->has_sequence_col()) {
83+
if (tablet_schema->has_sequence_col() || tablet_schema->has_seq_map()) {
8484
_has_sequence_col = true;
8585
}
8686
_init(columns, col_ids, num_key_columns);

be/src/olap/tablet_meta.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,16 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
185185
schema->set_num_short_key_columns(tablet_schema.short_key_column_count);
186186
schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block);
187187
schema->set_sequence_col_idx(tablet_schema.sequence_col_idx);
188+
auto p_seq_map = schema->mutable_seq_map(); // ColumnGroupsPB
189+
190+
for (auto& it : tablet_schema.seq_map) { // std::vector< ::doris::TColumnGroup>
191+
uint32_t key = it.sequence_column;
192+
ColumnGroupPB* cg_pb = p_seq_map->add_cg(); // ColumnGroupPB {key: {v1, v2, v3}}
193+
cg_pb->set_sequence_column(key);
194+
for (auto v : it.columns_in_group) {
195+
cg_pb->add_columns_in_group(v);
196+
}
197+
}
188198
switch (tablet_schema.keys_type) {
189199
case TKeysType::DUP_KEYS:
190200
schema->set_keys_type(KeysType::DUP_KEYS);

be/src/olap/tablet_reader.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,47 @@ Status TabletReader::_init_params(const ReaderParams& read_params) {
337337
}
338338
}
339339

340+
if (_tablet_schema->has_seq_map()) {
341+
if (_sequence_col_idx != -1) {
342+
auto msg = "sequence columns conflict, both seq_col and seq_map are true!";
343+
LOG(WARNING) << msg;
344+
return Status::InternalError(msg);
345+
}
346+
_val_to_seq = _tablet_schema->value_to_seq();
347+
if (_val_to_seq.size() <= 0) {
348+
auto msg = "seq_map is empty!";
349+
LOG(WARNING) << msg;
350+
return Status::InternalError(msg);
351+
}
352+
}
353+
354+
/*
355+
* |** KEY **| ** VALUE ** |
356+
------------------------------------
357+
|** KEY **| CDE is value| sequence|
358+
|----|----|----|----|----|----|----|
359+
A B C D E S1 S2
360+
0 1 2 3 4 5 6
361+
362+
for example SQL is select C,C,D,E from table;
363+
_value_cids is {2,2,3,4}
364+
_seq_map is {5:{2,3}, 6:{4}}
365+
_value_to_seq is {2:5,3:5,5:5,4:6,6:6}
366+
_return_seq_map is {5:{2,2,3}, 6:{4}}
367+
*/
368+
_return_seq_map.clear();
369+
for (const auto& val : _value_cids) {
370+
auto seq = _val_to_seq[val];
371+
372+
if (_return_seq_map.find(seq) == _return_seq_map.end()) {
373+
_return_seq_map[seq] = std::vector<uint32_t>();
374+
}
375+
376+
if (val != seq) {
377+
_return_seq_map[seq].push_back(val);
378+
}
379+
}
380+
340381
return res;
341382
}
342383

be/src/olap/tablet_reader.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,10 @@ class TabletReader {
319319
std::vector<uint32_t> _key_cids;
320320
std::vector<uint32_t> _value_cids;
321321

322+
std::unordered_map<uint32_t /* seq cid */, std::vector<uint32_t> /* value cids */>
323+
_return_seq_map;
324+
std::unordered_map<uint32_t /* cid */, uint32_t /* sequence */> _val_to_seq;
325+
322326
uint64_t _merged_rows = 0;
323327
OlapReaderStatistics _stats;
324328
};

be/src/olap/tablet_schema.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1186,6 +1186,41 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extrac
11861186
_storage_page_size = schema.storage_page_size();
11871187
_storage_dict_page_size = schema.storage_dict_page_size();
11881188
_schema_version = schema.schema_version();
1189+
auto column_groups_pb = schema.seq_map();
1190+
_seq_map.clear();
1191+
_value_to_seq.clear();
1192+
/*
1193+
* ColumnGroupsPB is a list of cg_pb, and
1194+
* ColumnGroupsPB does not have begin() or end() methods.
1195+
* so we must use an index-based for (i = 0; i < xx; i++) loop
1196+
*/
1197+
for (int i = 0; i < column_groups_pb.cg_size(); i++) {
1198+
ColumnGroupPB cg_pb = column_groups_pb.cg(i);
1199+
uint32_t key = cg_pb.sequence_column();
1200+
for (auto j : cg_pb.columns_in_group()) {
1201+
_seq_map[key].push_back(j);
1202+
}
1203+
}
1204+
1205+
if (_seq_map.size() > 0) {
1206+
/*
1207+
|** KEY **| ** VALUE ** |
1208+
------------------------------------
1209+
|** KEY **| CDE is value| sequence|
1210+
|----|----|----|----|----|----|----|
1211+
A B C D E S1 S2
1212+
0 1 2 3 4 5 6
1213+
for example: _seq_map is {5:{2,3}, 6:{4}}
1214+
then, _value_to_seq = {2:5,3:5,5:5,4:6,6:6}
1215+
*/
1216+
for (auto it = _seq_map.cbegin(); it != _seq_map.cend(); it++) {
1217+
auto k = it->first;
1218+
for (auto v : it->second) {
1219+
_value_to_seq[v] = k;
1220+
}
1221+
_value_to_seq[k] = k;
1222+
}
1223+
}
11891224
// Default to V1 inverted index storage format for backward compatibility if not specified in schema.
11901225
if (!schema.has_inverted_index_storage_format()) {
11911226
_inverted_index_storage_format = InvertedIndexStorageFormatPB::V1;
@@ -1449,6 +1484,15 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const {
14491484
tablet_schema_pb->mutable_row_store_column_unique_ids()->Assign(
14501485
_row_store_column_unique_ids.begin(), _row_store_column_unique_ids.end());
14511486
tablet_schema_pb->set_enable_variant_flatten_nested(_enable_variant_flatten_nested);
1487+
auto column_groups_pb = tablet_schema_pb->mutable_seq_map();
1488+
for (const auto& it : _seq_map) {
1489+
uint32_t key = it.first;
1490+
ColumnGroupPB* cg_pb = column_groups_pb->add_cg(); // ColumnGroupPB {key: {v1, v2, v3}}
1491+
cg_pb->set_sequence_column(key);
1492+
for (auto v : it.second) {
1493+
cg_pb->add_columns_in_group(v);
1494+
}
1495+
}
14521496
}
14531497

14541498
size_t TabletSchema::row_size() const {

be/src/olap/tablet_schema.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -671,6 +671,13 @@ class TabletSchema : public MetadataAdder<TabletSchema> {
671671
}
672672
return 0;
673673
}
674+
// Returns the sequence map: sequence column id -> value column ids in its group.
inline const std::unordered_map<uint32_t, std::vector<uint32_t>>& seq_map() const {
675+
return _seq_map;
676+
}
677+
// True when at least one sequence group is configured (i.e. _seq_map is non-empty).
inline bool has_seq_map() const { return _seq_map.size() > 0; }
678+
// Returns the reverse lookup: column id -> id of the sequence column that governs it.
inline const std::unordered_map<uint32_t, uint32_t>& value_to_seq() const {
679+
return _value_to_seq;
680+
}
674681

675682
private:
676683
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
@@ -750,6 +757,8 @@ class TabletSchema : public MetadataAdder<TabletSchema> {
750757
// value: indexes
751758
using PatternToIndex = std::unordered_map<std::string, std::vector<TabletIndexPtr>>;
752759
std::unordered_map<int32_t, PatternToIndex> _index_by_unique_id_with_pattern;
760+
std::unordered_map<uint32_t /* seq cid */, std::vector<uint32_t> /* value cids */> _seq_map;
761+
std::unordered_map<uint32_t /* cid */, uint32_t /* sequence */> _value_to_seq;
753762
};
754763

755764
bool operator==(const TabletSchema& a, const TabletSchema& b);

0 commit comments

Comments
 (0)