Commit f185fd8

[flink] support detecting field changes for dedicated compaction in streaming mode
1 parent 87bafe8 commit f185fd8

12 files changed: +300 -32 lines

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java

Lines changed: 18 additions & 3 deletions

@@ -26,6 +26,7 @@
 import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.WriterRefresher;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.operation.BaseAppendFileStoreWrite;
 import org.apache.paimon.operation.FileStoreWrite.State;
 import org.apache.paimon.operation.metrics.CompactionMetrics;
@@ -43,6 +44,7 @@

 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
@@ -72,7 +74,8 @@ public AppendTableCompactor(
             String commitUser,
             Supplier<ExecutorService> lazyCompactExecutor,
             @Nullable MetricGroup metricGroup,
-            boolean isStreaming) {
+            boolean isStreaming,
+            boolean forDedicatedCompact) {
         this.table = table;
         this.commitUser = commitUser;
         CoreOptions coreOptions = table.coreOptions();
@@ -91,7 +94,8 @@ public AppendTableCompactor(
                         ? null
                         // partition and bucket fields are no use.
                         : this.compactionMetrics.createReporter(BinaryRow.EMPTY_ROW, 0);
-        this.writeRefresher = WriterRefresher.create(isStreaming, table, this::replace);
+        this.writeRefresher =
+                WriterRefresher.create(isStreaming, forDedicatedCompact, table, this::replace);
     }

     public void processElement(AppendCompactTask task) throws Exception {
@@ -220,11 +224,22 @@ private void replace(FileStoreTable newTable) throws Exception {
     }

     public void tryRefreshWrite() {
+        tryRefreshWrite(false, Collections.emptyList());
+    }
+
+    public void tryRefreshWrite(boolean forCompact, List<DataFileMeta> files) {
         if (commitUser == null) {
             return;
         }
+
         if (writeRefresher != null) {
-            writeRefresher.tryRefresh();
+            if (forCompact) {
+                if (!files.isEmpty()) {
+                    writeRefresher.tryRefreshForDataFiles(files);
+                }
+            } else {
+                writeRefresher.tryRefreshForConfigs();
+            }
         }
     }
 }
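The new overload separates a config-driven refresh from a schema-driven one. As a reading aid, here is a minimal, self-contained model of that dispatch; the Refresher interface and the Long schema ids are hypothetical stand-ins for WriterRefresher and DataFileMeta, so this sketches the control flow rather than the project's API.

import java.util.Collections;
import java.util.List;

// Standalone model of the two refresh paths in tryRefreshWrite; the Refresher
// interface here is hypothetical and only mirrors the shape of WriterRefresher.
public class RefreshDispatchSketch {

    interface Refresher {
        void tryRefreshForConfigs();
        void tryRefreshForDataFiles(List<Long> fileSchemaIds);
    }

    static void tryRefreshWrite(Refresher refresher, boolean forCompact, List<Long> files) {
        if (refresher == null) {
            return; // no refresher was created, e.g. in batch mode
        }
        if (forCompact) {
            // Schema-driven path: only meaningful when the task carries input files.
            if (!files.isEmpty()) {
                refresher.tryRefreshForDataFiles(files);
            }
        } else {
            // Config-driven path: re-reads refreshable options from the latest schema.
            refresher.tryRefreshForConfigs();
        }
    }

    public static void main(String[] args) {
        Refresher tracing = new Refresher() {
            @Override
            public void tryRefreshForConfigs() {
                System.out.println("config refresh");
            }

            @Override
            public void tryRefreshForDataFiles(List<Long> ids) {
                System.out.println("data-file refresh: " + ids);
            }
        };
        tryRefreshWrite(tracing, false, Collections.emptyList()); // config refresh
        tryRefreshWrite(tracing, true, List.of(5L));              // data-file refresh: [5]
        tryRefreshWrite(tracing, true, Collections.emptyList());  // no-op: no input files
    }
}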

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java

Lines changed: 1 addition & 1 deletion

@@ -36,7 +36,7 @@ private AppendBypassCompactWorkerOperator(
             FileStoreTable table,
             String commitUser,
             boolean isStreaming) {
-        super(parameters, table, commitUser, isStreaming);
+        super(parameters, table, commitUser, isStreaming, false);
     }

     @Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java

Lines changed: 11 additions & 2 deletions

@@ -57,15 +57,19 @@ public abstract class AppendCompactWorkerOperator<IN>

     private final boolean isStreaming;

+    private final boolean forDedicatedCompact;
+
     public AppendCompactWorkerOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
             String commitUser,
-            boolean isStreaming) {
+            boolean isStreaming,
+            boolean forDedicatedCompact) {
         super(parameters, Options.fromMap(table.options()));
         this.table = table;
         this.commitUser = commitUser;
         this.isStreaming = isStreaming;
+        this.forDedicatedCompact = forDedicatedCompact;
     }

     @VisibleForTesting
@@ -78,7 +82,12 @@ public void open() throws Exception {
         LOG.debug("Opened a append-only table compaction worker.");
         this.unawareBucketCompactor =
                 new AppendTableCompactor(
-                        table, commitUser, this::workerExecutor, getMetricGroup(), isStreaming);
+                        table,
+                        commitUser,
+                        this::workerExecutor,
+                        getMetricGroup(),
+                        isStreaming,
+                        forDedicatedCompact);
     }

     @Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java

Lines changed: 6 additions & 4 deletions

@@ -112,9 +112,10 @@ protected List<MultiTableCommittable> prepareCommit(boolean waitCompaction, long
     @Override
     public void processElement(StreamRecord<MultiTableAppendCompactTask> element) throws Exception {
         Identifier identifier = element.getValue().tableIdentifier();
-        compactorContainer
-                .computeIfAbsent(identifier, this::compactor)
-                .processElement(element.getValue());
+        AppendTableCompactor compactor =
+                compactorContainer.computeIfAbsent(identifier, this::compactor);
+        compactor.tryRefreshWrite(true, element.getValue().compactBefore());
+        compactor.processElement(element.getValue());
     }

     private AppendTableCompactor compactor(Identifier tableId) {
@@ -124,7 +125,8 @@ private AppendTableCompactor compactor(Identifier tableId) {
                     commitUser,
                     this::workerExecutor,
                     getMetricGroup(),
-                    isStreaming);
+                    isStreaming,
+                    true);
         } catch (Catalog.TableNotExistException e) {
             throw new RuntimeException(e);
         }

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java

Lines changed: 4 additions & 2 deletions

@@ -39,12 +39,14 @@ private AppendOnlySingleTableCompactionWorkerOperator(
             FileStoreTable table,
             String commitUser,
             boolean isStreaming) {
-        super(parameters, table, commitUser, isStreaming);
+        super(parameters, table, commitUser, isStreaming, true);
     }

     @Override
     public void processElement(StreamRecord<AppendCompactTask> element) throws Exception {
-        this.unawareBucketCompactor.processElement(element.getValue());
+        AppendCompactTask task = element.getValue();
+        this.unawareBucketCompactor.tryRefreshWrite(true, task.compactBefore());
+        this.unawareBucketCompactor.processElement(task);
     }

     /** {@link StreamOperatorFactory} of {@link AppendOnlySingleTableCompactionWorkerOperator}. */
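Both this operator and the multi-table operator above now apply the same refresh-then-process ordering. Below is a simplified, self-contained model of that ordering; CompactTask and Compactor are hypothetical stand-ins for AppendCompactTask and AppendTableCompactor.

import java.util.List;

// Simplified model (hypothetical types) of the refresh-then-process ordering
// that this commit applies in both compaction worker operators.
public class CompactTaskFlowSketch {

    // Stand-in for AppendCompactTask: only the compact-before schema ids matter here.
    record CompactTask(List<Long> compactBeforeSchemaIds) {}

    // Stand-in for AppendTableCompactor.
    interface Compactor {
        void tryRefreshWrite(boolean forCompact, List<Long> fileSchemaIds);
        void process(CompactTask task);
    }

    static void processElement(Compactor compactor, CompactTask task) {
        // Refresh before processing: if any compact-before file was written with a
        // newer schema, the writer is replaced before it rewrites those files, so
        // newly added fields are not silently dropped during compaction.
        compactor.tryRefreshWrite(true, task.compactBeforeSchemaIds());
        compactor.process(task);
    }

    public static void main(String[] args) {
        Compactor tracing = new Compactor() {
            @Override
            public void tryRefreshWrite(boolean forCompact, List<Long> ids) {
                System.out.println("refresh check, file schema ids: " + ids);
            }

            @Override
            public void process(CompactTask task) {
                System.out.println("compact " + task.compactBeforeSchemaIds().size() + " files");
            }
        };
        processElement(tracing, new CompactTask(List.of(4L, 5L)));
    }
}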

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java

Lines changed: 15 additions & 2 deletions

@@ -43,6 +43,7 @@
 import javax.annotation.Nullable;

 import java.io.IOException;
+import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
@@ -119,7 +120,8 @@ public void initializeState(StateInitializationContext context) throws Exception
                         getContainingTask().getEnvironment().getIOManager(),
                         memoryPoolFactory,
                         getMetricGroup());
-        this.writeRefresher = WriterRefresher.create(write.streamingMode(), table, write::replace);
+        this.writeRefresher =
+                WriterRefresher.create(write.streamingMode(), true, table, write::replace);
     }

     @Override
@@ -150,6 +152,7 @@ public void processElement(StreamRecord<RowData> element) throws Exception {

         if (write.streamingMode()) {
             write.notifyNewFiles(snapshotId, partition, bucket, files);
+            tryRefreshWrite(true, files);
         } else {
             Preconditions.checkArgument(
                     files.isEmpty(),
@@ -198,8 +201,18 @@ public Set<Pair<BinaryRow, Integer>> compactionWaitingSet() {
     }

     private void tryRefreshWrite() {
+        tryRefreshWrite(false, Collections.emptyList());
+    }
+
+    private void tryRefreshWrite(boolean forCompact, List<DataFileMeta> files) {
         if (writeRefresher != null) {
-            writeRefresher.tryRefresh();
+            if (forCompact) {
+                if (!files.isEmpty()) {
+                    writeRefresher.tryRefreshForDataFiles(files);
+                }
+            } else {
+                writeRefresher.tryRefreshForConfigs();
+            }
         }
     }
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java

Lines changed: 1 addition & 1 deletion

@@ -158,7 +158,7 @@ public StoreSinkWrite getWrite() {

     protected void tryRefreshWrite() {
         if (writeRefresher != null) {
-            writeRefresher.tryRefresh();
+            writeRefresher.tryRefreshForConfigs();
         }
     }
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java

Lines changed: 61 additions & 13 deletions

@@ -20,6 +20,7 @@

 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
@@ -31,6 +32,7 @@

 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -49,9 +51,11 @@ public class WriterRefresher {

     private FileStoreTable table;
     private final Refresher refresher;
-    private final Set<String> configGroups;

-    private WriterRefresher(FileStoreTable table, Refresher refresher, Set<String> configGroups) {
+    @Nullable private final Set<String> configGroups;
+
+    private WriterRefresher(
+            FileStoreTable table, Refresher refresher, @Nullable Set<String> configGroups) {
         this.table = table;
         this.refresher = refresher;
         this.configGroups = configGroups;
@@ -60,6 +64,15 @@ private WriterRefresher(FileStoreTable table, Refresher refresher, Set<String> c
     @Nullable
     public static WriterRefresher create(
             boolean isStreaming, FileStoreTable table, Refresher refresher) {
+        return create(isStreaming, false, table, refresher);
+    }
+
+    @Nullable
+    public static WriterRefresher create(
+            boolean isStreaming,
+            boolean needForCompact,
+            FileStoreTable table,
+            Refresher refresher) {
         if (!isStreaming) {
             return null;
         }
@@ -71,13 +84,21 @@ public static WriterRefresher create(
                 isNullOrWhitespaceOnly(refreshDetectors)
                         ? null
                         : Arrays.stream(refreshDetectors.split(",")).collect(Collectors.toSet());
-        if (configGroups == null || configGroups.isEmpty()) {
+        if (!needForCompact && (configGroups == null || configGroups.isEmpty())) {
             return null;
         }
         return new WriterRefresher(table, refresher, configGroups);
     }

-    public void tryRefresh() {
+    /**
+     * Try to refresh the write when configs that are expected to be refreshed in streaming mode
+     * have changed.
+     */
+    public void tryRefreshForConfigs() {
+        if (configGroups == null || configGroups.isEmpty()) {
+            return;
+        }
+
         Optional<TableSchema> latestSchema = table.schemaManager().latest();
         if (!latestSchema.isPresent()) {
             return;
@@ -92,24 +113,51 @@ public void tryRefresh() {
                     configGroups(configGroups, CoreOptions.fromMap(latest.options()));

             if (!Objects.equals(newOptions, currentOptions)) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(
-                            "table schema has changed, current schema-id:{}, try to update write with new schema-id:{}. "
-                                    + "current options:{}, new options:{}.",
-                            table.schema().id(),
-                            latestSchema.get().id(),
-                            currentOptions,
-                            newOptions);
-                }
                 table = table.copy(newOptions);
                 refresher.refresh(table);
+                LOG.info(
+                        "write has been refreshed due to changed configs. old options:{}, new options:{}.",
+                        currentOptions,
+                        newOptions);
             }
         } catch (Exception e) {
             throw new RuntimeException("update write failed.", e);
         }
     }

+    /**
+     * This is used for dedicated compaction in streaming mode. When the schema-id of newly added
+     * data files exceeds the current schema-id, the writer needs to be refreshed to prevent data
+     * loss.
+     */
+    public void tryRefreshForDataFiles(List<DataFileMeta> files) {
+        long fileSchemaId =
+                files.stream().mapToLong(DataFileMeta::schemaId).max().orElse(table.schema().id());
+        if (fileSchemaId > table.schema().id()) {
+            Optional<TableSchema> latestSchema = table.schemaManager().latest();
+            if (!latestSchema.isPresent()) {
+                return;
+            }
+            TableSchema latest = latestSchema.get();
+
+            if (latest.id() > table.schema().id()) {
+                try {
+                    // Here we cannot use table.copy(latestSchema): the table used for
+                    // dedicated compaction carries some dynamic options that we should not
+                    // overwrite. We just need to copy the latest fields.
                    table = table.copyWithLatestSchema();
+                    refresher.refresh(table);
+                    LOG.info(
+                            "write has been refreshed due to a schema change in data files. new schema id:{}.",
+                            table.schema().id());
+                } catch (Exception e) {
+                    throw new RuntimeException("update write failed.", e);
+                }
+            }
+        }
+    }
+
     /** Refresher when configs changed. */
     public interface Refresher {
         void refresh(FileStoreTable table) throws Exception;
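To summarize the two decisions this file now makes, here is a minimal standalone sketch; the plain booleans and longs are hypothetical stand-ins for Paimon's config-group set, DataFileMeta.schemaId(), and TableSchema.id(), so it illustrates the logic rather than the real API.

import java.util.List;

// Standalone sketch of the two gates added to WriterRefresher; all parameters
// are plain stand-ins (hypothetical) for Paimon's types.
public class WriterRefresherLogicSketch {

    // Mirrors the updated create(...) gate: in streaming mode, a dedicated-compaction
    // caller gets a refresher even with no configured config groups, because it still
    // needs the data-file schema check below.
    static boolean shouldCreate(boolean isStreaming, boolean needForCompact, boolean hasConfigGroups) {
        return isStreaming && (needForCompact || hasConfigGroups);
    }

    // Mirrors the decision inside tryRefreshForDataFiles: refresh only when some input
    // file carries a schema id newer than the writer's, and the schema manager actually
    // has a newer schema to copy.
    static boolean needsRefreshForDataFiles(
            List<Long> fileSchemaIds, long writerSchemaId, long latestSchemaId) {
        long maxFileSchemaId =
                fileSchemaIds.stream().mapToLong(Long::longValue).max().orElse(writerSchemaId);
        return maxFileSchemaId > writerSchemaId && latestSchemaId > writerSchemaId;
    }

    public static void main(String[] args) {
        // Dedicated compaction in streaming mode: refresher exists even without config groups.
        System.out.println(shouldCreate(true, true, false));  // true
        // Plain write path without config groups: no refresher, as before this commit.
        System.out.println(shouldCreate(true, false, false)); // false

        // Writer on schema 3; a compact-before file was written with schema 5.
        System.out.println(needsRefreshForDataFiles(List.of(3L, 5L), 3, 5)); // true
        // All input files at or below the writer's schema: nothing to refresh.
        System.out.println(needsRefreshForDataFiles(List.of(2L, 3L), 3, 5)); // false
    }
}

Note the design choice the diff's own comment calls out: the refresh uses table.copyWithLatestSchema() rather than copying the latest schema's options wholesale, so the dedicated compaction job keeps its dynamic options and only picks up the newest fields.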
