Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions src/iceberg/expression/residual_evaluator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,12 @@ class UnpartitionedResidualEvaluator : public ResidualEvaluator {
public:
explicit UnpartitionedResidualEvaluator(std::shared_ptr<Expression> expr)
: ResidualEvaluator(std::move(expr), *PartitionSpec::Unpartitioned(),
*kEmptySchema_, true) {}
*Schema::kEmptySchema, true) {}

Result<std::shared_ptr<Expression>> ResidualFor(
const StructLike& /*partition_data*/) const override {
return expr_;
}

private:
// Store an empty schema to avoid dangling reference when passing to base class
inline static const std::shared_ptr<Schema> kEmptySchema_ =
std::make_shared<Schema>(std::vector<SchemaField>{}, std::nullopt);
};

} // namespace
Expand Down
89 changes: 84 additions & 5 deletions src/iceberg/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <format>
#include <functional>
#include <stack>

#include "iceberg/result.h"
#include "iceberg/row/struct_like.h"
Expand All @@ -34,11 +35,80 @@

namespace iceberg {

Schema::Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id,
std::vector<int32_t> identifier_field_ids)
: StructType(std::move(fields)),
schema_id_(schema_id),
identifier_field_ids_(std::move(identifier_field_ids)) {}
namespace {
Status ValidateIdentifierFields(
int32_t field_id, const Schema& schema,
const std::unordered_map<int32_t, int32_t>& id_to_parent) {
ICEBERG_ASSIGN_OR_RAISE(auto field_opt, schema.FindFieldById(field_id));
ICEBERG_PRECHECK(field_opt.has_value(),
"Cannot add field {} as an identifier field: field does not exist",
field_id);

const SchemaField& field = field_opt.value().get();
ICEBERG_PRECHECK(
field.type()->is_primitive(),
"Cannot add field {} as an identifier field: not a primitive type field", field_id);
ICEBERG_PRECHECK(!field.optional(),
"Cannot add field {} as an identifier field: not a required field",
field_id);
ICEBERG_PRECHECK(
field.type()->type_id() != TypeId::kDouble &&
field.type()->type_id() != TypeId::kFloat,
"Cannot add field {} as an identifier field: must not be float or double field",
field_id);

// check whether the nested field is in a chain of required struct fields
// exploring from root for better error message for list and map types
std::stack<int32_t> ancestors;
auto parent_it = id_to_parent.find(field.field_id());
while (parent_it != id_to_parent.end()) {
ancestors.push(parent_it->second);
parent_it = id_to_parent.find(parent_it->second);
}

while (!ancestors.empty()) {
ICEBERG_ASSIGN_OR_RAISE(auto parent_opt, schema.FindFieldById(ancestors.top()));
ICEBERG_PRECHECK(
parent_opt.has_value(),
"Cannot add field {} as an identifier field: parent field id {} does not exist",
field_id, ancestors.top());
const SchemaField& parent = parent_opt.value().get();
ICEBERG_PRECHECK(
parent.type()->type_id() == TypeId::kStruct,
"Cannot add field {} as an identifier field: must not be nested in {}", field_id,
*parent.type());
ICEBERG_PRECHECK(!parent.optional(),
"Cannot add field {} as an identifier field: must not be nested in "
"optional field {}",
field_id, parent.field_id());
ancestors.pop();
}
return {};
}
} // namespace

const std::shared_ptr<Schema> Schema::kEmptySchema =
std::make_shared<Schema>(std::vector<SchemaField>{}, std::nullopt);

Schema::Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id)
: StructType(std::move(fields)), schema_id_(schema_id) {}

Result<std::unique_ptr<Schema>> Schema::Make(std::vector<SchemaField> fields,
std::optional<int32_t> schema_id,
std::vector<int32_t> identifier_field_ids) {
auto schema = std::make_unique<Schema>(std::move(fields), schema_id);

if (!identifier_field_ids.empty()) {
auto id_to_parent = IndexParents(*schema);
for (auto field_id : identifier_field_ids) {
ICEBERG_RETURN_UNEXPECTED(
ValidateIdentifierFields(field_id, *schema, id_to_parent));
}
}

schema->identifier_field_ids_ = std::move(identifier_field_ids);
return schema;
}

Result<std::unique_ptr<Schema>> Schema::Make(
std::vector<SchemaField> fields, std::optional<int32_t> schema_id,
Expand All @@ -53,6 +123,15 @@ Result<std::unique_ptr<Schema>> Schema::Make(
}
fresh_identifier_ids.push_back(field.value().get().field_id());
}

if (!fresh_identifier_ids.empty()) {
auto id_to_parent = IndexParents(*schema);
for (auto field_id : fresh_identifier_ids) {
ICEBERG_RETURN_UNEXPECTED(
ValidateIdentifierFields(field_id, *schema, id_to_parent));
}
}

schema->identifier_field_ids_ = std::move(fresh_identifier_ids);
return schema;
}
Expand Down
24 changes: 19 additions & 5 deletions src/iceberg/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,33 @@ class ICEBERG_EXPORT Schema : public StructType {
/// \brief Special value to select all columns from manifest files.
static constexpr std::string_view kAllColumns = "*";

/// \brief An empty schema instance.
static const std::shared_ptr<Schema> kEmptySchema;

explicit Schema(std::vector<SchemaField> fields,
std::optional<int32_t> schema_id = std::nullopt,
std::vector<int32_t> identifier_field_ids = {});
std::optional<int32_t> schema_id = std::nullopt);

/// \brief Create a schema.
///
/// \param fields The fields that make up the schema.
/// \param schema_id The unique identifier for this schema (default:
/// kInitialSchemaId). \param identifier_field_ids Field IDs that uniquely identify
/// rows in the table (default: empty). \return A new Schema instance or Status if
/// failed.
static Result<std::unique_ptr<Schema>> Make(std::vector<SchemaField> fields,
std::optional<int32_t> schema_id,
std::vector<int32_t> identifier_field_ids);

/// \brief Create a schema.
///
/// \param fields The fields that make up the schema.
/// \param schema_id The unique identifier for this schema (default: kInitialSchemaId).
/// \param identifier_field_names Canonical names of fields that uniquely identify rows
/// in the table (default: empty). \return A new Schema instance or Status if failed.
/// in the table (default: empty).
/// \return A new Schema instance or Status if failed.
static Result<std::unique_ptr<Schema>> Make(
std::vector<SchemaField> fields, std::optional<int32_t> schema_id = std::nullopt,
const std::vector<std::string>& identifier_field_names = {});
std::vector<SchemaField> fields, std::optional<int32_t> schema_id,
const std::vector<std::string>& identifier_field_names);

/// \brief Get the schema ID.
///
Expand Down
6 changes: 3 additions & 3 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -803,9 +803,9 @@ Result<int32_t> TableMetadataBuilder::Impl::AddSchema(const Schema& schema,

metadata_.last_column_id = new_last_column_id;

auto new_schema =
std::make_shared<Schema>(schema.fields() | std::ranges::to<std::vector>(),
new_schema_id, schema.IdentifierFieldIds());
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<Schema> new_schema,
Schema::Make(schema.fields() | std::ranges::to<std::vector>(),
new_schema_id, schema.IdentifierFieldIds()))

if (!schema_found) {
metadata_.schemas.push_back(new_schema);
Expand Down
25 changes: 11 additions & 14 deletions src/iceberg/test/assign_id_visitor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ std::shared_ptr<Type> CreateNestedStruct() {
});
}

Schema CreateNestedSchema(std::vector<int32_t> identifier_field_ids = {}) {
return Schema(
Result<std::unique_ptr<Schema>> CreateNestedSchema(
std::vector<int32_t> identifier_field_ids = {}) {
return Schema::Make(
{
SchemaField::MakeRequired(/*field_id=*/10, "id", iceberg::int64()),
SchemaField::MakeOptional(/*field_id=*/20, "list", CreateListOfStruct()),
Expand Down Expand Up @@ -108,11 +109,11 @@ TEST(AssignFreshIdVisitorTest, FlatSchema) {
}

TEST(AssignFreshIdVisitorTest, NestedSchema) {
Schema schema = CreateNestedSchema();
ICEBERG_UNWRAP_OR_FAIL(auto schema, CreateNestedSchema());
std::atomic<int32_t> id = 0;
auto next_id = [&id]() { return ++id; };
ICEBERG_UNWRAP_OR_FAIL(auto fresh_schema,
AssignFreshIds(Schema::kInitialSchemaId, schema, next_id));
AssignFreshIds(Schema::kInitialSchemaId, *schema, next_id));

ASSERT_EQ(4, fresh_schema->fields().size());
for (int32_t i = 0; i < fresh_schema->fields().size(); ++i) {
Expand Down Expand Up @@ -169,20 +170,16 @@ TEST(AssignFreshIdVisitorTest, NestedSchema) {
}

TEST(AssignFreshIdVisitorTest, RefreshIdentifierId) {
std::atomic<int32_t> id = 0;
int32_t id = 0;
auto next_id = [&id]() { return ++id; };

Schema invalid_schema = CreateNestedSchema({10, 400});
// Invalid identified field id
auto result = AssignFreshIds(Schema::kInitialSchemaId, invalid_schema, next_id);
EXPECT_THAT(result, IsError(ErrorKind::kInvalidSchema));
EXPECT_THAT(result, HasErrorMessage("Cannot find"));

id = 0;
Schema schema = CreateNestedSchema({10, 301});
ICEBERG_UNWRAP_OR_FAIL(auto schema, CreateNestedSchema({10, 301}));
ICEBERG_UNWRAP_OR_FAIL(auto fresh_schema,
AssignFreshIds(Schema::kInitialSchemaId, schema, next_id));
AssignFreshIds(Schema::kInitialSchemaId, *schema, next_id));
EXPECT_THAT(fresh_schema->IdentifierFieldIds(), testing::ElementsAre(1, 12));
ICEBERG_UNWRAP_OR_FAIL(auto identifier_field_names,
fresh_schema->IdentifierFieldNames());
EXPECT_THAT(identifier_field_names, testing::ElementsAre("id", "struct.outer_id"));
}

} // namespace iceberg
Loading
Loading