Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/iceberg/expression/residual_evaluator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,7 @@ class UnpartitionedResidualEvaluator : public ResidualEvaluator {

private:
// Store an empty schema to avoid dangling reference when passing to base class
inline static const std::shared_ptr<Schema> kEmptySchema_ =
std::make_shared<Schema>(std::vector<SchemaField>{}, std::nullopt);
inline static const std::shared_ptr<Schema> kEmptySchema_ = Schema::EmptySchema();
};

} // namespace
Expand Down
13 changes: 7 additions & 6 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ nlohmann::json ToJson(const Type& type) {

nlohmann::json ToJson(const Schema& schema) {
nlohmann::json json = ToJson(static_cast<const Type&>(schema));
SetOptionalField(json, kSchemaId, schema.schema_id());
json[kSchemaId] = schema.schema_id();
// TODO(gangwu): add identifier-field-ids.
return json;
}
Expand Down Expand Up @@ -466,14 +466,16 @@ Result<std::unique_ptr<SchemaField>> FieldFromJson(const nlohmann::json& json) {
}

Result<std::unique_ptr<Schema>> SchemaFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));
ICEBERG_ASSIGN_OR_RAISE(auto schema_id_opt,
GetJsonValueOptional<int32_t>(json, kSchemaId));
ICEBERG_ASSIGN_OR_RAISE(auto type, TypeFromJson(json));

if (type->type_id() != TypeId::kStruct) [[unlikely]] {
return JsonParseError("Schema must be a struct type, but got {}", SafeDumpJson(json));
}

auto& struct_type = static_cast<StructType&>(*type);
auto schema_id = schema_id_opt.value_or(Schema::kInitialSchemaId);
return FromStructType(std::move(struct_type), schema_id);
}

Expand Down Expand Up @@ -762,7 +764,7 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
}

// write the current schema ID and schema list
SetOptionalField(json, kCurrentSchemaId, table_metadata.current_schema_id);
json[kCurrentSchemaId] = table_metadata.current_schema_id;
json[kSchemas] = ToJsonList(table_metadata.schemas);

// for older readers, continue writing the default spec as "partition-spec"
Expand Down Expand Up @@ -824,8 +826,7 @@ namespace {
///
/// \return The current schema or parse error.
Result<std::shared_ptr<Schema>> ParseSchemas(
const nlohmann::json& json, int8_t format_version,
std::optional<int32_t>& current_schema_id,
const nlohmann::json& json, int8_t format_version, int32_t& current_schema_id,
std::vector<std::shared_ptr<Schema>>& schemas) {
std::shared_ptr<Schema> current_schema;
if (json.contains(kSchemas)) {
Expand All @@ -848,7 +849,7 @@ Result<std::shared_ptr<Schema>> ParseSchemas(
}
if (!current_schema) {
return JsonParseError("Cannot find schema with {}={} from {}", kCurrentSchemaId,
current_schema_id.value(), SafeDumpJson(schema_array));
current_schema_id, SafeDumpJson(schema_array));
}
} else {
if (format_version != 1) {
Expand Down
16 changes: 11 additions & 5 deletions src/iceberg/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@

namespace iceberg {

Schema::Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id,
Schema::Schema(std::vector<SchemaField> fields, int32_t schema_id,
std::vector<int32_t> identifier_field_ids)
: StructType(std::move(fields)),
schema_id_(schema_id),
identifier_field_ids_(std::move(identifier_field_ids)) {}

Result<std::unique_ptr<Schema>> Schema::Make(
std::vector<SchemaField> fields, std::optional<int32_t> schema_id,
std::vector<SchemaField> fields, int32_t schema_id,
const std::vector<std::string>& identifier_field_names) {
auto schema = std::make_unique<Schema>(std::move(fields), schema_id);

Expand All @@ -57,7 +57,13 @@ Result<std::unique_ptr<Schema>> Schema::Make(
return schema;
}

std::optional<int32_t> Schema::schema_id() const { return schema_id_; }
/// \brief Return the shared empty schema.
///
/// The instance is created on the first call with no fields and
/// kInitialSchemaId as its schema ID; every call returns a reference to
/// that same shared_ptr.
const std::shared_ptr<Schema>& Schema::EmptySchema() {
  static const std::shared_ptr<Schema> kInstance{
      std::make_shared<Schema>(std::vector<SchemaField>{}, kInitialSchemaId)};
  return kInstance;
}

/// \brief Return the schema ID held in schema_id_.
int32_t Schema::schema_id() const {
  return schema_id_;
}

std::string Schema::ToString() const {
std::string repr = "schema<";
Expand Down Expand Up @@ -196,7 +202,7 @@ Result<std::unique_ptr<Schema>> Schema::Select(std::span<const std::string> name
auto pruned_type, visitor.Visit(std::shared_ptr<StructType>(ToStructType(*this))));

if (!pruned_type) {
return std::make_unique<Schema>(std::vector<SchemaField>{}, std::nullopt);
return std::make_unique<Schema>(std::vector<SchemaField>{}, kInitialSchemaId);
}

if (pruned_type->type_id() != TypeId::kStruct) {
Expand All @@ -214,7 +220,7 @@ Result<std::unique_ptr<Schema>> Schema::Project(
auto project_type, visitor.Visit(std::shared_ptr<StructType>(ToStructType(*this))));

if (!project_type) {
return std::make_unique<Schema>(std::vector<SchemaField>{}, std::nullopt);
return std::make_unique<Schema>(std::vector<SchemaField>{}, kInitialSchemaId);
}

if (project_type->type_id() != TypeId::kStruct) {
Expand Down
14 changes: 9 additions & 5 deletions src/iceberg/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ class ICEBERG_EXPORT Schema : public StructType {
/// \brief Special value to select all columns from manifest files.
static constexpr std::string_view kAllColumns = "*";

explicit Schema(std::vector<SchemaField> fields,
std::optional<int32_t> schema_id = std::nullopt,
explicit Schema(std::vector<SchemaField> fields, int32_t schema_id = kInitialSchemaId,
std::vector<int32_t> identifier_field_ids = {});

/// \brief Create a schema.
Expand All @@ -63,14 +62,19 @@ class ICEBERG_EXPORT Schema : public StructType {
/// \param identifier_field_names Canonical names of fields that uniquely identify rows
/// in the table (default: empty).
/// \return A new Schema instance or Status if failed.
static Result<std::unique_ptr<Schema>> Make(
std::vector<SchemaField> fields, std::optional<int32_t> schema_id = std::nullopt,
std::vector<SchemaField> fields, int32_t schema_id = kInitialSchemaId,
const std::vector<std::string>& identifier_field_names = {});

/// \brief Get an empty schema.
///
/// An empty schema has no fields and a schema ID of kInitialSchemaId.
static const std::shared_ptr<Schema>& EmptySchema();

/// \brief Get the schema ID.
///
/// A schema is identified by a unique ID for the purposes of schema
/// evolution.
std::optional<int32_t> schema_id() const;
int32_t schema_id() const;

std::string ToString() const override;

Expand Down Expand Up @@ -178,7 +182,7 @@ class ICEBERG_EXPORT Schema : public StructType {
const Schema&);
static Result<int32_t> InitHighestFieldId(const Schema&);

const std::optional<int32_t> schema_id_;
const int32_t schema_id_;
/// Field IDs that uniquely identify rows in the table.
std::vector<int32_t> identifier_field_ids_;
/// Mapping from field id to field.
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/schema_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,13 @@ Result<std::shared_ptr<Type>> FromArrowSchema(const ArrowSchema& schema) {
} // namespace

std::unique_ptr<Schema> FromStructType(StructType&& struct_type,
std::optional<int32_t> schema_id) {
std::optional<int32_t> schema_id_opt) {
std::vector<SchemaField> fields;
fields.reserve(struct_type.fields().size());
for (auto& field : struct_type.fields()) {
fields.emplace_back(std::move(field));
}
auto schema_id = schema_id_opt.value_or(Schema::kInitialSchemaId);
return std::make_unique<Schema>(std::move(fields), schema_id);
}

Expand Down
27 changes: 10 additions & 17 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,12 @@ Result<std::shared_ptr<Schema>> TableMetadata::Schema() const {
return SchemaById(current_schema_id);
}

Result<std::shared_ptr<Schema>> TableMetadata::SchemaById(
std::optional<int32_t> schema_id) const {
Result<std::shared_ptr<Schema>> TableMetadata::SchemaById(int32_t schema_id) const {
auto iter = std::ranges::find_if(schemas, [schema_id](const auto& schema) {
return schema != nullptr && schema->schema_id() == schema_id;
});
if (iter == schemas.end()) {
return NotFound("Schema with ID {} is not found", schema_id.value_or(-1));
return NotFound("Schema with ID {} is not found", schema_id);
}
return *iter;
}
Expand Down Expand Up @@ -198,11 +197,8 @@ Result<TableMetadataCache::SnapshotsMapRef> TableMetadataCache::GetSnapshotsById

Result<TableMetadataCache::SchemasMap> TableMetadataCache::InitSchemasMap(
const TableMetadata* metadata) {
return metadata->schemas | std::views::filter([](const auto& schema) {
return schema->schema_id().has_value();
}) |
std::views::transform([](const auto& schema) {
return std::make_pair(schema->schema_id().value(), schema);
return metadata->schemas | std::views::transform([](const auto& schema) {
return std::make_pair(schema->schema_id(), schema);
}) |
std::ranges::to<SchemasMap>();
}
Expand Down Expand Up @@ -387,9 +383,7 @@ class TableMetadataBuilder::Impl {
: base_(base_metadata), metadata_(*base_metadata) {
// Initialize index maps from base metadata
for (const auto& schema : metadata_.schemas) {
if (schema->schema_id().has_value()) {
schemas_by_id_.emplace(schema->schema_id().value(), schema);
}
schemas_by_id_.emplace(schema->schema_id(), schema);
}

for (const auto& spec : metadata_.partition_specs) {
Expand Down Expand Up @@ -758,14 +752,13 @@ Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) {

Status TableMetadataBuilder::Impl::RemoveSchemas(
const std::unordered_set<int32_t>& schema_ids) {
auto current_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
auto current_schema_id = metadata_.current_schema_id;
ICEBERG_PRECHECK(!schema_ids.contains(current_schema_id),
"Cannot remove current schema: {}", current_schema_id);

if (!schema_ids.empty()) {
metadata_.schemas = metadata_.schemas | std::views::filter([&](const auto& schema) {
return schema->schema_id().has_value() &&
!schema_ids.contains(schema->schema_id().value());
return !schema_ids.contains(schema->schema_id());
}) |
std::ranges::to<std::vector<std::shared_ptr<Schema>>>();
changes_.push_back(std::make_unique<table::RemoveSchemas>(schema_ids));
Expand Down Expand Up @@ -829,7 +822,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataBuilder::Impl::Build() {
std::chrono::system_clock::now().time_since_epoch())};
}

auto current_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
auto current_schema_id = metadata_.current_schema_id;
auto schema_it = schemas_by_id_.find(current_schema_id);
ICEBERG_PRECHECK(schema_it != schemas_by_id_.end(),
"Current schema ID {} not found in schemas", current_schema_id);
Expand Down Expand Up @@ -902,9 +895,9 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId(
int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
const Schema& new_schema) const {
// if the schema already exists, use its id; otherwise use the highest id + 1
auto new_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
auto new_schema_id = metadata_.current_schema_id;
for (auto& schema : metadata_.schemas) {
auto schema_id = schema->schema_id().value_or(Schema::kInitialSchemaId);
auto schema_id = schema->schema_id();
if (schema->SameSchema(new_schema)) {
return schema_id;
} else if (new_schema_id <= schema_id) {
Expand Down
5 changes: 2 additions & 3 deletions src/iceberg/table_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ struct ICEBERG_EXPORT TableMetadata {
/// A list of schemas
std::vector<std::shared_ptr<iceberg::Schema>> schemas;
/// ID of the table's current schema
std::optional<int32_t> current_schema_id;
int32_t current_schema_id;
/// A list of partition specs
std::vector<std::shared_ptr<iceberg::PartitionSpec>> partition_specs;
/// ID of the current partition spec that writers should use by default
Expand Down Expand Up @@ -130,8 +130,7 @@ struct ICEBERG_EXPORT TableMetadata {
/// \brief Get the current schema, return NotFoundError if not found
Result<std::shared_ptr<iceberg::Schema>> Schema() const;
/// \brief Get the schema with the given ID, return NotFoundError if not found
Result<std::shared_ptr<iceberg::Schema>> SchemaById(
std::optional<int32_t> schema_id) const;
Result<std::shared_ptr<iceberg::Schema>> SchemaById(int32_t schema_id) const;
/// \brief Get the current partition spec, return NotFoundError if not found
Result<std::shared_ptr<iceberg::PartitionSpec>> PartitionSpec() const;
/// \brief Get the current sort order, return NotFoundError if not found
Expand Down
9 changes: 2 additions & 7 deletions src/iceberg/table_requirement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,10 @@ Status AssertCurrentSchemaID::Validate(const TableMetadata* base) const {
return CommitFailed("Requirement failed: current table metadata is missing");
}

if (!base->current_schema_id.has_value()) {
return CommitFailed(
"Requirement failed: current schema ID is not set in table metadata");
}

if (base->current_schema_id.value() != schema_id_) {
if (base->current_schema_id != schema_id_) {
return CommitFailed(
"Requirement failed: current schema ID does not match, expected {} != {}",
schema_id_, base->current_schema_id.value());
schema_id_, base->current_schema_id);
}

return {};
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/table_requirements.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ void TableUpdateContext::RequireLastAssignedFieldIdUnchanged() {
void TableUpdateContext::RequireCurrentSchemaIdUnchanged() {
if (!added_current_schema_id_) {
if (base_ != nullptr && !is_replace_) {
AddRequirement(std::make_unique<table::AssertCurrentSchemaID>(
base_->current_schema_id.value()));
AddRequirement(
std::make_unique<table::AssertCurrentSchemaID>(base_->current_schema_id));
}
added_current_schema_id_ = true;
}
Expand Down
5 changes: 2 additions & 3 deletions src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,7 @@ Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {

if (!context_.projected_schema) {
const auto& snapshot = context_.snapshot;
auto schema_id =
snapshot->schema_id ? snapshot->schema_id : table_metadata->current_schema_id;
auto schema_id = table_metadata->current_schema_id;
ICEBERG_ASSIGN_OR_RAISE(auto schema, table_metadata->SchemaById(schema_id));

if (column_names_.empty()) {
Expand All @@ -231,7 +230,7 @@ Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
auto field_opt = schema->GetFieldByName(column_name);
if (!field_opt) {
return InvalidArgument("Column {} not found in schema '{}'", column_name,
*schema_id);
schema_id);
}
projected_fields.emplace_back(field_opt.value()->get());
}
Expand Down
5 changes: 2 additions & 3 deletions src/iceberg/test/metadata_serde_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ TEST(MetadataSerdeTest, DeserializeV1Valid) {
auto expected_schema = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField::MakeRequired(1, "x", int64()),
SchemaField::MakeRequired(2, "y", int64()),
SchemaField::MakeRequired(3, "z", int64())},
/*schema_id=*/std::nullopt);
SchemaField::MakeRequired(3, "z", int64())});

auto expected_spec_result = PartitionSpec::Make(
/*spec_id=*/0,
Expand All @@ -115,7 +114,7 @@ TEST(MetadataSerdeTest, DeserializeV1Valid) {
.last_updated_ms = TimePointMsFromUnixMs(1602638573874).value(),
.last_column_id = 3,
.schemas = {expected_schema},
.current_schema_id = std::nullopt,
.current_schema_id = Schema::kInitialSchemaId,
.partition_specs = {expected_spec},
.default_spec_id = 0,
.last_partition_id = 1000,
Expand Down
Loading
Loading