Skip to content

Commit fa0a562

Browse files
Change the object store table/catalog layout (#97)
This change does 2 things: - When a table is created with `catalog=object_store`, the catalog file used to be located at `defaultPrefix/_catalog/frompg/pg_db.json`; after this commit it is located at `defaultPrefix/frompg/catalog/pg_db.json`. - When a `catalog=object_store` iceberg table is created, it used to be located in `defaultPrefix/dbname/schema/table_name/oid`; after this commit it is located in `defaultPrefix/frompg/tables/dbname/schema/table_name/oid`. The goal is to be able to differentiate the tables written by Postgres from those written by other systems; the `frompg` prefix allows this. Also, we can control access to the `/catalog` folder so that only privileged users can access it.
1 parent c5b4a66 commit fa0a562

File tree

5 files changed

+94
-18
lines changed

5 files changed

+94
-18
lines changed

pg_lake_iceberg/include/pg_lake/iceberg/catalog.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ extern PGDLLEXPORT void UpdateExternalCatalogMetadataLocation(char *catalogName,
6060
const char *previousMetadataLocation);
6161
extern PGDLLEXPORT void UpdateInternalCatalogMetadataLocation(Oid relationId, const char *metadataLocation, const char *previousMetadataLocation);
6262
extern PGDLLEXPORT void UpdateAllInternalIcebergTablesToReadOnly(void);
63-
extern PGDLLEXPORT const char *GetIcebergDefaultLocationPrefix(void);
63+
extern PGDLLEXPORT char *GetIcebergDefaultLocationPrefix(void);
6464
extern PGDLLEXPORT bool IcebergTablesCatalogExists(void);
6565
extern PGDLLEXPORT void ErrorIfReadOnlyIcebergTable(Oid relationId);
6666
extern PGDLLEXPORT bool WarnIfReadOnlyIcebergTable(Oid relationId);

pg_lake_iceberg/src/iceberg/catalog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,7 @@ UpdateAllInternalIcebergTablesToReadOnly(void)
740740
* GetIcebergDefaultLocationPrefix returns the default location prefix
741741
* for iceberg tables. Trailing slash is removed, if present.
742742
*/
743-
const char *
743+
char *
744744
GetIcebergDefaultLocationPrefix(void)
745745
{
746746
if (IcebergDefaultLocationPrefix == NULL)

pg_lake_iceberg/src/object_store_catalog/object_store_catalog.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ GetExternalObjectStoreCatalogFilePath(const char *catalogName)
504504
errdetail("Set the GUC to use catalog=object_store.")));
505505
}
506506

507-
return psprintf("%s/_catalog/%s/%s.json", defaultPrefix,
507+
return psprintf("%s/%s/catalog/%s.json", defaultPrefix,
508508
ExternalObjectStorePrefix, URLEncodePath(catalogName));
509509
}
510510

@@ -529,7 +529,7 @@ GetInternalObjectStoreCatalogFilePath(const char *catalogName)
529529
errdetail("Set the GUC to use catalog=object_store.")));
530530
}
531531

532-
return psprintf("%s/_catalog/%s/%s.json", defaultPrefix,
532+
return psprintf("%s/%s/catalog/%s.json", defaultPrefix,
533533
InternalObjectStorePrefix, URLEncodePath(catalogName));
534534
}
535535

pg_lake_table/src/ddl/create_table.c

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -822,7 +822,40 @@ ProcessCreateIcebergTableFromForeignTableStmt(ProcessUtilityParams * params)
822822
}
823823

824824
DefElem *locationOption = GetOption(createStmt->options, "location");
825-
const char *defaultLocationPrefix = GetIcebergDefaultLocationPrefix();
825+
char *defaultLocationPrefix = GetIcebergDefaultLocationPrefix();
826+
827+
if (hasObjectStoreCatalogOption)
828+
{
829+
const char *objectStoreCatalogLocationPrefix = GetObjectStoreDefaultLocationPrefix();
830+
831+
if (objectStoreCatalogLocationPrefix == NULL)
832+
{
833+
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
834+
errmsg("object store catalog iceberg tables require "
835+
"pg_lake_iceberg.object_store_catalog_location_prefix "
836+
"to be set")));
837+
}
838+
839+
if (InternalObjectStorePrefix == NULL)
840+
{
841+
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
842+
errmsg("object store catalog iceberg tables require "
843+
"pg_lake_iceberg.internal_iceberg_storage_prefix "
844+
"to be set")));
845+
}
846+
847+
/* here we only deal with writable tables */
848+
Assert(!HasReadOnlyOption(createStmt->options));
849+
850+
/*
851+
* For hasObjectStoreCatalogOption, we also append
852+
* InternalObjectStorePrefix/tables to the location
853+
*/
854+
defaultLocationPrefix = psprintf("%s/%s/%s",
855+
defaultLocationPrefix,
856+
InternalObjectStorePrefix,
857+
"tables");
858+
}
826859

827860
/*
828861
* We will set the location by using the default location prefix when user
@@ -841,19 +874,6 @@ ProcessCreateIcebergTableFromForeignTableStmt(ProcessUtilityParams * params)
841874
}
842875

843876

844-
if (hasObjectStoreCatalogOption)
845-
{
846-
const char *objectStoreCatalogLocationPrefix = GetObjectStoreDefaultLocationPrefix();
847-
848-
if (objectStoreCatalogLocationPrefix == NULL)
849-
{
850-
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
851-
errmsg("object store catalog iceberg tables require "
852-
"pg_lake_iceberg.object_store_catalog_location_prefix "
853-
"to be set")));
854-
}
855-
}
856-
857877
/*
858878
* Our CREATE FOREIGN TABLE statement is fully ready for execution, so we
859879
* go to the parent ProcessUtility.

pg_lake_table/tests/pytests/test_object_store_catalog.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,62 @@ def test_iceberg_multiple_dbs(
722722
superuser_conn.autocommit = False
723723

724724

725+
def test_create_table_with_default_location_object_store(
726+
pg_conn,
727+
superuser_conn,
728+
s3,
729+
extension,
730+
with_default_location,
731+
adjust_object_store_settings,
732+
):
733+
dbname = run_query("SELECT current_database()", pg_conn)[0][0]
734+
run_command(
735+
"CREATE SCHEMA test_create_table_with_default_location_object_store", pg_conn
736+
)
737+
run_command(
738+
f"""CREATE TABLE test_create_table_with_default_location_object_store.tbl (a int, b int)
739+
USING iceberg WITH (catalog='object_store')""",
740+
pg_conn,
741+
)
742+
pg_conn.commit()
743+
wait_until_object_store_writable_table_pushed(
744+
pg_conn, "test_create_table_with_default_location_object_store", "tbl"
745+
)
746+
747+
# assert metadata location
748+
result = run_query(
749+
"""SELECT metadata_location FROM iceberg_tables
750+
WHERE table_namespace = 'test_create_table_with_default_location_object_store' and table_name = 'tbl'
751+
""",
752+
pg_conn,
753+
)
754+
first_table_metadata_location = result[0][0]
755+
756+
table_oid = run_query(
757+
"""SELECT oid FROM pg_class
758+
WHERE oid = 'test_create_table_with_default_location_object_store.tbl'::regclass
759+
""",
760+
pg_conn,
761+
)[0][0]
762+
763+
prefix = run_query(
764+
"SHOW pg_lake_iceberg.internal_object_store_catalog_prefix", superuser_conn
765+
)[0][0]
766+
superuser_conn.commit()
767+
768+
assert (
769+
f"s3://{TEST_BUCKET}/{prefix}/tables/{dbname}/test_create_table_with_default_location_object_store/tbl/{table_oid}"
770+
in first_table_metadata_location
771+
)
772+
773+
# drop the table and create it again
774+
run_command(
775+
"DROP SCHEMA test_create_table_with_default_location_object_store CASCADE",
776+
pg_conn,
777+
)
778+
pg_conn.commit()
779+
780+
725781
def test_complex_types_object_store(
726782
pg_conn, s3, extension, with_default_location, adjust_object_store_settings
727783
):

0 commit comments

Comments
 (0)