Skip to content

Commit 34e7e04

Browse files
committed
[feature](catalog) support varbinary type mapping in hive/iceberg/paimon table (apache#57821)
Problem Summary: Support the varbinary type in Hive/Iceberg/Paimon tables, so that binary columns can be mapped directly to the Doris varbinary type instead of the string type. The mapping is controlled by the catalog property enable.mapping.varbinary, which defaults to false. TVF functions (e.g. HDFS) also have a parameter that controls it, which likewise defaults to false. 1. When a Parquet file column has type tparquet::Type::BYTE_ARRAY and neither logicalType nor converted_type is set, read it into column_varbinary directly, so that the physical conversion and the logical conversion are consistent. If the column is tparquet::Type::BYTE_ARRAY and has a logicalType set (e.g. String), it is read as column_string; if the table column was created as a binary column, VarBinaryConverter is then used to convert column_string to column_varbinary. 2. When an ORC file column is of binary type, it is also mapped to the varbinary type directly, and can reuse StringVectorBatch. 3. Add casts between the string and varbinary types. 4. Map UUID to the binary type instead of string in Iceberg. 5. Change the signature bool safe_cast_string(**const char\* startptr, size_t buffer_size**, xxx) to safe_cast_string(**const StringRef& str_ref**, xxx). 6. Add **const** to the read_date_text_impl function. 7. Add some tests that exercise varbinary through the Paimon catalog; more cases for Hive/Iceberg and a doc update will be added later. 
``` mysql> show create table binary_demo3; +--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Table | Create Table | +--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | binary_demo3 | CREATE TABLE `binary_demo3` ( `id` int NULL, `record_name` char(10) NULL, `vrecord_name` text NULL, `bin` varbinary(10) NULL, `varbin` varbinary(2147483647) NULL ) ENGINE=PAIMON_EXTERNAL_TABLE LOCATION 'file:/mnt/disk2/zhangsida/test_paimon/demo.db/binary_demo3' PROPERTIES ( "path" = "file:/mnt/disk2/zhangsida/test_paimon/demo.db/binary_demo3", "primary-key" = "id" ); | +--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 1 row in set (0.00 sec) mysql> select *, length(record_name),length(vrecord_name),length(bin),length(varbin) from binary_demo3; +------+-------------+--------------+------------------------+----------------+---------------------+----------------------+-------------+----------------+ | id | record_name | vrecord_name | bin | 
varbin | length(record_name) | length(vrecord_name) | length(bin) | length(varbin) | +------+-------------+--------------+------------------------+----------------+---------------------+----------------------+-------------+----------------+ | 1 | AAAA | AAAA | 0xAAAA0000000000000000 | 0xAAAA | 10 | 4 | 10 | 2 | | 2 | 6161 | 6161 | 0x61610000000000000000 | 0x6161 | 10 | 4 | 10 | 2 | | 3 | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | +------+-------------+--------------+------------------------+----------------+---------------------+----------------------+-------------+----------------+ ``` support varbinary type mapping in hive/iceberg/paimon table
1 parent 34be474 commit 34e7e04

File tree

94 files changed

+2274
-208
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

94 files changed

+2274
-208
lines changed

be/src/util/arrow/row_batch.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ Status convert_to_arrow_type(const vectorized::DataTypePtr& origin_type,
161161
*result = arrow::binary();
162162
break;
163163
}
164+
case TYPE_VARBINARY: {
165+
*result = arrow::binary();
166+
break;
167+
}
164168
default:
165169
return Status::InvalidArgument("Unknown primitive type({}) convert to Arrow type",
166170
type->get_name());

be/src/vec/columns/column_varbinary.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <glog/logging.h>
2121

2222
#include <cstddef>
23+
#include <cstdint>
2324

2425
#include "runtime/primitive_type.h"
2526
#include "vec/columns/column.h"
@@ -162,5 +163,64 @@ void ColumnVarbinary::replace_column_data(const IColumn& rhs, size_t row, size_t
162163
_data[self_row] = doris::StringView(dst, val.size());
163164
}
164165

166+
size_t ColumnVarbinary::get_max_row_byte_size() const {
167+
uint32_t max_size = 0;
168+
size_t num_rows = size();
169+
for (size_t i = 0; i < num_rows; ++i) {
170+
max_size = std::max(max_size, _data[i].size());
171+
}
172+
return max_size + sizeof(uint32_t);
173+
}
174+
175+
// For each of the first num_rows rows, hands the current end of keys[row]
// (data + size) to serialize_impl and grows keys[row].size by the value
// serialize_impl returns.
void ColumnVarbinary::serialize(StringRef* keys, size_t num_rows) const {
    for (size_t row = 0; row != num_rows; ++row) {
        StringRef& key = keys[row];
        // StringRef exposes a const pointer; the write position needs a mutable one.
        char* write_pos = const_cast<char*>(key.data + key.size);
        key.size += serialize_impl(write_pos, row);
    }
}
180+
181+
// For each of the first num_rows keys, passes the key's data pointer to
// deserialize_impl, then advances the pointer and shrinks the key's size by the
// value deserialize_impl returns.
void ColumnVarbinary::deserialize(StringRef* keys, const size_t num_rows) {
    StringRef* const last = keys + num_rows;
    for (StringRef* key = keys; key != last; ++key) {
        auto consumed = deserialize_impl(key->data);
        key->data += consumed;
        key->size -= consumed;
    }
}
188+
189+
template <bool positive>
190+
struct ColumnVarbinary::less {
191+
const ColumnVarbinary& parent;
192+
explicit less(const ColumnVarbinary& parent_) : parent(parent_) {}
193+
bool operator()(size_t lhs, size_t rhs) const {
194+
int res = parent._data[lhs].compare(parent._data[rhs]);
195+
return positive ? (res < 0) : (res > 0);
196+
}
197+
};
198+
199+
void ColumnVarbinary::get_permutation(bool reverse, size_t limit, int /*nan_direction_hint*/,
200+
IColumn::Permutation& res) const {
201+
size_t s = _data.size();
202+
res.resize(s);
203+
for (size_t i = 0; i < s; ++i) {
204+
res[i] = i;
205+
}
206+
207+
if (reverse) {
208+
pdqsort(res.begin(), res.end(), less<false>(*this));
209+
} else {
210+
pdqsort(res.begin(), res.end(), less<true>(*this));
211+
}
212+
}
213+
214+
void ColumnVarbinary::insert_many_strings(const StringRef* strings, size_t num) {
215+
for (size_t i = 0; i < num; i++) {
216+
insert_data(strings[i].data, strings[i].size);
217+
}
218+
}
219+
220+
void ColumnVarbinary::insert_many_strings_overflow(const StringRef* strings, size_t num,
221+
size_t max_length) {
222+
insert_many_strings(strings, num);
223+
}
224+
165225
#include "common/compile_check_end.h"
166226
} // namespace doris::vectorized

be/src/vec/columns/column_varbinary.h

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#pragma once
1919

2020
#include <glog/logging.h>
21+
#include <pdqsort.h>
2122

2223
#include <cstddef>
2324

@@ -34,8 +35,11 @@ class ColumnVarbinary final : public COWHelper<IColumn, ColumnVarbinary> {
3435
private:
3536
using Self = ColumnVarbinary;
3637
friend class COWHelper<IColumn, ColumnVarbinary>;
38+
template <bool positive>
39+
struct less;
3740

3841
public:
42+
using value_type = typename PrimitiveTypeTraits<TYPE_VARBINARY>::ColumnItemType;
3943
using Container = PaddedPODArray<doris::StringView>;
4044
ColumnVarbinary() = default;
4145
ColumnVarbinary(const size_t n) : _data(n) {}
@@ -105,10 +109,19 @@ class ColumnVarbinary final : public COWHelper<IColumn, ColumnVarbinary> {
105109

106110
int compare_at(size_t n, size_t m, const IColumn& rhs_,
107111
int /*nan_direction_hint*/) const override {
108-
const ColumnVarbinary& rhs = assert_cast<const ColumnVarbinary&>(rhs_);
112+
const auto& rhs = assert_cast<const ColumnVarbinary&>(rhs_);
109113
return this->_data[n].compare(rhs.get_data()[m]);
110114
}
111115

116+
void get_permutation(bool reverse, size_t limit, int /*nan_direction_hint*/,
117+
IColumn::Permutation& res) const override;
118+
119+
size_t get_max_row_byte_size() const override;
120+
121+
void deserialize(StringRef* keys, const size_t num_rows) override;
122+
123+
void serialize(StringRef* keys, const size_t num_rows) const override;
124+
112125
void pop_back(size_t n) override { resize(size() - n); }
113126

114127
StringRef serialize_value_into_arena(size_t n, Arena& arena,
@@ -167,6 +180,11 @@ class ColumnVarbinary final : public COWHelper<IColumn, ColumnVarbinary> {
167180
return _data[row].size() + sizeof(uint32_t);
168181
}
169182

183+
void insert_many_strings(const StringRef* strings, size_t num) override;
184+
185+
void insert_many_strings_overflow(const StringRef* strings, size_t num,
186+
size_t max_length) override;
187+
170188
private:
171189
Container _data;
172190
Arena _arena;

be/src/vec/data_types/serde/data_type_varbinary_serde.cpp

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,71 @@ Status DataTypeVarbinarySerDe::_write_column_to_mysql(const IColumn& column,
6565
return Status::OK();
6666
}
6767

68+
// Appends rows [start, end) of a ColumnVarbinary to an Arrow array builder.
//
// column        - source column; must be a ColumnVarbinary.
// null_map      - optional null flags indexed by row; a set flag appends an Arrow null.
// array_builder - destination; only arrow::Type::BINARY and arrow::Type::STRING
//                 builders are accepted.
// ctz           - not used by this type.
//
// Returns InvalidArgument for any other builder type, the converted Arrow error
// if an append fails, and OK otherwise.
Status DataTypeVarbinarySerDe::write_column_to_arrow(const IColumn& column, const NullMap* null_map,
                                                     arrow::ArrayBuilder* array_builder,
                                                     int64_t start, int64_t end,
                                                     const cctz::time_zone& ctz) const {
    // Shared row loop; generic over the concrete builder so the binary and the
    // string builder go through the same code.
    auto append_rows = [&](auto& builder) -> Status {
        const auto& varbinary_column_data = assert_cast<const ColumnVarbinary&>(column).get_data();
        const auto last = static_cast<size_t>(end);
        for (size_t i = start; i < last; ++i) {
            if (null_map && (*null_map)[i]) {
                RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), column.get_name(),
                                                 builder.type()->name()));
                continue;
            }
            const auto& value = varbinary_column_data[i];
            RETURN_IF_ERROR(checkArrowStatus(builder.Append(value.data(), value.size()),
                                             column.get_name(), builder.type()->name()));
        }
        return Status::OK();
    };

    const auto builder_type_id = array_builder->type()->id();
    if (builder_type_id == arrow::Type::BINARY) {
        auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder);
        return append_rows(builder);
    }
    if (builder_type_id == arrow::Type::STRING) {
        auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
        return append_rows(builder);
    }
    // Every supported type returned above; no fall-through return is needed.
    return Status::InvalidArgument("Unsupported arrow type for varbinary column: {}",
                                   array_builder->type()->name());
}
98+
99+
// Points rows [start, end) of an ORC string batch at the values of a ColumnVarbinary.
//
// column        - source column; must be a ColumnVarbinary.
// orc_col_batch - destination; must be an orc::StringVectorBatch.
// timezone, null_map and arena are not used here.
//
// Only pointers and lengths are stored in the batch; no bytes are copied, so the
// batch refers to memory owned by the column.
// Returns InvalidArgument when orc_col_batch is not a StringVectorBatch.
Status DataTypeVarbinarySerDe::write_column_to_orc(const std::string& timezone,
                                                   const IColumn& column, const NullMap* null_map,
                                                   orc::ColumnVectorBatch* orc_col_batch,
                                                   int64_t start, int64_t end,
                                                   vectorized::Arena& arena) const {
    auto* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);
    // dynamic_cast yields nullptr on a type mismatch; report it instead of
    // dereferencing a null pointer below.
    if (cur_batch == nullptr) {
        return Status::InvalidArgument("Unsupported orc batch type for varbinary column: {}",
                                       column.get_name());
    }
    const auto& varbinary_column_data = assert_cast<const ColumnVarbinary&>(column).get_data();

    for (auto row_id = start; row_id < end; row_id++) {
        const auto& value = varbinary_column_data[row_id];
        // The ORC batch stores mutable char pointers; the data itself is not modified here.
        cur_batch->data[row_id] = const_cast<char*>(value.data());
        cur_batch->length[row_id] = value.size();
    }

    cur_batch->numElements = end - start;
    return Status::OK();
}
115+
116+
// Writes the raw bytes of one varbinary cell to bw and returns OK.
// The column and row index are first passed through
// check_column_const_set_readability (presumably to unwrap const columns —
// confirm against that helper); the bytes are written as-is. options is not used.
Status DataTypeVarbinarySerDe::serialize_one_cell_to_json(const IColumn& column, int64_t row_num,
                                                          BufferWritable& bw,
                                                          FormatOptions& options) const {
    auto resolved = check_column_const_set_readability(column, row_num);
    ColumnPtr resolved_column = resolved.first;
    row_num = resolved.second;

    const auto& varbinary_column = assert_cast<const ColumnVarbinary&>(*resolved_column);
    const auto& cell = varbinary_column.get_data_at(row_num);
    bw.write(cell.data, cell.size);
    return Status::OK();
}
126+
127+
// Appends the bytes of slice to the column as one new varbinary value and
// returns OK. The bytes are inserted as-is; options is not used.
Status DataTypeVarbinarySerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice,
                                                              const FormatOptions& options) const {
    auto& varbinary_column = assert_cast<ColumnVarbinary&>(column);
    varbinary_column.insert_data(slice.data, slice.size);
    return Status::OK();
}
132+
68133
void DataTypeVarbinarySerDe::to_string(const IColumn& column, size_t row_num,
69134
BufferWritable& bw) const {
70135
const auto value = assert_cast<const ColumnVarbinary&>(column).get_data_at(row_num);

be/src/vec/data_types/serde/data_type_varbinary_serde.h

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323

2424
#include "common/status.h"
2525
#include "data_type_serde.h"
26+
#include "data_type_string_serde.h"
2627
#include "vec/columns/column.h"
2728
#include "vec/columns/column_const.h"
29+
#include "vec/columns/column_varbinary.h"
2830
#include "vec/common/arena.h"
2931
#include "vec/data_types/serde/data_type_nullable_serde.h"
3032

@@ -39,19 +41,15 @@ class DataTypeVarbinarySerDe : public DataTypeSerDe {
3941
std::string get_name() const override { return "Varbinary"; }
4042

4143
Status serialize_one_cell_to_json(const IColumn& column, int64_t row_num, BufferWritable& bw,
42-
FormatOptions& options) const override {
43-
return Status::NotSupported("serialize_one_cell_to_json with type " + column.get_name());
44-
}
44+
FormatOptions& options) const override;
4545

4646
Status serialize_column_to_json(const IColumn& column, int64_t start_idx, int64_t end_idx,
4747
BufferWritable& bw, FormatOptions& options) const override {
4848
return Status::NotSupported("serialize_column_to_json with type " + column.get_name());
4949
}
50+
5051
Status deserialize_one_cell_from_json(IColumn& column, Slice& slice,
51-
const FormatOptions& options) const override {
52-
return Status::NotSupported("deserialize_one_cell_from_text with type " +
53-
column.get_name());
54-
}
52+
const FormatOptions& options) const override;
5553

5654
Status deserialize_column_from_json_vector(IColumn& column, std::vector<Slice>& slices,
5755
uint64_t* num_deserialized,
@@ -64,6 +62,7 @@ class DataTypeVarbinarySerDe : public DataTypeSerDe {
6462
int64_t end) const override {
6563
return Status::NotSupported("write_column_to_pb with type " + column.get_name());
6664
}
65+
6766
Status read_column_from_pb(IColumn& column, const PValues& arg) const override {
6867
return Status::NotSupported("read_column_from_pb with type " + column.get_name());
6968
}
@@ -75,9 +74,8 @@ class DataTypeVarbinarySerDe : public DataTypeSerDe {
7574

7675
Status write_column_to_arrow(const IColumn& column, const NullMap* null_map,
7776
arrow::ArrayBuilder* array_builder, int64_t start, int64_t end,
78-
const cctz::time_zone& ctz) const override {
79-
return Status::NotSupported("write_column_to_arrow with type " + column.get_name());
80-
}
77+
const cctz::time_zone& ctz) const override;
78+
8179
Status read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int64_t start,
8280
int64_t end, const cctz::time_zone& ctz) const override {
8381
return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
@@ -94,11 +92,7 @@ class DataTypeVarbinarySerDe : public DataTypeSerDe {
9492

9593
Status write_column_to_orc(const std::string& timezone, const IColumn& column,
9694
const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch,
97-
int64_t start, int64_t end,
98-
vectorized::Arena& arena) const override {
99-
return Status::Error(ErrorCode::NOT_IMPLEMENTED_ERROR,
100-
"write_column_to_orc with type " + column.get_name());
101-
}
95+
int64_t start, int64_t end, vectorized::Arena& arena) const override;
10296

10397
void to_string(const IColumn& column, size_t row_num, BufferWritable& bw) const override;
10498

be/src/vec/exec/format/column_type_convert.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@
1818
#include "vec/exec/format/column_type_convert.h"
1919

2020
#include "common/cast_set.h"
21+
#include "runtime/define_primitive_type.h"
2122

2223
namespace doris::vectorized::converter {
2324
#include "common/compile_check_begin.h"
2425

25-
const std::set<std::string> SafeCastString<TYPE_BOOLEAN>::FALSE_VALUES = {"false", "off", "no", "0",
26-
""};
27-
2826
#define FOR_LOGICAL_INTEGER_TYPES(M) \
2927
M(TYPE_TINYINT) \
3028
M(TYPE_SMALLINT) \
@@ -225,6 +223,8 @@ static std::unique_ptr<ColumnTypeConverter> _to_string_converter(const DataTypeP
225223
default:
226224
return std::make_unique<UnsupportedConverter>(src_type, dst_type);
227225
}
226+
} else if (is_varbinary(src_primitive_type)) {
227+
return std::make_unique<VarBinaryConverter<TYPE_STRING, TYPE_STRING>>();
228228
}
229229
return std::make_unique<UnsupportedConverter>(src_type, dst_type);
230230
}
@@ -240,6 +240,9 @@ static std::unique_ptr<ColumnTypeConverter> _from_string_converter(const DataTyp
240240
remove_nullable(dst_type));
241241
FOR_ALL_LOGICAL_TYPES(DISPATCH)
242242
#undef DISPATCH
243+
case TYPE_VARBINARY: {
244+
return std::make_unique<VarBinaryConverter<TYPE_STRING, TYPE_VARBINARY>>();
245+
}
243246
default:
244247
return std::make_unique<UnsupportedConverter>(src_type, dst_type);
245248
}

0 commit comments

Comments
 (0)