Commit 049e7cb

Simplify TableAwareFileStoreSourceSplit and filter tables in metadata accessor
1 parent 4a73154 commit 049e7cb

10 files changed: 134 additions & 103 deletions

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/CDCMetadataAccessor.java

Lines changed: 18 additions & 0 deletions
@@ -78,6 +78,24 @@ public List<TableId> listTables(@Nullable String namespace, @Nullable String sch
         for (String databaseName : databaseNames) {
             try {
                 for (String tableName : catalog.listTables(databaseName)) {
+                    Identifier identifier = Identifier.create(databaseName, tableName);
+                    try {
+                        Table table = catalog.getTable(identifier);
+                        if (!(table instanceof FileStoreTable)) {
+                            LOG.info(
+                                    "listTables found table {}, but it is a {} instead of a FileStoreTable. Skipping this table.",
+                                    identifier,
+                                    table.getClass().getSimpleName());
+                            continue;
+                        }
+                    } catch (Catalog.TableNotExistException e) {
+                        LOG.warn(
+                                "Table {} does not exist. Perhaps it is dropped in the middle of this method. Skipping this table.",
+                                identifier,
+                                e);
+                        continue;
+                    }
+
                     tableIds.add(TableId.tableId(databaseName, tableName));
                 }
             } catch (Catalog.DatabaseNotExistException e) {
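
The added guard resolves each listed table and skips it when it is not a FileStoreTable, or when it was dropped between listTables and getTable. As a standalone predicate the logic looks like the sketch below (the helper name isReadableFileStoreTable is illustrative, not part of the patch):

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;

// Hypothetical helper capturing the new filter: a table is reported only if
// it still exists in the catalog and is backed by a FileStoreTable.
final class FileStoreTableFilter {
    static boolean isReadableFileStoreTable(Catalog catalog, Identifier identifier) {
        try {
            return catalog.getTable(identifier) instanceof FileStoreTable;
        } catch (Catalog.TableNotExistException e) {
            // The table may be dropped concurrently, between listTables and getTable.
            return false;
        }
    }
}

The patch keeps the inline form instead, so it can log why each table was skipped.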

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/CDCSource.java

Lines changed: 41 additions & 5 deletions
@@ -41,17 +41,23 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.getSchemaDifference;
 import static org.apache.paimon.disk.IOManagerImpl.splitPaths;
 import static org.apache.paimon.flink.pipeline.cdc.util.CDCUtils.createCatalog;
+import static org.apache.paimon.flink.pipeline.cdc.util.PaimonToFlinkCDCTypeConverter.convertPaimonSchemaToFlinkCDCSchema;
 
 /** The {@link Source} that integrates with Flink CDC framework. */
 public class CDCSource implements Source<Event, TableAwareFileStoreSourceSplit, CDCCheckpoint> {
@@ -118,25 +124,43 @@ public SourceReader<Event, TableAwareFileStoreSourceSplit> createReader(
                 new FileStoreSourceReaderMetrics(metricGroup);
 
         Catalog catalog = createCatalog(catalogContext, flinkConfig);
-        TableReadManager tableReadManager = new TableReadManager(catalog, ioManager, metricGroup);
-        return new CDCSourceReader(context, sourceReaderMetrics, ioManager, tableReadManager);
+        TableManager manager = new TableManager(catalog, ioManager, metricGroup);
+        return new CDCSourceReader(context, sourceReaderMetrics, ioManager, manager);
     }
 
-    /** A manager for {@link TableRead}s. */
-    public static class TableReadManager {
+    /** A manager for information related to the tables. */
+    public static class TableManager {
         private final Map<Identifier, FileStoreTable> tableMap = new HashMap<>();
+        private final Map<Tuple2<Identifier, Long>, TableSchema> tableSchemaMap = new HashMap<>();
         private final Map<Tuple2<Identifier, Long>, TableRead> tableReadMap = new HashMap<>();
         private final Catalog catalog;
         private final IOManager ioManager;
         private final SourceReaderMetricGroup metricGroup;
 
-        protected TableReadManager(
+        protected TableManager(
                 Catalog catalog, IOManager ioManager, SourceReaderMetricGroup metricGroup) {
             this.catalog = catalog;
             this.ioManager = ioManager;
             this.metricGroup = metricGroup;
         }
 
+        public @Nullable TableSchema getTableSchema(
+                Identifier identifier, @Nullable Long schemaId) {
+            if (schemaId == null) {
+                return null;
+            }
+
+            Tuple2<Identifier, Long> cacheKey = Tuple2.of(identifier, schemaId);
+            if (tableSchemaMap.containsKey(cacheKey)) {
+                return tableSchemaMap.get(cacheKey);
+            }
+
+            FileStoreTable table = getTable(identifier);
+            TableSchema tableSchema = table.schemaManager().schema(schemaId);
+            tableSchemaMap.put(cacheKey, tableSchema);
+            return tableSchema;
+        }
+
         public TableRead getTableRead(Identifier identifier, TableSchema schema) {
             Tuple2<Identifier, Long> cacheKey = Tuple2.of(identifier, schema.id());
             if (tableReadMap.containsKey(cacheKey)) {
@@ -153,6 +177,18 @@ public TableRead getTableRead(Identifier identifier, TableSchema schema) {
             return tableRead;
         }
 
+        public List<SchemaChangeEvent> generateSchemaChangeEventList(
+                Identifier identifier, @Nullable Long lastSchemaId, long schemaId) {
+            if (lastSchemaId != null && lastSchemaId.equals(schemaId)) {
+                return Collections.emptyList();
+            }
+
+            return getSchemaDifference(
+                    TableId.tableId(identifier.getDatabaseName(), identifier.getTableName()),
+                    convertPaimonSchemaToFlinkCDCSchema(getTableSchema(identifier, lastSchemaId)),
+                    convertPaimonSchemaToFlinkCDCSchema(getTableSchema(identifier, schemaId)));
+        }
+
         private FileStoreTable getTable(Identifier identifier) {
             if (tableMap.containsKey(identifier)) {
                 return tableMap.get(identifier);
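
Both getTableSchema and getTableRead memoize on a Tuple2 of (identifier, schemaId), so a schema or reader is built once per schema version and reused across splits. The same containsKey/get/put sequence can be written generically with computeIfAbsent; a minimal sketch of the idiom (the class name MemoCache is illustrative, not from the patch):

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch of the caching idiom behind tableSchemaMap and
// tableReadMap: compute each value once per key, then reuse it.
final class MemoCache<K, V> {
    private final Map<K, V> cache = new HashMap<>();

    V get(K key, Function<K, V> loader) {
        // Collapses the containsKey/get/put sequence in TableManager
        // into a single call.
        return cache.computeIfAbsent(key, loader);
    }
}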

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplit.java

Lines changed: 28 additions & 23 deletions
@@ -20,7 +20,6 @@
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.source.FileStoreSourceSplit;
-import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.InstantiationUtil;
 import org.apache.paimon.utils.JsonSerdeUtil;
@@ -42,32 +41,32 @@
  */
 public class TableAwareFileStoreSourceSplit extends FileStoreSourceSplit {
     private final Identifier identifier;
-    @Nullable private final TableSchema lastSchema;
-    private final TableSchema schema;
+    private final @Nullable Long lastSchemaId;
+    private final long schemaId;
 
     public TableAwareFileStoreSourceSplit(
             String id,
             Split split,
             long recordsToSkip,
             Identifier identifier,
-            @Nullable TableSchema lastSchema,
-            TableSchema schema) {
+            @Nullable Long lastSchemaId,
+            long schemaId) {
         super(id, split, recordsToSkip);
         this.identifier = identifier;
-        this.lastSchema = lastSchema;
-        this.schema = schema;
+        this.lastSchemaId = lastSchemaId;
+        this.schemaId = schemaId;
     }
 
     public Identifier getIdentifier() {
         return identifier;
     }
 
-    public @Nullable TableSchema getLastSchema() {
-        return lastSchema;
+    public @Nullable Long getLastSchemaId() {
+        return lastSchemaId;
     }
 
-    public TableSchema getSchema() {
-        return schema;
+    public long getSchemaId() {
+        return schemaId;
     }
 
     @Override
@@ -81,13 +80,14 @@ public boolean equals(Object o) {
                 && Objects.equals(split(), other.split())
                 && recordsToSkip() == other.recordsToSkip()
                 && identifier.equals(other.identifier)
-                && Objects.equals(lastSchema, other.lastSchema)
-                && schema.equals(other.schema);
+                && Objects.equals(lastSchemaId, other.lastSchemaId)
+                && schemaId == other.schemaId;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(splitId(), split(), recordsToSkip(), identifier, lastSchema, schema);
+        return Objects.hash(
+                splitId(), split(), recordsToSkip(), identifier, lastSchemaId, schemaId);
     }
 
     @Override
@@ -102,16 +102,17 @@ public String toString() {
                 + recordsToSkip()
                 + ", identifier="
                 + identifier
-                + ", lastSchema="
-                + lastSchema
-                + ", schema="
-                + schema
+                + ", lastSchemaId="
+                + lastSchemaId
+                + ", schemaId="
+                + schemaId
                 + '}';
     }
 
     /** The serializer for {@link TableAwareFileStoreSourceSplit}. */
     public static class Serializer
             implements SimpleVersionedSerializer<TableAwareFileStoreSourceSplit> {
+        private static final Long NULL_SCHEMA_ID = -1L;
 
         @Override
         public int getVersion() {
@@ -126,8 +127,9 @@ public byte[] serialize(TableAwareFileStoreSourceSplit split) throws IOException
             InstantiationUtil.serializeObject(view, split.split());
             view.writeLong(split.recordsToSkip());
             view.writeUTF(JsonSerdeUtil.toJson(split.getIdentifier()));
-            view.writeUTF(JsonSerdeUtil.toJson(split.getLastSchema()));
-            view.writeUTF(JsonSerdeUtil.toJson(split.getSchema()));
+            view.writeLong(
+                    split.getLastSchemaId() == null ? NULL_SCHEMA_ID : split.getLastSchemaId());
+            view.writeLong(split.getSchemaId());
             return out.toByteArray();
         }
 
@@ -145,10 +147,13 @@ public TableAwareFileStoreSourceSplit deserialize(int version, byte[] serialized
             }
             long recordsToSkip = view.readLong();
             Identifier identifier = JsonSerdeUtil.fromJson(view.readUTF(), Identifier.class);
-            TableSchema lastSchema = TableSchema.fromJson(view.readUTF());
-            TableSchema schema = TableSchema.fromJson(view.readUTF());
+            Long lastSchemaId = view.readLong();
+            if (lastSchemaId.equals(NULL_SCHEMA_ID)) {
+                lastSchemaId = null;
+            }
+            long schemaId = view.readLong();
             return new TableAwareFileStoreSourceSplit(
-                    splitId, split, recordsToSkip, identifier, lastSchema, schema);
+                    splitId, split, recordsToSkip, identifier, lastSchemaId, schemaId);
         }
     }
 }
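
Replacing the two JSON-encoded TableSchema payloads with two longs is what shrinks the split: the serializer writes -1 (NULL_SCHEMA_ID) when lastSchemaId is null and maps -1 back to null on read, which is safe as long as real schema ids are never negative. A self-contained round-trip of that sentinel scheme (the class name SentinelRoundTrip is illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Minimal round-trip of the sentinel encoding used by the split serializer:
// a null schema id is written as -1L and restored to null on read.
public final class SentinelRoundTrip {
    private static final long NULL_SCHEMA_ID = -1L;

    static byte[] write(Long lastSchemaId) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        new DataOutputStream(out)
                .writeLong(lastSchemaId == null ? NULL_SCHEMA_ID : lastSchemaId);
        return out.toByteArray();
    }

    static Long read(byte[] bytes) throws IOException {
        long raw = new DataInputStream(new ByteArrayInputStream(bytes)).readLong();
        return raw == NULL_SCHEMA_ID ? null : raw;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(read(write(null))); // prints: null
        System.out.println(read(write(42L))); // prints: 42
    }
}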

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/enumerator/CDCSourceEnumerator.java

Lines changed: 7 additions & 9 deletions
@@ -25,7 +25,6 @@
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.flink.pipeline.cdc.source.TableAwareFileStoreSourceSplit;
-import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.DataSplit;
@@ -272,16 +271,16 @@ private void processDiscoveredSplits(Optional<TableAwarePlan> planOptional, Thro
         FileStoreTable table = tableStatusMap.get(tableAwarePlan.identifier).table;
         List<TableAwareFileStoreSourceSplit> splits = new ArrayList<>();
         for (Split split : plan.splits()) {
-            TableSchema lastSchema = tableStatusMap.get(tableAwarePlan.identifier).schema;
+            Long lastSchemaId = tableStatusMap.get(tableAwarePlan.identifier).schemaId;
             TableAwareFileStoreSourceSplit tableAwareFileStoreSourceSplit =
                     toTableAwareSplit(
                             splitIdGenerator.getNextId(),
                             split,
                             table,
                             tableAwarePlan.identifier,
-                            lastSchema);
-            tableStatusMap.get(tableAwarePlan.identifier).schema =
-                    tableAwareFileStoreSourceSplit.getSchema();
+                            lastSchemaId);
+            tableStatusMap.get(tableAwarePlan.identifier).schemaId =
+                    tableAwareFileStoreSourceSplit.getSchemaId();
             splits.add(tableAwareFileStoreSourceSplit);
         }
 
@@ -295,13 +294,12 @@ protected TableAwareFileStoreSourceSplit toTableAwareSplit(
             Split split,
             FileStoreTable table,
             Identifier identifier,
-            @Nullable TableSchema lastSchema) {
+            @Nullable Long lastSchemaId) {
         Preconditions.checkState(split instanceof DataSplit);
         long snapshotId = ((DataSplit) split).snapshotId();
         long schemaId = table.snapshot(snapshotId).schemaId();
-        TableSchema schema = table.schemaManager().schema(schemaId);
         return new TableAwareFileStoreSourceSplit(
-                splitId, split, 0, identifier, lastSchema, schema);
+                splitId, split, 0, identifier, lastSchemaId, schemaId);
     }
 
     /**
@@ -411,7 +409,7 @@ private TableAwarePlan(TableScan.Plan plan, Identifier identifier, Long nextSnap
     private static class TableStatus {
         private final FileStoreTable table;
         private final StreamDataTableScan scan;
-        private TableSchema schema;
+        private Long schemaId;
         private Integer subtaskId;
         private Long nextSnapshotId;
 
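
With the schema objects gone, the enumerator only derives an id: the schema id of a split is taken from the split's snapshot, and the reader resolves the actual TableSchema later via TableManager.getTableSchema. Extracted as a sketch (the class name SchemaIdLookup is illustrative; the calls mirror toTableAwareSplit):

import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Preconditions;

// Sketch of the id-only derivation in toTableAwareSplit: resolve the schema
// id from the split's snapshot; loading the full TableSchema is deferred to
// the reader-side TableManager.
final class SchemaIdLookup {
    static long schemaIdFor(FileStoreTable table, Split split) {
        Preconditions.checkState(split instanceof DataSplit);
        long snapshotId = ((DataSplit) split).snapshotId();
        return table.snapshot(snapshotId).schemaId();
    }
}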

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceReader.java

Lines changed: 2 additions & 2 deletions
@@ -49,9 +49,9 @@ public CDCSourceReader(
             SourceReaderContext readerContext,
             FileStoreSourceReaderMetrics metrics,
             IOManager ioManager,
-            CDCSource.TableReadManager tableReadManager) {
+            CDCSource.TableManager tableManager) {
         super(
-                () -> new CDCSourceSplitReader(metrics, tableReadManager),
+                () -> new CDCSourceSplitReader(metrics, tableManager),
                 (element, output, state) ->
                         CDCRecordsWithSplitIds.emitRecord(
                                 readerContext, element, output, state, metrics),
