Skip to content

Commit 049b532

Browse files
committed
[feat](seq mapping) uniq table supports multi-stream merging through sequence mapping
1 parent e999329 commit 049b532

34 files changed

+887
-30
lines changed

be/src/olap/compaction.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ Status Compaction::merge_input_rowsets() {
202202
{
203203
SCOPED_TIMER(_merge_rowsets_latency_timer);
204204
// 1. Merge segment files and write bkd inverted index
205-
if (_is_vertical) {
205+
if (_is_vertical && !_tablet->tablet_schema()->has_seq_map()) {
206206
if (!_tablet->tablet_schema()->cluster_key_uids().empty()) {
207207
RETURN_IF_ERROR(update_delete_bitmap());
208208
}

be/src/olap/memtable.cpp

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,35 @@ Status MemTable::insert(const vectorized::Block* input_block,
244244
return Status::OK();
245245
}
246246

247+
// Merges `src_row` into `dst_row` for a table whose schema defines a sequence mapping.
//
// The schema's seq_map() associates each sequence column with the value columns of
// its group. Each group is decided independently: unless `dst_row` already holds a
// strictly greater sequence value for the group, the group's value columns and the
// sequence column of `src_row` are fed into the aggregate states of `dst_row`.
//
// @param mutable_block block that stores both rows; indexed with the ids held in seq_map()
// @param src_row       row being merged in
// @param dst_row       row whose aggregate states receive the merged values
void MemTable::_aggregate_two_row_with_sequence_map(vectorized::MutableBlock& mutable_block,
                                                    RowInBlock* src_row, RowInBlock* dst_row) {
    for (const auto& [sequence, value_cids] : _tablet_schema->seq_map()) {
        // Bounds-check the sequence column BEFORE indexing the block. Without the
        // sequence column the group cannot be ordered, so it is left untouched.
        if (sequence >= _num_columns) {
            continue;
        }
        auto* sequence_col_ptr = mutable_block.mutable_columns()[sequence].get();

        // A positive result means dst_row's sequence value compares greater than
        // src_row's, so dst_row keeps its current values for this group.
        const auto res = sequence_col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos,
                                                      *sequence_col_ptr, -1);
        if (res > 0) {
            continue;
        }

        for (const auto cid : value_cids) {
            if (cid >= _num_columns) {
                continue;
            }
            const vectorized::IColumn* col_ptr = mutable_block.mutable_columns()[cid].get();
            _agg_functions[cid]->add(dst_row->agg_places(cid), &col_ptr, src_row->_row_pos,
                                     _arena);
        }

        const vectorized::IColumn* const_sequence_col_ptr = sequence_col_ptr;
        _agg_functions[sequence]->add(dst_row->agg_places(sequence), &const_sequence_col_ptr,
                                      src_row->_row_pos, _arena);
        // Copy src_row's sequence value into dst_row's slot of the column, so that
        // the compare_at above sees the winning value on subsequent merges.
        sequence_col_ptr->replace_column_data(*sequence_col_ptr, src_row->_row_pos,
                                              dst_row->_row_pos);
    }
}
275+
247276
template <bool has_skip_bitmap_col>
248277
void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block,
249278
RowInBlock* src_row, RowInBlock* dst_row) {
@@ -504,7 +533,11 @@ void MemTable::_aggregate() {
504533
_init_row_for_agg(prev_row, mutable_block);
505534
}
506535
_stat.merged_rows++;
507-
_aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, prev_row);
536+
if (_tablet_schema->has_seq_map()) {
537+
_aggregate_two_row_with_sequence_map(mutable_block, cur_row, prev_row);
538+
} else {
539+
_aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, prev_row);
540+
}
508541
} else {
509542
prev_row = cur_row;
510543
if (!temp_row_in_blocks.empty()) {

be/src/olap/memtable.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,9 @@ class MemTable {
210210
void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
211211
RowInBlock* row_in_skiplist);
212212

213+
// Sequence-mapping counterpart of _aggregate_two_row_in_block, used by _aggregate()
// when the tablet schema has a seq_map. Merges `new_row` into `row_in_skiplist`,
// deciding per sequence group whether the group's columns of `new_row` are
// aggregated into `row_in_skiplist`.
void _aggregate_two_row_with_sequence_map(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
214+
RowInBlock* row_in_skiplist);
215+
213216
// Wrapped by to_block, which performs the exception handling
214217
Status _to_block(std::unique_ptr<vectorized::Block>* res);
215218

be/src/olap/rowset/beta_rowset_reader.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
176176
}
177177

178178
if (_should_push_down_value_predicates()) {
179-
if (_read_context->value_predicates != nullptr) {
179+
if (_read_context->value_predicates != nullptr &&
180+
!read_context->tablet_schema->has_seq_map()) {
180181
_read_options.column_predicates.insert(_read_options.column_predicates.end(),
181182
_read_context->value_predicates->begin(),
182183
_read_context->value_predicates->end());

be/src/olap/schema.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class Schema {
7878
columns.push_back(std::make_shared<TabletColumn>(column));
7979
}
8080
_delete_sign_idx = tablet_schema->delete_sign_idx();
81-
if (tablet_schema->has_sequence_col()) {
81+
if (tablet_schema->has_sequence_col() || tablet_schema->has_seq_map()) {
8282
_has_sequence_col = true;
8383
}
8484
_init(columns, col_ids, num_key_columns);

be/src/olap/tablet_meta.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,16 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
164164
schema->set_num_short_key_columns(tablet_schema.short_key_column_count);
165165
schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block);
166166
schema->set_sequence_col_idx(tablet_schema.sequence_col_idx);
167+
auto p_seq_map = schema->mutable_seq_map(); // ColumnGroupsPB
168+
169+
for (auto& it : tablet_schema.seq_map) { // std::vector< ::doris::TColumnGroup>
170+
uint32_t key = it.sequence_column;
171+
ColumnGroupPB* cg_pb = p_seq_map->add_cg(); // ColumnGroupPB {key: {v1, v2, v3}}
172+
cg_pb->set_sequence_column(key);
173+
for (auto v : it.columns_in_group) {
174+
cg_pb->add_columns_in_group(v);
175+
}
176+
}
167177
switch (tablet_schema.keys_type) {
168178
case TKeysType::DUP_KEYS:
169179
schema->set_keys_type(KeysType::DUP_KEYS);

be/src/olap/tablet_reader.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,47 @@ Status TabletReader::_init_params(const ReaderParams& read_params) {
336336
}
337337
}
338338

339+
if (_tablet_schema->has_seq_map()) {
340+
if (_sequence_col_idx != -1) {
341+
auto msg = "sequence columns conflict, both seq_col and seq_map are true!";
342+
LOG(WARNING) << msg;
343+
return Status::InternalError(msg);
344+
}
345+
_val_to_seq = _tablet_schema->value_to_seq();
346+
if (_val_to_seq.size() <= 0) {
347+
auto msg = "seq_map is empty!";
348+
LOG(WARNING) << msg;
349+
return Status::InternalError(msg);
350+
}
351+
}
352+
353+
/*
354+
* |** KEY **| ** VALUE ** |
355+
------------------------------------
356+
|** KEY **| CDE is value| sequence|
357+
|----|----|----|----|----|----|----|
358+
A B C D E S1 S2
359+
0 1 2 3 4 5 6
360+
361+
For example, if the SQL is: select C,C,D,E from table;
362+
_value_cids is {2,2,3,4}
363+
_seq_map is {5:{2,3}, 6:{4}}
364+
_val_to_seq is {2:5,3:5,5:5,4:6,6:6}
365+
_return_seq_map is {5:{2,2,3}, 6:{4}}
366+
*/
367+
_return_seq_map.clear();
368+
for (const auto &val : _value_cids) {
369+
auto seq = _val_to_seq[val];
370+
371+
if (_return_seq_map.find(seq) == _return_seq_map.end()) {
372+
_return_seq_map[seq] = std::vector<uint32_t>();
373+
}
374+
375+
if (val != seq) {
376+
_return_seq_map[seq].push_back(val);
377+
}
378+
}
379+
339380
return res;
340381
}
341382

be/src/olap/tablet_reader.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,9 @@ class TabletReader {
318318
std::vector<uint32_t> _key_cids;
319319
std::vector<uint32_t> _value_cids;
320320

321+
std::unordered_map<uint32_t /* seq cid */, std::vector<uint32_t> /* value cids */> _return_seq_map;
322+
std::unordered_map<uint32_t /* cid */, uint32_t /* sequence */> _val_to_seq;
323+
321324
uint64_t _merged_rows = 0;
322325
OlapReaderStatistics _stats;
323326
};

be/src/olap/tablet_schema.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,6 +1193,41 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extrac
11931193
_storage_page_size = schema.storage_page_size();
11941194
_storage_dict_page_size = schema.storage_dict_page_size();
11951195
_schema_version = schema.schema_version();
1196+
auto column_groups_pb = schema.seq_map();
1197+
_seq_map.clear();
1198+
_value_to_seq.clear();
1199+
/*
1200+
* ColumnGroupsPB is a list of cg_pb, and
1201+
* ColumnGroupsPB does not have begin() or end() methods.
1202+
* So we must use an index-based loop: for (i = 0; i < cg_size(); i++).
1203+
*/
1204+
for (int i = 0; i < column_groups_pb.cg_size(); i++) {
1205+
ColumnGroupPB cg_pb = column_groups_pb.cg(i);
1206+
uint32_t key = cg_pb.sequence_column();
1207+
for (auto j : cg_pb.columns_in_group()) {
1208+
_seq_map[key].push_back(j);
1209+
}
1210+
}
1211+
1212+
if (_seq_map.size() > 0) {
1213+
/*
1214+
|** KEY **| ** VALUE ** |
1215+
------------------------------------
1216+
|** KEY **| CDE is value| sequence|
1217+
|----|----|----|----|----|----|----|
1218+
A B C D E S1 S2
1219+
0 1 2 3 4 5 6
1220+
for example: _seq_map is {5:{2,3}, 6:{4}}
1221+
then, _value_to_seq = {2:5,3:5,5:5,4:6,6:6}
1222+
*/
1223+
for (auto it = _seq_map.cbegin(); it != _seq_map.cend(); it++) {
1224+
auto k = it->first;
1225+
for (auto v : it->second) {
1226+
_value_to_seq[v] = k;
1227+
}
1228+
_value_to_seq[k] = k;
1229+
}
1230+
}
11961231
// Default to V1 inverted index storage format for backward compatibility if not specified in schema.
11971232
if (!schema.has_inverted_index_storage_format()) {
11981233
_inverted_index_storage_format = InvertedIndexStorageFormatPB::V1;
@@ -1456,6 +1491,15 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const {
14561491
tablet_schema_pb->mutable_row_store_column_unique_ids()->Assign(
14571492
_row_store_column_unique_ids.begin(), _row_store_column_unique_ids.end());
14581493
tablet_schema_pb->set_enable_variant_flatten_nested(_enable_variant_flatten_nested);
1494+
auto column_groups_pb = tablet_schema_pb->mutable_seq_map();
1495+
for (const auto& it : _seq_map) {
1496+
uint32_t key = it.first;
1497+
ColumnGroupPB* cg_pb = column_groups_pb->add_cg(); // ColumnGroupPB {key: {v1, v2, v3}}
1498+
cg_pb->set_sequence_column(key);
1499+
for (auto v : it.second) {
1500+
cg_pb->add_columns_in_group(v);
1501+
}
1502+
}
14591503
}
14601504

14611505
size_t TabletSchema::row_size() const {

be/src/olap/tablet_schema.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,9 @@ class TabletSchema : public MetadataAdder<TabletSchema> {
637637
}
638638
return 0;
639639
}
640+
// Sequence mapping: sequence column id -> ids of the value columns in its group.
const std::unordered_map<uint32_t, std::vector<uint32_t>>& seq_map() const {
    return _seq_map;
}
641+
// True when at least one sequence group is defined for this schema.
bool has_seq_map() const {
    return !_seq_map.empty();
}
642+
// Reverse lookup: column id -> id of the sequence column governing it
// (a sequence column maps to itself).
const std::unordered_map<uint32_t, uint32_t>& value_to_seq() const {
    return _value_to_seq;
}
640643

641644
private:
642645
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
@@ -716,6 +719,8 @@ class TabletSchema : public MetadataAdder<TabletSchema> {
716719
// value: indexes
717720
using PatternToIndex = std::unordered_map<std::string, std::vector<TabletIndexPtr>>;
718721
std::unordered_map<int32_t, PatternToIndex> _index_by_unique_id_with_pattern;
722+
std::unordered_map<uint32_t /* seq cid */, std::vector<uint32_t> /* value cids */> _seq_map;
723+
std::unordered_map<uint32_t /* cid */, uint32_t /* sequence */> _value_to_seq;
719724
};
720725

721726
bool operator==(const TabletSchema& a, const TabletSchema& b);

0 commit comments

Comments
 (0)