|
18 | 18 | #include "postgres.h" |
19 | 19 | #include "miscadmin.h" |
20 | 20 |
|
| 21 | +#include "access/heapam.h" |
21 | 22 | #include "access/table.h" |
22 | 23 | #include "access/tableam.h" |
23 | 24 | #include "catalog/heap.h" |
|
37 | 38 | #include "utils/rel.h" |
38 | 39 | #include "utils/typcache.h" |
39 | 40 | #include "utils/inval.h" |
| 41 | +#include "utils/fmgroids.h" |
40 | 42 |
|
41 | 43 | #include "pg_lake/access_method/access_method.h" |
42 | 44 | #include "pg_lake/data_file/data_files.h" |
@@ -112,6 +114,8 @@ typedef struct PgLakeDDL |
112 | 114 | static bool Allowed(Node *arg); |
113 | 115 | static bool Disallowed(Node *arg); |
114 | 116 | static bool DisallowedAddColumnWithUnsupportedConstraints(Node *arg); |
| 117 | +static bool DisallowedForWritableRestRenameTable(Node *arg); |
| 118 | +static bool DisallowedForWritableRestSetSchema(Node *arg); |
115 | 119 |
|
116 | 120 | PgLakeAlterTableHookType PgLakeAlterTableHook = NULL; |
117 | 121 | PgLakeAlterTableRenameColumnHookType PgLakeAlterTableRenameColumnHook = NULL; |
@@ -171,10 +175,10 @@ static const PgLakeDDL PgLakeDDLs[] = { |
171 | 175 | ALTER_TABLE_DDL(AT_DisableTrigAll, Allowed, Allowed), |
172 | 176 | ALTER_TABLE_DDL(AT_EnableTrigUser, Allowed, Allowed), |
173 | 177 | ALTER_TABLE_DDL(AT_DisableTrigUser, Allowed, Allowed), |
174 | | - ALTER_SCHEMA_DDL(OBJECT_TABLE, Allowed, Allowed), |
175 | | - ALTER_SCHEMA_DDL(OBJECT_FOREIGN_TABLE, Allowed, Allowed), |
176 | | - RENAME_TABLE_DDL(OBJECT_TABLE, Allowed, Allowed), |
177 | | - RENAME_TABLE_DDL(OBJECT_FOREIGN_TABLE, Allowed, Allowed), |
| 178 | + ALTER_SCHEMA_DDL(OBJECT_TABLE, DisallowedForWritableRestSetSchema, Allowed), |
| 179 | + ALTER_SCHEMA_DDL(OBJECT_FOREIGN_TABLE, DisallowedForWritableRestRenameTable, Allowed), |
| 180 | + RENAME_TABLE_DDL(OBJECT_TABLE, DisallowedForWritableRestRenameTable, Allowed), |
| 181 | + RENAME_TABLE_DDL(OBJECT_FOREIGN_TABLE, DisallowedForWritableRestRenameTable, Allowed), |
178 | 182 | }; |
179 | 183 |
|
180 | 184 | #define N_PG_LAKE_DDLS (sizeof(PgLakeDDLs) / sizeof(PgLakeDDLs[0])) |
@@ -205,6 +209,8 @@ static bool HasPartitionByAdded(AlterTableStmt *alterStmt); |
205 | 209 | static bool HasPartitionByDropped(AlterTableStmt *alterStmt); |
206 | 210 | static bool HasOnlyCatalogAlterTableOptions(AlterTableStmt *alterStmt); |
207 | 211 | static void ErrorIfUnsupportedTableOptionChange(AlterTableStmt *alterStmt, List *allowedOptions); |
| 212 | +static void ErrorIfAnyRestCatalogTablesInSchema(const char *schemaName); |
| 213 | + |
208 | 214 |
|
209 | 215 | /* |
210 | 216 | * ProcessAlterTable is used in cases where we want to preempt the error |
@@ -611,6 +617,13 @@ PostProcessRenameWritablePgLakeTable(ProcessUtilityParams * params, void *arg) |
611 | 617 |
|
612 | 618 | RenameStmt *renameStmt = (RenameStmt *) plannedStmt->utilityStmt; |
613 | 619 |
|
| 620 | + if (renameStmt->renameType == OBJECT_SCHEMA) |
| 621 | + { |
| 622 | + /* we are in post-process, use new name */ |
| 623 | + ErrorIfAnyRestCatalogTablesInSchema(renameStmt->newname); |
| 624 | + return; |
| 625 | + } |
| 626 | + |
614 | 627 | if (renameStmt->renameType != OBJECT_COLUMN && |
615 | 628 | renameStmt->renameType != OBJECT_ATTRIBUTE && |
616 | 629 | renameStmt->renameType != OBJECT_TABLE && |
@@ -1027,6 +1040,58 @@ Disallowed(Node *arg) |
1027 | 1040 | } |
1028 | 1041 |
|
1029 | 1042 |
|
| 1043 | +/* |
| 1044 | +* We have not yet implemented table renames in REST catalog. |
| 1045 | +*/ |
| 1046 | +static bool |
| 1047 | +DisallowedForWritableRestRenameTable(Node *arg) |
| 1048 | +{ |
| 1049 | + RenameStmt *renameStmt = (RenameStmt *) arg; |
| 1050 | + |
| 1051 | + /* only disallow RENAME TABLE/FOREIGN TABLE for writable rest catalog */ |
| 1052 | + if (renameStmt->renameType == OBJECT_TABLE || |
| 1053 | + renameStmt->renameType == OBJECT_FOREIGN_TABLE) |
| 1054 | + { |
| 1055 | + |
| 1056 | + Oid namespaceId = |
| 1057 | + RangeVarGetAndCheckCreationNamespace(renameStmt->relation, NoLock, NULL); |
| 1058 | + |
| 1059 | + /* we are in the post-process, so should use new name */ |
| 1060 | + const char *relationName = renameStmt->newname; |
| 1061 | + |
| 1062 | + Oid relationId = get_relname_relid(relationName, namespaceId); |
| 1063 | + IcebergCatalogType icebergCatalogType = GetIcebergCatalogType(relationId); |
| 1064 | + |
| 1065 | + return (icebergCatalogType != REST_CATALOG_READ_WRITE); |
| 1066 | + } |
| 1067 | + |
| 1068 | + return false; |
| 1069 | +} |
| 1070 | + |
| 1071 | +/* |
| 1072 | +* We have not yet implemented schema changes in REST catalog. |
| 1073 | +*/ |
| 1074 | +static bool |
| 1075 | +DisallowedForWritableRestSetSchema(Node *arg) |
| 1076 | +{ |
| 1077 | + AlterObjectSchemaStmt *alterSchemaStmt = (AlterObjectSchemaStmt *) arg; |
| 1078 | + |
| 1079 | + Oid namespaceId = get_namespace_oid(alterSchemaStmt->newschema, alterSchemaStmt->missing_ok); |
| 1080 | + |
| 1081 | + /* if not validOid, let Postgres handle */ |
| 1082 | + if (!OidIsValid(namespaceId)) |
| 1083 | + { |
| 1084 | + return false; |
| 1085 | + } |
| 1086 | + |
| 1087 | + Oid relationId = get_relname_relid(alterSchemaStmt->relation->relname, namespaceId); |
| 1088 | + |
| 1089 | + IcebergCatalogType icebergCatalogType = GetIcebergCatalogType(relationId); |
| 1090 | + |
| 1091 | + return (icebergCatalogType != REST_CATALOG_READ_WRITE); |
| 1092 | +} |
| 1093 | + |
| 1094 | + |
1030 | 1095 | /* |
1031 | 1096 | * ErrorIfUnsupportedSetAccessMethod checks ALTER TABLE on non-pg_lake |
1032 | 1097 | * tables for possible issues involving Iceberg. |
@@ -1099,6 +1164,8 @@ DisallowedAddColumnWithUnsupportedConstraints(Node *arg) |
1099 | 1164 | return true; |
1100 | 1165 | } |
1101 | 1166 |
|
| 1167 | + |
| 1168 | + |
1102 | 1169 | /* |
1103 | 1170 | * AlterTableAddColumnHasMutableDefault returns true if ALTER TABLE .. ADD COLUMN |
1104 | 1171 | * has a mutable default value. e.g. ALTER TABLE .. ADD COLUMN .. DEFAULT now(). |
@@ -1397,3 +1464,42 @@ HasPartitionByDropped(AlterTableStmt *alterStmt) |
1397 | 1464 |
|
1398 | 1465 | return partitionByDropped; |
1399 | 1466 | } |
| 1467 | + |
| 1468 | + |
| 1469 | +static void |
| 1470 | +ErrorIfAnyRestCatalogTablesInSchema(const char *schemaName) |
| 1471 | +{ |
| 1472 | + Oid namespaceId = get_namespace_oid(schemaName, false); |
| 1473 | + |
| 1474 | + Relation classRel = table_open(RelationRelationId, AccessShareLock); |
| 1475 | + HeapTuple tuple; |
| 1476 | + |
| 1477 | + ScanKeyData key[1]; |
| 1478 | + |
| 1479 | + ScanKeyInit(&key[0], |
| 1480 | + Anum_pg_class_relnamespace, |
| 1481 | + BTEqualStrategyNumber, F_OIDEQ, |
| 1482 | + ObjectIdGetDatum(namespaceId)); |
| 1483 | + |
| 1484 | + /* get all the relations present in the specified schema */ |
| 1485 | + TableScanDesc scan = table_beginscan_catalog(classRel, 1, key); |
| 1486 | + |
| 1487 | + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) |
| 1488 | + { |
| 1489 | + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); |
| 1490 | + Oid relid = relForm->oid; |
| 1491 | + |
| 1492 | + IcebergCatalogType icebergCatalogType = GetIcebergCatalogType(relid); |
| 1493 | + |
| 1494 | + if (icebergCatalogType == REST_CATALOG_READ_WRITE) |
| 1495 | + { |
| 1496 | + ereport(ERROR, |
| 1497 | + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| 1498 | + errmsg("cannot rename schema \"%s\" because it contains " |
| 1499 | + "an iceberg table with rest catalog \"%s\"", schemaName, get_rel_name(relid)))); |
| 1500 | + } |
| 1501 | + } |
| 1502 | + |
| 1503 | + table_endscan(scan); |
| 1504 | + table_close(classRel, AccessShareLock); |
| 1505 | +} |
0 commit comments