Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions be/src/vec/exec/format/parquet/parquet_column_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ std::unique_ptr<PhysicalToLogicalConverter> PhysicalToLogicalConverter::get_conv
// for FixedSizeBinary
physical_converter =
std::make_unique<FixedSizeBinaryConverter>(parquet_schema.type_length);
} else if (src_logical_primitive == TYPE_FLOAT &&
src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY &&
parquet_schema.logicalType.__isset.FLOAT16) {
physical_converter =
std::make_unique<Float16PhysicalConverter>(parquet_schema.type_length);
} else {
physical_converter = std::make_unique<ConsistentPhysicalConverter>();
}
Expand Down
84 changes: 84 additions & 0 deletions be/src/vec/exec/format/parquet/parquet_column_convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gen_cpp/parquet_types.h>

#include "common/cast_set.h"
#include "runtime/primitive_type.h"
#include "vec/columns/column_varbinary.h"
#include "vec/core/extended_types.h"
#include "vec/core/field.h"
Expand Down Expand Up @@ -354,6 +355,89 @@ class FixedSizeBinaryConverter : public PhysicalToLogicalConverter {
}
};

class Float16PhysicalConverter : public PhysicalToLogicalConverter {
private:
int _type_length;

public:
Float16PhysicalConverter(int type_length) : _type_length(type_length) {
DCHECK_EQ(_type_length, 2);
}

Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
ColumnPtr from_col = remove_nullable(src_physical_col);
MutableColumnPtr to_col = remove_nullable(src_logical_column)->assume_mutable();

const auto* src_data = assert_cast<const ColumnUInt8*>(from_col.get());
size_t length = src_data->size();
size_t num_values = length / _type_length;
auto* to_float_column = assert_cast<ColumnFloat32*>(to_col.get());
size_t start_idx = to_float_column->size();
to_float_column->resize(start_idx + num_values);
auto& to_float_column_data = to_float_column->get_data();
const uint8_t* ptr = src_data->get_data().data();
for (int i = 0; i < num_values; ++i) {
size_t offset = i * _type_length;
const uint8_t* data_ptr = ptr + offset;
uint16_t raw;
memcpy(&raw, data_ptr, sizeof(uint16_t));
float value = half_to_float(raw);
to_float_column_data[start_idx + i] = value;
}

return Status::OK();
}

float half_to_float(uint16_t h) {
// uint16_t h: half precision floating point
// bit 15: sign(1 bit)
// bits 14..10 : exponent(5 bits)
// bits 9..0 : mantissa(10 bits)

// sign bit placed to float32 bit31
uint32_t sign = (h & 0x8000U) << 16; // 0x8000 << 16 = 0x8000_0000
// exponent:(5 bits)
uint32_t exp = (h & 0x7C00U) >> 10; // 0x7C00 = 0111 1100 0000 (half exponent mask)
// mantissa(10 bits)
uint32_t mant = (h & 0x03FFU); // 10-bit fraction

// cases:Zero/Subnormal, Normal, Inf/NaN
if (exp == 0) {
// exp==0: Zero or Subnormal ----------
if (mant == 0) {
// ±0.0
// sign = either 0x00000000 or 0x80000000
return std::bit_cast<float>(sign);
} else {
// ---------- Subnormal ----------
// half subnormal:
// value = (-1)^sign * (mant / 2^10) * 2^(1 - bias)
// half bias = 15 → exponent = 1 - 15 = -14
float f = (static_cast<float>(mant) / 1024.0F) * std::powf(2.0F, -14.0F);
return sign ? -f : f;
}
} else if (exp == 0x1F) {
// exp==31: Inf or NaN ----------
// float32:
// exponent = 255 (0xFF)
// mantissa = mant << 13
uint32_t f = sign | 0x7F800000U | (mant << 13);
return std::bit_cast<float>(f);
} else {
// Normalized ----------
// float32 exponent:
// exp32 = exp16 - bias16 + bias32
// bias16 = 15
// bias32 = 127
//
// so: exp32 = exp + (127 - 15)
uint32_t f = sign | ((exp + (127 - 15)) << 23) // place to float32 exponent
| (mant << 13); // mantissa align to 23 bits
return std::bit_cast<float>(f);
}
}
};

class UUIDVarBinaryConverter : public PhysicalToLogicalConverter {
public:
UUIDVarBinaryConverter(int type_length) : _type_length(type_length) {}
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ std::pair<DataTypePtr, bool> FieldDescriptor::convert_to_doris_type(
} else if (logicalType.__isset.UUID) {
ans.first =
DataTypeFactory::instance().create_data_type(TYPE_VARBINARY, nullable, -1, -1, 16);
} else if (logicalType.__isset.FLOAT16) {
ans.first = DataTypeFactory::instance().create_data_type(TYPE_FLOAT, nullable);
} else {
throw Exception(Status::InternalError("Not supported parquet logicalType"));
}
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !test_39 --
\N
0.0
NaN

-- !test_7 --
\N
-0.0
-1.0
-2.0
0.0
1.0
2.0
NaN

-- !desc --
x float Yes false \N NONE

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_hdfs_tvf_float16","external,hive,tvf,external_docker") {
String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")

// It's okay to use random `hdfsUser`, but can not be empty.
def hdfsUserName = "doris"
def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}"
def uri = ""

String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
uri = "${defaultFS}" + "/user/doris/tvf_data/test_hdfs_parquet/group0/float16_zeros_and_nans.parquet"
order_qt_test_39 """ select * from HDFS(
"uri" = "${uri}",
"hadoop.username" = "${hdfsUserName}",
"format" = "parquet") """

uri = "${defaultFS}" + "/user/doris/tvf_data/test_hdfs_parquet/group0/float16_nonzeros_and_nans.parquet"
order_qt_test_7 """ select * from HDFS(
"uri" = "${uri}",
"hadoop.username" = "${hdfsUserName}",
"format" = "parquet") """

order_qt_desc """ desc function HDFS(
"uri" = "${uri}",
"hadoop.username" = "${hdfsUserName}",
"format" = "parquet") """
}
}
Loading