Skip to content

Commit d1b075a

Browse files
committed
refactor
1 parent 5c229c8 commit d1b075a

12 files changed

+335
-160
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.paimon.data.InternalRow;
2626
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
2727
import org.apache.paimon.flink.sink.Committable;
28-
import org.apache.paimon.flink.sink.WriterRefresher;
28+
import org.apache.paimon.flink.sink.CompactWriterRefresher;
2929
import org.apache.paimon.io.DataFileMeta;
3030
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
3131
import org.apache.paimon.operation.FileStoreWrite.State;
@@ -44,7 +44,6 @@
4444

4545
import java.io.IOException;
4646
import java.util.ArrayList;
47-
import java.util.Collections;
4847
import java.util.LinkedList;
4948
import java.util.List;
5049
import java.util.Queue;
@@ -67,15 +66,14 @@ public class AppendTableCompactor {
6766
@Nullable private final CompactionMetrics compactionMetrics;
6867
@Nullable private final CompactionMetrics.Reporter metricsReporter;
6968

70-
@Nullable protected final WriterRefresher writeRefresher;
69+
@Nullable protected final CompactWriterRefresher compactWriteRefresher;
7170

7271
public AppendTableCompactor(
7372
FileStoreTable table,
7473
String commitUser,
7574
Supplier<ExecutorService> lazyCompactExecutor,
7675
@Nullable MetricGroup metricGroup,
77-
boolean isStreaming,
78-
boolean forDedicatedCompact) {
76+
boolean isStreaming) {
7977
this.table = table;
8078
this.commitUser = commitUser;
8179
CoreOptions coreOptions = table.coreOptions();
@@ -94,8 +92,8 @@ public AppendTableCompactor(
9492
? null
9593
// partition and bucket fields are no use.
9694
: this.compactionMetrics.createReporter(BinaryRow.EMPTY_ROW, 0);
97-
this.writeRefresher =
98-
WriterRefresher.create(isStreaming, forDedicatedCompact, table, this::replace);
95+
this.compactWriteRefresher =
96+
CompactWriterRefresher.create(isStreaming, table, this::replace);
9997
}
10098

10199
public void processElement(AppendCompactTask task) throws Exception {
@@ -223,23 +221,12 @@ private void replace(FileStoreTable newTable) throws Exception {
223221
this.write.restore(states);
224222
}
225223

226-
public void tryRefreshWrite() {
227-
tryRefreshWrite(false, Collections.emptyList());
228-
}
229-
230-
public void tryRefreshWrite(boolean forCompact, List<DataFileMeta> files) {
224+
public void tryRefreshWrite(List<DataFileMeta> files) {
231225
if (commitUser == null) {
232226
return;
233227
}
234-
235-
if (writeRefresher != null) {
236-
if (forCompact) {
237-
if (!files.isEmpty()) {
238-
writeRefresher.tryRefreshForDataFiles(files);
239-
}
240-
} else {
241-
writeRefresher.tryRefreshForConfigs();
242-
}
228+
if (compactWriteRefresher != null) {
229+
compactWriteRefresher.tryRefresh(files);
243230
}
244231
}
245232
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ private AppendBypassCompactWorkerOperator(
3636
FileStoreTable table,
3737
String commitUser,
3838
boolean isStreaming) {
39-
super(parameters, table, commitUser, isStreaming, false);
39+
super(parameters, table, commitUser, isStreaming);
4040
}
4141

4242
@Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.slf4j.LoggerFactory;
3434

3535
import java.io.IOException;
36+
import java.util.Collections;
3637
import java.util.List;
3738
import java.util.concurrent.ExecutorService;
3839
import java.util.concurrent.Executors;
@@ -57,19 +58,15 @@ public abstract class AppendCompactWorkerOperator<IN>
5758

5859
private final boolean isStreaming;
5960

60-
private final boolean forDedicatedCompact;
61-
6261
public AppendCompactWorkerOperator(
6362
StreamOperatorParameters<Committable> parameters,
6463
FileStoreTable table,
6564
String commitUser,
66-
boolean isStreaming,
67-
boolean forDedicatedCompact) {
65+
boolean isStreaming) {
6866
super(parameters, Options.fromMap(table.options()));
6967
this.table = table;
7068
this.commitUser = commitUser;
7169
this.isStreaming = isStreaming;
72-
this.forDedicatedCompact = forDedicatedCompact;
7370
}
7471

7572
@VisibleForTesting
@@ -82,20 +79,15 @@ public void open() throws Exception {
8279
LOG.debug("Opened a append-only table compaction worker.");
8380
this.unawareBucketCompactor =
8481
new AppendTableCompactor(
85-
table,
86-
commitUser,
87-
this::workerExecutor,
88-
getMetricGroup(),
89-
isStreaming,
90-
forDedicatedCompact);
82+
table, commitUser, this::workerExecutor, getMetricGroup(), isStreaming);
9183
}
9284

9385
@Override
9486
protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
9587
throws IOException {
9688
List<Committable> committables =
9789
this.unawareBucketCompactor.prepareCommit(waitCompaction, checkpointId);
98-
this.unawareBucketCompactor.tryRefreshWrite();
90+
this.unawareBucketCompactor.tryRefreshWrite(Collections.emptyList());
9991
return committables;
10092
}
10193

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void processElement(StreamRecord<MultiTableAppendCompactTask> element) th
114114
Identifier identifier = element.getValue().tableIdentifier();
115115
AppendTableCompactor compactor =
116116
compactorContainer.computeIfAbsent(identifier, this::compactor);
117-
compactor.tryRefreshWrite(true, element.getValue().compactBefore());
117+
compactor.tryRefreshWrite(element.getValue().compactBefore());
118118
compactor.processElement(element.getValue());
119119
}
120120

@@ -125,8 +125,7 @@ private AppendTableCompactor compactor(Identifier tableId) {
125125
commitUser,
126126
this::workerExecutor,
127127
getMetricGroup(),
128-
isStreaming,
129-
true);
128+
isStreaming);
130129
} catch (Catalog.TableNotExistException e) {
131130
throw new RuntimeException(e);
132131
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@ private AppendOnlySingleTableCompactionWorkerOperator(
3939
FileStoreTable table,
4040
String commitUser,
4141
boolean isStreaming) {
42-
super(parameters, table, commitUser, isStreaming, true);
42+
super(parameters, table, commitUser, isStreaming);
4343
}
4444

4545
@Override
4646
public void processElement(StreamRecord<AppendCompactTask> element) throws Exception {
4747
AppendCompactTask task = element.getValue();
48-
this.unawareBucketCompactor.tryRefreshWrite(true, task.compactBefore());
48+
this.unawareBucketCompactor.tryRefreshWrite(task.compactBefore());
4949
this.unawareBucketCompactor.processElement(task);
5050
}
5151

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.sink;
20+
21+
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.io.DataFileMeta;
23+
import org.apache.paimon.schema.TableSchema;
24+
import org.apache.paimon.table.FileStoreTable;
25+
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import javax.annotation.Nullable;
30+
31+
import java.util.List;
32+
import java.util.Optional;
33+
34+
import static org.apache.paimon.flink.sink.WriterRefresher.configGroups;
35+
36+
/** Writer refresher for dedicated compaction. */
37+
public class CompactWriterRefresher {
38+
39+
private static final Logger LOG = LoggerFactory.getLogger(CompactWriterRefresher.class);
40+
41+
private FileStoreTable table;
42+
private final WriterRefresher.Refresher refresher;
43+
private final WriterRefresher writerRefresher;
44+
45+
private CompactWriterRefresher(FileStoreTable table, WriterRefresher.Refresher refresher) {
46+
this.table = table;
47+
this.refresher = refresher;
48+
this.writerRefresher = WriterRefresher.create(true, table, refresher);
49+
}
50+
51+
@Nullable
52+
public static CompactWriterRefresher create(
53+
boolean isStreaming, FileStoreTable table, WriterRefresher.Refresher refresher) {
54+
if (!isStreaming) {
55+
return null;
56+
}
57+
return new CompactWriterRefresher(table, refresher);
58+
}
59+
60+
/**
61+
* This is used for dedicated compaction in streaming mode. When the schema-id of newly added
62+
* data files exceeds the current schema-id, the writer needs to be refreshed to prevent data
63+
* loss.
64+
*/
65+
public void tryRefresh(List<DataFileMeta> files) {
66+
if (!files.isEmpty()) {
67+
tryRefreshForDataFiles(files);
68+
} else {
69+
tryRefreshForConfig();
70+
}
71+
}
72+
73+
private void tryRefreshForDataFiles(List<DataFileMeta> files) {
74+
long fileSchemaId =
75+
files.stream().mapToLong(DataFileMeta::schemaId).max().orElse(table.schema().id());
76+
if (fileSchemaId > table.schema().id()) {
77+
Optional<TableSchema> latestSchema = table.schemaManager().latest();
78+
if (!latestSchema.isPresent()) {
79+
return;
80+
}
81+
TableSchema latest = latestSchema.get();
82+
83+
if (latest.id() > table.schema().id()) {
84+
try {
85+
// here we cannot use table.copy(latestSchema), because table used for
86+
// dedicated compaction has some dynamic options, we should not overwrite them.
87+
// we just copy the latest fields and options allowed to be refreshed.
88+
table = table.copyWithLatestSchema();
89+
90+
// refresh configs allowed to be updated by the way
91+
if (writerRefresher != null) {
92+
table =
93+
table.copy(
94+
configGroups(
95+
writerRefresher.configGroups(),
96+
CoreOptions.fromMap(latest.options())));
97+
writerRefresher.updateTable(table);
98+
}
99+
100+
refresher.refresh(table);
101+
LOG.info(
102+
"write has been refreshed due to schema in data files changed. new schema id:{}.",
103+
table.schema().id());
104+
} catch (Exception e) {
105+
throw new RuntimeException("update write failed.", e);
106+
}
107+
}
108+
}
109+
}
110+
111+
private void tryRefreshForConfig() {
112+
if (writerRefresher != null) {
113+
writerRefresher.tryRefresh();
114+
}
115+
}
116+
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
7171
private transient DataFileMetaSerializer dataFileMetaSerializer;
7272
private transient Set<Pair<BinaryRow, Integer>> waitToCompact;
7373

74-
protected transient @Nullable WriterRefresher writeRefresher;
74+
protected transient @Nullable CompactWriterRefresher compactWriterRefresher;
7575

7676
public StoreCompactOperator(
7777
StreamOperatorParameters<Committable> parameters,
@@ -120,8 +120,8 @@ public void initializeState(StateInitializationContext context) throws Exception
120120
getContainingTask().getEnvironment().getIOManager(),
121121
memoryPoolFactory,
122122
getMetricGroup());
123-
this.writeRefresher =
124-
WriterRefresher.create(write.streamingMode(), true, table, write::replace);
123+
this.compactWriterRefresher =
124+
CompactWriterRefresher.create(write.streamingMode(), table, write::replace);
125125
}
126126

127127
@Override
@@ -152,7 +152,7 @@ public void processElement(StreamRecord<RowData> element) throws Exception {
152152

153153
if (write.streamingMode()) {
154154
write.notifyNewFiles(snapshotId, partition, bucket, files);
155-
tryRefreshWrite(true, files);
155+
tryRefreshWrite(files);
156156
} else {
157157
Preconditions.checkArgument(
158158
files.isEmpty(),
@@ -178,7 +178,7 @@ protected List<Committable> prepareCommit(boolean waitCompaction, long checkpoin
178178

179179
List<Committable> committables = write.prepareCommit(waitCompaction, checkpointId);
180180

181-
tryRefreshWrite();
181+
tryRefreshWrite(Collections.emptyList());
182182
return committables;
183183
}
184184

@@ -200,19 +200,9 @@ public Set<Pair<BinaryRow, Integer>> compactionWaitingSet() {
200200
return waitToCompact;
201201
}
202202

203-
private void tryRefreshWrite() {
204-
tryRefreshWrite(false, Collections.emptyList());
205-
}
206-
207-
private void tryRefreshWrite(boolean forCompact, List<DataFileMeta> files) {
208-
if (writeRefresher != null) {
209-
if (forCompact) {
210-
if (!files.isEmpty()) {
211-
writeRefresher.tryRefreshForDataFiles(files);
212-
}
213-
} else {
214-
writeRefresher.tryRefreshForConfigs();
215-
}
203+
private void tryRefreshWrite(List<DataFileMeta> files) {
204+
if (compactWriterRefresher != null) {
205+
compactWriterRefresher.tryRefresh(files);
216206
}
217207
}
218208

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ public StoreSinkWrite getWrite() {
158158

159159
protected void tryRefreshWrite() {
160160
if (writeRefresher != null) {
161-
writeRefresher.tryRefreshForConfigs();
161+
writeRefresher.tryRefresh();
162162
}
163163
}
164164

0 commit comments

Comments
 (0)