Skip to content

Commit 425c7a8

Browse files
Support pruning for external tables
Signed-off-by: Aykut Bozkurt <[email protected]>
1 parent 1e5f0b7 commit 425c7a8

File tree

21 files changed

+960
-519
lines changed

21 files changed

+960
-519
lines changed

pg_lake_iceberg/include/pg_lake/iceberg/api/table_schema.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ extern PGDLLEXPORT IcebergTableSchema * GetIcebergTableSchemaByIdFromTableMetada
2727
extern PGDLLEXPORT IcebergTableSchema * GetCurrentIcebergTableSchema(IcebergTableMetadata * metadata);
2828
extern PGDLLEXPORT List *GetLeafFieldsFromIcebergMetadata(IcebergTableMetadata * metadata);
2929
extern PGDLLEXPORT List *GetLeafFieldsForIcebergSchema(IcebergTableSchema * schema);
30+
extern PGDLLEXPORT DataFileSchemaField * GetDataFileSchemaFieldById(DataFileSchema * schema, int fieldId);
3031

3132
/* write api */
3233
extern PGDLLEXPORT IcebergTableSchema * RebuildIcebergSchemaFromDataFileSchema(Oid foreignTableOid,

pg_lake_iceberg/src/iceberg/api/manifest.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ SetManifestPartitionSummary(IcebergManifest * manifest, List *manifestEntries, L
254254

255255
/* unexpected but better than crash */
256256
if (partitionTransform == NULL)
257-
ereport(ERROR, (errmsg("Could not find partition transform for field %d", partitionFieldId)));
257+
ereport(ERROR, (errmsg("could not find partition transform for field %" PRId32, partitionFieldId)));
258258

259259
PGType resultPgType = partitionTransform->resultPgType;
260260
Field *resultField = PostgresTypeToIcebergField(resultPgType, false, NULL);

pg_lake_iceberg/src/iceberg/api/table_schema.c

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,38 @@ GetLeafFieldsForIcebergSchema(IcebergTableSchema * schema)
169169
}
170170

171171

172+
/*
173+
* GetDataFileSchemaFieldById gets the DataFileSchemaField for the given
174+
* iceberg field id.
175+
*/
176+
DataFileSchemaField *
177+
GetDataFileSchemaFieldById(DataFileSchema * schema, int fieldId)
178+
{
179+
DataFileSchemaField *schemaField = NULL;
180+
181+
for (size_t fieldIdx = 0; fieldIdx < schema->nfields; fieldIdx++)
182+
{
183+
DataFileSchemaField *field = &schema->fields[fieldIdx];
184+
185+
if (field->id == fieldId)
186+
{
187+
schemaField = field;
188+
break;
189+
}
190+
}
191+
192+
if (schemaField == NULL)
193+
{
194+
ereport(ERROR,
195+
(errcode(ERRCODE_INTERNAL_ERROR),
196+
errmsg("field ID %d not found",
197+
fieldId)));
198+
}
199+
200+
return schemaField;
201+
}
202+
203+
172204
/*
173205
* GetLeafFieldsForField returns the leaf fields for the given field.
174206
* It recursively traverses the field tree and collects all leaf fields, and

pg_lake_iceberg/src/iceberg/partitioning/partition.c

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,5 @@ FindPartitionTransformById(List *transforms, int32_t partitionFieldId)
229229
return transform;
230230
}
231231

232-
ereport(ERROR,
233-
(errcode(ERRCODE_INTERNAL_ERROR),
234-
errmsg("could not find partition transform for field id %d",
235-
partitionFieldId)));
232+
return NULL;
236233
}

pg_lake_iceberg/tests/conftest.py

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -55,31 +55,6 @@ def superuser_conn(postgres):
5555
conn.close()
5656

5757

58-
@pytest.fixture(scope="session")
59-
def iceberg_extension(postgres):
60-
superuser_conn = open_pg_conn()
61-
62-
run_command(
63-
f"""
64-
CREATE EXTENSION IF NOT EXISTS pg_lake_iceberg CASCADE;
65-
""",
66-
superuser_conn,
67-
)
68-
superuser_conn.commit()
69-
70-
yield
71-
superuser_conn.rollback()
72-
73-
run_command(
74-
f"""
75-
DROP EXTENSION pg_lake_iceberg CASCADE;
76-
""",
77-
superuser_conn,
78-
)
79-
superuser_conn.commit()
80-
superuser_conn.close()
81-
82-
8358
@pytest.fixture(scope="module")
8459
def pgduck_conn(postgres):
8560
conn = psycopg2.connect(

pg_lake_iceberg/tests/pytests/test_iceberg_avro.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88

99
def test_iceberg_table_with_large_avro_allocs(
10-
pg_conn, superuser_conn, extension, with_default_location, app_user
10+
pg_conn, superuser_conn, iceberg_extension, with_default_location, app_user
1111
):
1212
# 63 characters is the maximum length of a column name in PostgreSQL
1313
table_name = "".join(random.choices(string.ascii_lowercase, k=63))
@@ -149,7 +149,7 @@ def test_iceberg_table_with_large_avro_allocs(
149149

150150

151151
def test_manifest_file_with_dot_field_name(
152-
pg_conn, superuser_conn, s3, extension, create_reserialize_helper_functions
152+
pg_conn, superuser_conn, s3, iceberg_extension, create_reserialize_helper_functions
153153
):
154154
key = "sample_avro/dot-field-name.avro"
155155
local_manifest_path = sample_avro_filepath(f"dot-field-name.avro")

pg_lake_iceberg/tests/pytests/test_iceberg_data_file_stats.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ def test_pg_lake_iceberg_table_reserialize_data_file_stats_from_metadata(
164164
def test_pg_lake_iceberg_table_read_data_file_stats_from_catalog(
165165
pg_lake_table_metadata_location,
166166
pg_conn,
167-
extension,
167+
iceberg_extension,
168168
s3,
169169
create_helper_functions,
170170
):
@@ -588,7 +588,7 @@ def test_pg_lake_iceberg_table_special_numerics(
588588

589589
def test_pg_lake_iceberg_table_add_drop_columns(
590590
pg_conn,
591-
extension,
591+
iceberg_extension,
592592
s3,
593593
create_helper_functions,
594594
with_default_location,
@@ -671,7 +671,7 @@ def test_pg_lake_iceberg_table_add_drop_columns(
671671

672672
def test_pg_lake_iceberg_table_uuid_column(
673673
pg_conn,
674-
extension,
674+
iceberg_extension,
675675
s3,
676676
create_helper_functions,
677677
with_default_location,
@@ -704,7 +704,7 @@ def test_pg_lake_iceberg_table_uuid_column(
704704

705705
def test_pg_lake_iceberg_table_bytea_column(
706706
pg_conn,
707-
extension,
707+
iceberg_extension,
708708
s3,
709709
create_helper_functions,
710710
with_default_location,
@@ -738,7 +738,7 @@ def test_pg_lake_iceberg_table_bytea_column(
738738

739739
def test_pg_lake_iceberg_table_serial_column(
740740
pg_conn,
741-
extension,
741+
iceberg_extension,
742742
s3,
743743
create_helper_functions,
744744
with_default_location,
@@ -769,7 +769,7 @@ def test_pg_lake_iceberg_table_serial_column(
769769

770770
def test_pg_lake_iceberg_table_random_values(
771771
pg_conn,
772-
extension,
772+
iceberg_extension,
773773
s3,
774774
create_helper_functions,
775775
with_default_location,
@@ -864,7 +864,7 @@ def test_pg_lake_iceberg_table_random_values(
864864
def test_pg_lake_iceberg_table_complex_values(
865865
superuser_conn,
866866
enable_stats_for_nested_types,
867-
extension,
867+
iceberg_extension,
868868
s3,
869869
create_helper_functions,
870870
with_default_location,
@@ -1166,7 +1166,12 @@ def spark_table_metadata_location(installcheck, spark_session):
11661166

11671167
@pytest.fixture(scope="function")
11681168
def enable_stats_for_nested_types(
1169-
installcheck, extension, app_user, pg_conn, superuser_conn, with_default_location
1169+
installcheck,
1170+
iceberg_extension,
1171+
app_user,
1172+
pg_conn,
1173+
superuser_conn,
1174+
with_default_location,
11701175
):
11711176
run_command_outside_tx(
11721177
[
@@ -1194,7 +1199,7 @@ def enable_stats_for_nested_types(
11941199
@pytest.fixture(scope="function")
11951200
def pg_lake_table_metadata_location(
11961201
installcheck,
1197-
extension,
1202+
iceberg_extension,
11981203
app_user,
11991204
pg_conn,
12001205
superuser_conn,

pg_lake_iceberg/tests/pytests/test_iceberg_metadata_via_spark.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def test_reserialize_via_spark(
8585
installcheck,
8686
spark_session,
8787
superuser_conn,
88-
extension,
88+
iceberg_extension,
8989
s3,
9090
spark_table_metadata_location,
9191
create_reserialize_helper_functions,

pg_lake_iceberg/tests/pytests/test_polaris_conn.py

Lines changed: 0 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -252,84 +252,3 @@ def test_http_errors(
252252
assert "Could not resolve host" in error or "Resolving timed out after" in error
253253

254254
pg_conn.rollback()
255-
256-
257-
@pytest.fixture(scope="module")
258-
def create_http_helper_functions(superuser_conn, iceberg_extension):
259-
run_command(
260-
f"""
261-
CREATE TYPE lake_iceberg.http_result AS (
262-
status int,
263-
body text,
264-
resp_headers text
265-
);
266-
267-
CREATE OR REPLACE FUNCTION lake_iceberg.test_http_get(
268-
url text,
269-
headers text[] DEFAULT NULL)
270-
RETURNS lake_iceberg.http_result
271-
AS 'pg_lake_iceberg', 'test_http_get'
272-
LANGUAGE C;
273-
274-
275-
-- HEAD
276-
CREATE OR REPLACE FUNCTION lake_iceberg.test_http_head(
277-
url text,
278-
headers text[] DEFAULT NULL)
279-
RETURNS lake_iceberg.http_result
280-
AS 'pg_lake_iceberg', 'test_http_head'
281-
LANGUAGE C;
282-
283-
-- POST
284-
CREATE OR REPLACE FUNCTION lake_iceberg.test_http_post(
285-
url text,
286-
body text,
287-
headers text[] DEFAULT NULL)
288-
RETURNS lake_iceberg.http_result
289-
AS 'pg_lake_iceberg', 'test_http_post'
290-
LANGUAGE C;
291-
292-
-- PUT
293-
CREATE OR REPLACE FUNCTION lake_iceberg.test_http_put(
294-
url text,
295-
body text,
296-
headers text[] DEFAULT NULL)
297-
RETURNS lake_iceberg.http_result
298-
AS 'pg_lake_iceberg', 'test_http_put'
299-
LANGUAGE C;
300-
301-
-- DELETE
302-
CREATE OR REPLACE FUNCTION lake_iceberg.test_http_delete(
303-
url text,
304-
headers text[] DEFAULT NULL)
305-
RETURNS lake_iceberg.http_result
306-
AS 'pg_lake_iceberg', 'test_http_delete'
307-
LANGUAGE C;
308-
309-
-- URL encode function
310-
CREATE OR REPLACE FUNCTION lake_iceberg.url_encode(input TEXT)
311-
RETURNS text
312-
LANGUAGE C
313-
IMMUTABLE STRICT
314-
AS 'pg_lake_iceberg', $function$url_encode_path$function$;
315-
316-
""",
317-
superuser_conn,
318-
)
319-
superuser_conn.commit()
320-
321-
yield
322-
323-
run_command(
324-
"""
325-
DROP FUNCTION IF EXISTS lake_iceberg.url_encode;
326-
DROP FUNCTION IF EXISTS lake_iceberg.test_http_get;
327-
DROP FUNCTION IF EXISTS lake_iceberg.test_http_head;
328-
DROP FUNCTION IF EXISTS lake_iceberg.test_http_post;
329-
DROP FUNCTION IF EXISTS lake_iceberg.test_http_put;
330-
DROP FUNCTION IF EXISTS lake_iceberg.test_http_delete;
331-
DROP TYPE lake_iceberg.http_result;
332-
""",
333-
superuser_conn,
334-
)
335-
superuser_conn.commit()

pg_lake_table/include/pg_lake/partitioning/partition_spec_catalog.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ extern PGDLLEXPORT int GetCurrentSpecId(Oid relationId);
4848
extern int GetLargestPartitionFieldId(Oid relationId);
4949
extern IcebergPartitionSpecField * GetIcebergPartitionFieldFromCatalog(Oid relationId, int fieldId);
5050
extern List *GetIcebergSpecPartitionFieldsFromCatalog(Oid relationId, int specId);
51-
extern List *GetAllIcebergSpecPartitionFieldsFromCatalog(Oid relationId);
51+
extern List *GetAllPartitionSpecFields(Oid relationId);
5252
extern Partition * GetDataFilePartition(Oid relationId, List *partitionTransforms,
5353
const char *path, int32 *partitionSpecId);
5454
extern HTAB *GetAllPartitionSpecsFromCatalog(Oid relationId);

0 commit comments

Comments (0)