Skip to content

Commit e11a065

Browse files
authored
branch-4.0: [feature](catalog) support varbinary type mapping in hive/iceberg/paimon table (#57821) (#58482)
### What problem does this PR solve? Problem Summary: cherry-pick from (#57821) ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
1 parent a6b5303 commit e11a065

File tree

95 files changed

+2281
-210
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

95 files changed

+2281
-210
lines changed

be/src/util/arrow/row_batch.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ Status convert_to_arrow_type(const vectorized::DataTypePtr& origin_type,
161161
*result = arrow::binary();
162162
break;
163163
}
164+
case TYPE_VARBINARY: {
165+
*result = arrow::binary();
166+
break;
167+
}
164168
default:
165169
return Status::InvalidArgument("Unknown primitive type({}) convert to Arrow type",
166170
type->get_name());

be/src/vec/columns/column_varbinary.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <glog/logging.h>
2121

2222
#include <cstddef>
23+
#include <cstdint>
2324

2425
#include "runtime/primitive_type.h"
2526
#include "vec/columns/column.h"
@@ -162,5 +163,64 @@ void ColumnVarbinary::replace_column_data(const IColumn& rhs, size_t row, size_t
162163
_data[self_row] = doris::StringView(dst, val.size());
163164
}
164165

166+
size_t ColumnVarbinary::get_max_row_byte_size() const {
167+
uint32_t max_size = 0;
168+
size_t num_rows = size();
169+
for (size_t i = 0; i < num_rows; ++i) {
170+
max_size = std::max(max_size, _data[i].size());
171+
}
172+
return max_size + sizeof(uint32_t);
173+
}
174+
175+
void ColumnVarbinary::serialize_vec(StringRef* keys, size_t num_rows) const {
176+
for (size_t i = 0; i < num_rows; ++i) {
177+
keys[i].size += serialize_impl(const_cast<char*>(keys[i].data + keys[i].size), i);
178+
}
179+
}
180+
181+
void ColumnVarbinary::deserialize_vec(StringRef* keys, const size_t num_rows) {
182+
for (size_t i = 0; i != num_rows; ++i) {
183+
auto sz = deserialize_impl(keys[i].data);
184+
keys[i].data += sz;
185+
keys[i].size -= sz;
186+
}
187+
}
188+
189+
template <bool positive>
190+
struct ColumnVarbinary::less {
191+
const ColumnVarbinary& parent;
192+
explicit less(const ColumnVarbinary& parent_) : parent(parent_) {}
193+
bool operator()(size_t lhs, size_t rhs) const {
194+
int res = parent._data[lhs].compare(parent._data[rhs]);
195+
return positive ? (res < 0) : (res > 0);
196+
}
197+
};
198+
199+
void ColumnVarbinary::get_permutation(bool reverse, size_t limit, int /*nan_direction_hint*/,
200+
IColumn::Permutation& res) const {
201+
size_t s = _data.size();
202+
res.resize(s);
203+
for (size_t i = 0; i < s; ++i) {
204+
res[i] = i;
205+
}
206+
207+
if (reverse) {
208+
pdqsort(res.begin(), res.end(), less<false>(*this));
209+
} else {
210+
pdqsort(res.begin(), res.end(), less<true>(*this));
211+
}
212+
}
213+
214+
void ColumnVarbinary::insert_many_strings(const StringRef* strings, size_t num) {
215+
for (size_t i = 0; i < num; i++) {
216+
insert_data(strings[i].data, strings[i].size);
217+
}
218+
}
219+
220+
void ColumnVarbinary::insert_many_strings_overflow(const StringRef* strings, size_t num,
221+
size_t max_length) {
222+
insert_many_strings(strings, num);
223+
}
224+
165225
#include "common/compile_check_end.h"
166226
} // namespace doris::vectorized

be/src/vec/columns/column_varbinary.h

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#pragma once
1919

2020
#include <glog/logging.h>
21+
#include <pdqsort.h>
2122

2223
#include <cstddef>
2324

@@ -34,8 +35,11 @@ class ColumnVarbinary final : public COWHelper<IColumn, ColumnVarbinary> {
3435
private:
3536
using Self = ColumnVarbinary;
3637
friend class COWHelper<IColumn, ColumnVarbinary>;
38+
template <bool positive>
39+
struct less;
3740

3841
public:
42+
using value_type = typename PrimitiveTypeTraits<TYPE_VARBINARY>::ColumnItemType;
3943
using Container = PaddedPODArray<doris::StringView>;
4044
ColumnVarbinary() = default;
4145
ColumnVarbinary(const size_t n) : _data(n) {}
@@ -105,10 +109,19 @@ class ColumnVarbinary final : public COWHelper<IColumn, ColumnVarbinary> {
105109

106110
int compare_at(size_t n, size_t m, const IColumn& rhs_,
107111
int /*nan_direction_hint*/) const override {
108-
const ColumnVarbinary& rhs = assert_cast<const ColumnVarbinary&>(rhs_);
112+
const auto& rhs = assert_cast<const ColumnVarbinary&>(rhs_);
109113
return this->_data[n].compare(rhs.get_data()[m]);
110114
}
111115

116+
void get_permutation(bool reverse, size_t limit, int /*nan_direction_hint*/,
117+
IColumn::Permutation& res) const override;
118+
119+
size_t get_max_row_byte_size() const override;
120+
121+
void deserialize_vec(StringRef* keys, const size_t num_rows) override;
122+
123+
void serialize_vec(StringRef* keys, const size_t num_rows) const override;
124+
112125
void pop_back(size_t n) override { resize(size() - n); }
113126

114127
StringRef serialize_value_into_arena(size_t n, Arena& arena,
@@ -167,6 +180,11 @@ class ColumnVarbinary final : public COWHelper<IColumn, ColumnVarbinary> {
167180
return _data[row].size() + sizeof(uint32_t);
168181
}
169182

183+
void insert_many_strings(const StringRef* strings, size_t num) override;
184+
185+
void insert_many_strings_overflow(const StringRef* strings, size_t num,
186+
size_t max_length) override;
187+
170188
private:
171189
Container _data;
172190
Arena _arena;

be/src/vec/data_types/serde/data_type_varbinary_serde.cpp

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,71 @@ Status DataTypeVarbinarySerDe::_write_column_to_mysql(const IColumn& column,
6565
return Status::OK();
6666
}
6767

68+
Status DataTypeVarbinarySerDe::write_column_to_arrow(const IColumn& column, const NullMap* null_map,
69+
arrow::ArrayBuilder* array_builder,
70+
int64_t start, int64_t end,
71+
const cctz::time_zone& ctz) const {
72+
auto lambda_function = [&](auto& builder) -> Status {
73+
const auto& varbinary_column_data = assert_cast<const ColumnVarbinary&>(column).get_data();
74+
for (size_t i = start; i < end; ++i) {
75+
if (null_map && (*null_map)[i]) {
76+
RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), column.get_name(),
77+
builder.type()->name()));
78+
continue;
79+
}
80+
auto string_view = varbinary_column_data[i];
81+
RETURN_IF_ERROR(checkArrowStatus(builder.Append(string_view.data(), string_view.size()),
82+
column.get_name(), builder.type()->name()));
83+
}
84+
return Status::OK();
85+
};
86+
if (array_builder->type()->id() == arrow::Type::BINARY) {
87+
auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder);
88+
return lambda_function(builder);
89+
} else if (array_builder->type()->id() == arrow::Type::STRING) {
90+
auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
91+
return lambda_function(builder);
92+
} else {
93+
return Status::InvalidArgument("Unsupported arrow type for varbinary column: {}",
94+
array_builder->type()->name());
95+
}
96+
return Status::OK();
97+
}
98+
99+
Status DataTypeVarbinarySerDe::write_column_to_orc(const std::string& timezone,
100+
const IColumn& column, const NullMap* null_map,
101+
orc::ColumnVectorBatch* orc_col_batch,
102+
int64_t start, int64_t end,
103+
vectorized::Arena& arena) const {
104+
auto* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);
105+
const auto& varbinary_column_data = assert_cast<const ColumnVarbinary&>(column).get_data();
106+
107+
for (auto row_id = start; row_id < end; row_id++) {
108+
cur_batch->data[row_id] = const_cast<char*>(varbinary_column_data[row_id].data());
109+
cur_batch->length[row_id] = varbinary_column_data[row_id].size();
110+
}
111+
112+
cur_batch->numElements = end - start;
113+
return Status::OK();
114+
}
115+
116+
Status DataTypeVarbinarySerDe::serialize_one_cell_to_json(const IColumn& column, int64_t row_num,
117+
BufferWritable& bw,
118+
FormatOptions& options) const {
119+
auto result = check_column_const_set_readability(column, row_num);
120+
ColumnPtr ptr = result.first;
121+
row_num = result.second;
122+
const auto& value = assert_cast<const ColumnVarbinary&>(*ptr).get_data_at(row_num);
123+
bw.write(value.data, value.size);
124+
return Status::OK();
125+
}
126+
127+
Status DataTypeVarbinarySerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice,
128+
const FormatOptions& options) const {
129+
assert_cast<ColumnVarbinary&>(column).insert_data(slice.data, slice.size);
130+
return Status::OK();
131+
}
132+
68133
void DataTypeVarbinarySerDe::to_string(const IColumn& column, size_t row_num,
69134
BufferWritable& bw) const {
70135
const auto value = assert_cast<const ColumnVarbinary&>(column).get_data_at(row_num);

be/src/vec/data_types/serde/data_type_varbinary_serde.h

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323

2424
#include "common/status.h"
2525
#include "data_type_serde.h"
26+
#include "data_type_string_serde.h"
2627
#include "vec/columns/column.h"
2728
#include "vec/columns/column_const.h"
29+
#include "vec/columns/column_varbinary.h"
2830
#include "vec/common/arena.h"
2931
#include "vec/data_types/serde/data_type_nullable_serde.h"
3032

@@ -39,19 +41,15 @@ class DataTypeVarbinarySerDe : public DataTypeSerDe {
3941
std::string get_name() const override { return "Varbinary"; }
4042

4143
Status serialize_one_cell_to_json(const IColumn& column, int64_t row_num, BufferWritable& bw,
42-
FormatOptions& options) const override {
43-
return Status::NotSupported("serialize_one_cell_to_json with type " + column.get_name());
44-
}
44+
FormatOptions& options) const override;
4545

4646
Status serialize_column_to_json(const IColumn& column, int64_t start_idx, int64_t end_idx,
4747
BufferWritable& bw, FormatOptions& options) const override {
4848
return Status::NotSupported("serialize_column_to_json with type " + column.get_name());
4949
}
50+
5051
Status deserialize_one_cell_from_json(IColumn& column, Slice& slice,
51-
const FormatOptions& options) const override {
52-
return Status::NotSupported("deserialize_one_cell_from_text with type " +
53-
column.get_name());
54-
}
52+
const FormatOptions& options) const override;
5553

5654
Status deserialize_column_from_json_vector(IColumn& column, std::vector<Slice>& slices,
5755
uint64_t* num_deserialized,
@@ -64,6 +62,7 @@ class DataTypeVarbinarySerDe : public DataTypeSerDe {
6462
int64_t end) const override {
6563
return Status::NotSupported("write_column_to_pb with type " + column.get_name());
6664
}
65+
6766
Status read_column_from_pb(IColumn& column, const PValues& arg) const override {
6867
return Status::NotSupported("read_column_from_pb with type " + column.get_name());
6968
}
@@ -75,9 +74,8 @@ class DataTypeVarbinarySerDe : public DataTypeSerDe {
7574

7675
Status write_column_to_arrow(const IColumn& column, const NullMap* null_map,
7776
arrow::ArrayBuilder* array_builder, int64_t start, int64_t end,
78-
const cctz::time_zone& ctz) const override {
79-
return Status::NotSupported("write_column_to_arrow with type " + column.get_name());
80-
}
77+
const cctz::time_zone& ctz) const override;
78+
8179
Status read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int64_t start,
8280
int64_t end, const cctz::time_zone& ctz) const override {
8381
return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
@@ -94,11 +92,7 @@ class DataTypeVarbinarySerDe : public DataTypeSerDe {
9492

9593
Status write_column_to_orc(const std::string& timezone, const IColumn& column,
9694
const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch,
97-
int64_t start, int64_t end,
98-
vectorized::Arena& arena) const override {
99-
return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
100-
"write_column_to_orc with type " + column.get_name());
101-
}
95+
int64_t start, int64_t end, vectorized::Arena& arena) const override;
10296

10397
void to_string(const IColumn& column, size_t row_num, BufferWritable& bw) const override;
10498

be/src/vec/exec/format/column_type_convert.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@
1818
#include "vec/exec/format/column_type_convert.h"
1919

2020
#include "common/cast_set.h"
21+
#include "runtime/define_primitive_type.h"
2122

2223
namespace doris::vectorized::converter {
2324
#include "common/compile_check_begin.h"
2425

25-
const std::set<std::string> SafeCastString<TYPE_BOOLEAN>::FALSE_VALUES = {"false", "off", "no", "0",
26-
""};
27-
2826
#define FOR_LOGICAL_INTEGER_TYPES(M) \
2927
M(TYPE_TINYINT) \
3028
M(TYPE_SMALLINT) \
@@ -225,6 +223,8 @@ static std::unique_ptr<ColumnTypeConverter> _to_string_converter(const DataTypeP
225223
default:
226224
return std::make_unique<UnsupportedConverter>(src_type, dst_type);
227225
}
226+
} else if (is_varbinary(src_primitive_type)) {
227+
return std::make_unique<VarBinaryConverter<TYPE_STRING, TYPE_STRING>>();
228228
}
229229
return std::make_unique<UnsupportedConverter>(src_type, dst_type);
230230
}
@@ -240,6 +240,9 @@ static std::unique_ptr<ColumnTypeConverter> _from_string_converter(const DataTyp
240240
remove_nullable(dst_type));
241241
FOR_ALL_LOGICAL_TYPES(DISPATCH)
242242
#undef DISPATCH
243+
case TYPE_VARBINARY: {
244+
return std::make_unique<VarBinaryConverter<TYPE_STRING, TYPE_VARBINARY>>();
245+
}
243246
default:
244247
return std::make_unique<UnsupportedConverter>(src_type, dst_type);
245248
}

0 commit comments

Comments
 (0)