Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ if(ICEBERG_BUILD_BUNDLE)
arrow/arrow_fs_file_io.cc
avro/avro_data_util.cc
avro/avro_direct_decoder.cc
avro/avro_direct_encoder.cc
avro/avro_reader.cc
avro/avro_writer.cc
avro/avro_register.cc
Expand Down
50 changes: 25 additions & 25 deletions src/iceberg/avro/avro_direct_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace {
Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder,
const FieldProjection& projection,
const SchemaField& projected_field,
::arrow::ArrayBuilder* array_builder, DecodeContext* ctx);
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx);

/// \brief Skip an Avro value based on its schema without decoding
Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder) {
Expand Down Expand Up @@ -146,7 +146,7 @@ Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder)
Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder,
const std::span<const FieldProjection>& projections,
const StructType& struct_type,
::arrow::ArrayBuilder* array_builder, DecodeContext* ctx) {
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) {
if (avro_node->type() != ::avro::AVRO_RECORD) {
return InvalidArgument("Expected Avro record, got type: {}", ToString(avro_node));
}
Expand All @@ -157,15 +157,15 @@ Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
// Build a map from Avro field index to projection index (cached per struct schema)
// -1 means the field should be skipped
const FieldProjection* cache_key = projections.data();
auto cache_it = ctx->avro_to_projection_cache.find(cache_key);
auto cache_it = ctx.avro_to_projection_cache.find(cache_key);
std::vector<int>* avro_to_projection;

if (cache_it != ctx->avro_to_projection_cache.end()) {
if (cache_it != ctx.avro_to_projection_cache.end()) {
// Use cached mapping
avro_to_projection = &cache_it->second;
} else {
// Build and cache the mapping
auto [inserted_it, inserted] = ctx->avro_to_projection_cache.emplace(
auto [inserted_it, inserted] = ctx.avro_to_projection_cache.emplace(
cache_key, std::vector<int>(avro_node->leaves(), -1));
avro_to_projection = &inserted_it->second;

Expand Down Expand Up @@ -217,7 +217,7 @@ Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
Status DecodeListToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder,
const FieldProjection& element_projection,
const ListType& list_type,
::arrow::ArrayBuilder* array_builder, DecodeContext* ctx) {
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) {
if (avro_node->type() != ::avro::AVRO_ARRAY) {
return InvalidArgument("Expected Avro array, got type: {}", ToString(avro_node));
}
Expand Down Expand Up @@ -247,7 +247,7 @@ Status DecodeMapToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& dec
const FieldProjection& key_projection,
const FieldProjection& value_projection,
const MapType& map_type, ::arrow::ArrayBuilder* array_builder,
DecodeContext* ctx) {
DecodeContext& ctx) {
auto* map_builder = internal::checked_cast<::arrow::MapBuilder*>(array_builder);

if (avro_node->type() == ::avro::AVRO_MAP) {
Expand Down Expand Up @@ -317,7 +317,7 @@ Status DecodeNestedValueToBuilder(const ::avro::NodePtr& avro_node,
const std::span<const FieldProjection>& projections,
const NestedType& projected_type,
::arrow::ArrayBuilder* array_builder,
DecodeContext* ctx) {
DecodeContext& ctx) {
switch (projected_type.type_id()) {
case TypeId::kStruct: {
const auto& struct_type = internal::checked_cast<const StructType&>(projected_type);
Expand Down Expand Up @@ -354,7 +354,7 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
::avro::Decoder& decoder,
const SchemaField& projected_field,
::arrow::ArrayBuilder* array_builder,
DecodeContext* ctx) {
DecodeContext& ctx) {
const auto& projected_type = *projected_field.type();
if (!projected_type.is_primitive()) {
return InvalidArgument("Expected primitive type, got: {}", projected_type.ToString());
Expand Down Expand Up @@ -430,8 +430,8 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
ToString(avro_node));
}
auto* builder = internal::checked_cast<::arrow::StringBuilder*>(array_builder);
decoder.decodeString(ctx->string_scratch);
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->string_scratch));
decoder.decodeString(ctx.string_scratch);
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx.string_scratch));
return {};
}

Expand All @@ -441,9 +441,9 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
ToString(avro_node));
}
auto* builder = internal::checked_cast<::arrow::BinaryBuilder*>(array_builder);
decoder.decodeBytes(ctx->bytes_scratch);
decoder.decodeBytes(ctx.bytes_scratch);
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(
ctx->bytes_scratch.data(), static_cast<int32_t>(ctx->bytes_scratch.size())));
ctx.bytes_scratch.data(), static_cast<int32_t>(ctx.bytes_scratch.size())));
return {};
}

Expand All @@ -456,9 +456,9 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
auto* builder =
internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder);

ctx->bytes_scratch.resize(fixed_type.length());
decoder.decodeFixed(fixed_type.length(), ctx->bytes_scratch);
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->bytes_scratch.data()));
ctx.bytes_scratch.resize(fixed_type.length());
decoder.decodeFixed(fixed_type.length(), ctx.bytes_scratch);
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx.bytes_scratch.data()));
return {};
}

Expand All @@ -472,9 +472,9 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
auto* builder =
internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder);

ctx->bytes_scratch.resize(16);
decoder.decodeFixed(16, ctx->bytes_scratch);
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->bytes_scratch.data()));
ctx.bytes_scratch.resize(16);
decoder.decodeFixed(16, ctx.bytes_scratch);
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx.bytes_scratch.data()));
return {};
}

Expand All @@ -489,11 +489,11 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
size_t byte_width = avro_node->fixedSize();
auto* builder = internal::checked_cast<::arrow::Decimal128Builder*>(array_builder);

ctx->bytes_scratch.resize(byte_width);
decoder.decodeFixed(byte_width, ctx->bytes_scratch);
ctx.bytes_scratch.resize(byte_width);
decoder.decodeFixed(byte_width, ctx.bytes_scratch);
ICEBERG_ARROW_ASSIGN_OR_RETURN(
auto decimal, ::arrow::Decimal128::FromBigEndian(ctx->bytes_scratch.data(),
ctx->bytes_scratch.size()));
auto decimal, ::arrow::Decimal128::FromBigEndian(ctx.bytes_scratch.data(),
ctx.bytes_scratch.size()));
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(decimal));
return {};
}
Expand Down Expand Up @@ -548,7 +548,7 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder,
const FieldProjection& projection,
const SchemaField& projected_field,
::arrow::ArrayBuilder* array_builder, DecodeContext* ctx) {
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) {
if (avro_node->type() == ::avro::AVRO_UNION) {
const size_t branch_index = decoder.decodeUnionIndex();

Expand Down Expand Up @@ -585,7 +585,7 @@ Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& d
Status DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder,
const SchemaProjection& projection,
const Schema& projected_schema,
::arrow::ArrayBuilder* array_builder, DecodeContext* ctx) {
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) {
return DecodeNestedValueToBuilder(avro_node, decoder, projection.fields,
projected_schema, array_builder, ctx);
}
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/avro/avro_direct_decoder_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,6 @@ struct DecodeContext {
Status DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder,
const SchemaProjection& projection,
const Schema& projected_schema,
::arrow::ArrayBuilder* array_builder, DecodeContext* ctx);
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx);

} // namespace iceberg::avro
Loading
Loading