Skip to content

Commit c7d9ee0

Browse files
Use GetLatestSnapshot() over GetActiveSnapshot()
We already do serialization properly via `LockTableForUpdate` on modify vs vacuum, that part is fine. The problem is that when the waiting transaction is unblocked, it cannot really see the catalog changes done by the others, unless the snapshot is `GetLatestSnapshot()` And, in VACUUM & all other places, we already do that properly: https://github.com/Snowflake-Labs/pg_lake/blob/bfa38cef35eea16aade99f2557fab7c923a9f9c9/pg_lake_table/src/fdw/writable_table.c#L692-L696 So, we should do that for table scan as well, and this PR is about that. Essentially, we get to Iceberg catalog operations follow a global order defined by `LockTableForUpdate()` within that order, each 'catalog op' sees all committed changes from earlier `catalog ops`. We also use the same snapshot for all tables in a scan.
1 parent bfa38ce commit c7d9ee0

File tree

1 file changed

+13
-4
lines changed

1 file changed

+13
-4
lines changed

pg_lake_table/src/fdw/snapshot.c

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
#include "utils/typcache.h"
5353

5454
static PgLakeTableScan * CreateTableScanForRelation(Oid relationId,
55+
Snapshot snapshot,
5556
int uniqueRelationIdentifier,
5657
List *baseRestrictInfoList,
5758
bool includeChildren,
@@ -81,6 +82,13 @@ CreatePgLakeScanSnapshot(List *rteList,
8182
bool includeChildren,
8283
Oid resultRelationId)
8384
{
85+
/*
86+
* Make sure: a) We see the concurrent changes that might have happened on
87+
* the data_files catalog b) We have a consistent view of multiple tables
88+
* if the query involves multiple tables.
89+
*/
90+
Snapshot postgresSnapshot = GetLatestSnapshot();
91+
8492
List *tableScans = NIL;
8593

8694
ListCell *relationCell = NULL;
@@ -116,7 +124,8 @@ CreatePgLakeScanSnapshot(List *rteList,
116124
}
117125

118126
PgLakeTableScan *tableScan =
119-
CreateTableScanForRelation(relationId, uniqueRelationIdentifier,
127+
CreateTableScanForRelation(relationId, postgresSnapshot,
128+
uniqueRelationIdentifier,
120129
baseRestrictInfoList,
121130
includeChildren && rte->inh,
122131
isResultRelation);
@@ -174,7 +183,7 @@ GetBaseRestrictInfoForRelation(List *relationRestrictionsList, int uniqueRelatio
174183
* CreateTableScanForRelation creates a table scan for the given relation.
175184
*/
176185
static PgLakeTableScan *
177-
CreateTableScanForRelation(Oid relationId, int uniqueRelationIdentifier, List *baseRestrictInfoList,
186+
CreateTableScanForRelation(Oid relationId, Snapshot snapshot, int uniqueRelationIdentifier, List *baseRestrictInfoList,
178187
bool includeChildren, bool isResultRelation)
179188
{
180189
List *fileScans = NIL;
@@ -187,7 +196,6 @@ CreateTableScanForRelation(Oid relationId, int uniqueRelationIdentifier, List *b
187196
* We'll calculate the deletion files based on the pruned data files
188197
* using the same snapshot.
189198
*/
190-
Snapshot snapshot = GetActiveSnapshot();
191199
bool dataOnly = true;
192200
bool newFilesOnly = false;
193201
List *dataFiles =
@@ -317,7 +325,8 @@ CreateTableScanForRelation(Oid relationId, int uniqueRelationIdentifier, List *b
317325
* hierarchy?
318326
*/
319327
PgLakeTableScan *childScan =
320-
CreateTableScanForRelation(childId, childRelationIdentifier,
328+
CreateTableScanForRelation(childId, snapshot,
329+
childRelationIdentifier,
321330
baseRestrictInfoList,
322331
includeChildren,
323332
isResultRelation);

0 commit comments

Comments
 (0)