Skip to content

Commit 7a1187e

Browse files
authored
[core] Append commit should check bucket number if latest commit user is different (#6723)
1 parent 321af6f commit 7a1187e

File tree

8 files changed

+110
-4
lines changed

8 files changed

+110
-4
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public interface FileStoreCommit extends AutoCloseable {
3636

3737
FileStoreCommit withPartitionExpire(PartitionExpire partitionExpire);
3838

39+
FileStoreCommit appendCommitCheckConflict(boolean appendCommitCheckConflict);
40+
3941
/** Find out which committables need to be retried when recovering from the failure. */
4042
List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committables);
4143

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
154154

155155
private boolean ignoreEmptyCommit;
156156
private CommitMetrics commitMetrics;
157+
private boolean appendCommitCheckConflict = false;
157158

158159
public FileStoreCommitImpl(
159160
SnapshotCommit snapshotCommit,
@@ -246,6 +247,12 @@ public FileStoreCommit withPartitionExpire(PartitionExpire partitionExpire) {
246247
return this;
247248
}
248249

250+
@Override
251+
public FileStoreCommit appendCommitCheckConflict(boolean appendCommitCheckConflict) {
252+
this.appendCommitCheckConflict = appendCommitCheckConflict;
253+
return this;
254+
}
255+
249256
@Override
250257
public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committables) {
251258
// nothing to filter, fast exit
@@ -327,6 +334,8 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
327334
if (containsFileDeletionOrDeletionVectors(appendSimpleEntries, appendIndexFiles)) {
328335
commitKind = CommitKind.OVERWRITE;
329336
conflictCheck = mustConflictCheck();
337+
} else if (latestSnapshot != null && appendCommitCheckConflict) {
338+
conflictCheck = mustConflictCheck();
330339
}
331340

332341
boolean discardDuplicate = discardDuplicateFiles && commitKind == CommitKind.APPEND;

paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class BatchWriteBuilderImpl implements BatchWriteBuilder {
4040
private final String commitUser;
4141

4242
private Map<String, String> staticPartition;
43+
private boolean appendCommitCheckConflict = false;
4344

4445
public BatchWriteBuilderImpl(InnerTable table) {
4546
this.table = table;
@@ -81,15 +82,23 @@ public BatchTableWrite newWrite() {
8182

8283
@Override
8384
public BatchTableCommit newCommit() {
84-
InnerTableCommit commit = table.newCommit(commitUser).withOverwrite(staticPartition);
85+
InnerTableCommit commit =
86+
table.newCommit(commitUser)
87+
.withOverwrite(staticPartition)
88+
.appendCommitCheckConflict(appendCommitCheckConflict);
8589
commit.ignoreEmptyCommit(
8690
Options.fromMap(table.options())
8791
.getOptional(CoreOptions.SNAPSHOT_IGNORE_EMPTY_COMMIT)
8892
.orElse(true));
8993
return commit;
9094
}
9195

92-
public BatchWriteBuilder copyWithNewTable(Table newTable) {
96+
public BatchWriteBuilderImpl copyWithNewTable(Table newTable) {
9397
return new BatchWriteBuilderImpl((InnerTable) newTable, commitUser, staticPartition);
9498
}
99+
100+
public BatchWriteBuilderImpl appendCommitCheckConflict(boolean appendCommitCheckConflict) {
101+
this.appendCommitCheckConflict = appendCommitCheckConflict;
102+
return this;
103+
}
95104
}

paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ public interface InnerTableCommit extends StreamTableCommit, BatchTableCommit {
4646

4747
InnerTableCommit expireForEmptyCommit(boolean expireForEmptyCommit);
4848

49+
InnerTableCommit appendCommitCheckConflict(boolean appendCommitCheckConflict);
50+
4951
@Override
5052
InnerTableCommit withMetricRegistry(MetricRegistry registry);
5153
}

paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,12 @@ public TableCommitImpl expireForEmptyCommit(boolean expireForEmptyCommit) {
158158
return this;
159159
}
160160

161+
@Override
162+
public TableCommitImpl appendCommitCheckConflict(boolean appendCommitCheckConflict) {
163+
commit.appendCommitCheckConflict(appendCommitCheckConflict);
164+
return this;
165+
}
166+
161167
@Override
162168
public InnerTableCommit withMetricRegistry(MetricRegistry registry) {
163169
commit.withMetrics(new CommitMetrics(registry, tableName));

paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050

5151
import org.junit.jupiter.api.Test;
5252
import org.junit.jupiter.api.io.TempDir;
53+
import org.junit.jupiter.params.ParameterizedTest;
54+
import org.junit.jupiter.params.provider.ValueSource;
5355

5456
import java.util.Arrays;
5557
import java.util.Collections;
@@ -67,6 +69,7 @@
6769
import static java.util.Collections.singletonMap;
6870
import static org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory;
6971
import static org.assertj.core.api.Assertions.assertThat;
72+
import static org.assertj.core.api.Assertions.assertThatCode;
7073
import static org.assertj.core.api.Assertions.assertThatThrownBy;
7174

7275
/** Tests for {@link TableCommit}. */
@@ -322,6 +325,71 @@ public void testGiveUpCommitWhenTotalBucketsChanged() throws Exception {
322325
}
323326
}
324327

328+
@ParameterizedTest
329+
@ValueSource(booleans = {true, false})
330+
public void testGiveUpCommitWhenAppendFoundTotalBucketsChanged(boolean checkAppend)
331+
throws Exception {
332+
String path = tempDir.toString();
333+
RowType rowType =
334+
RowType.of(
335+
new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
336+
new String[] {"k", "v"});
337+
338+
Options options = new Options();
339+
options.set(CoreOptions.PATH, path);
340+
options.set(CoreOptions.BUCKET, 1);
341+
TableSchema tableSchema =
342+
SchemaUtils.forceCommit(
343+
new SchemaManager(LocalFileIO.create(), new Path(path)),
344+
new Schema(
345+
rowType.getFields(),
346+
Collections.emptyList(),
347+
Collections.singletonList("k"),
348+
options.toMap(),
349+
""));
350+
FileStoreTable table =
351+
FileStoreTableFactory.create(
352+
LocalFileIO.create(),
353+
new Path(path),
354+
tableSchema,
355+
CatalogEnvironment.empty());
356+
357+
String commitUser1 = UUID.randomUUID().toString();
358+
TableWriteImpl<?> write1 = table.newWrite(commitUser1);
359+
TableCommitImpl commit1 = table.newCommit(commitUser1);
360+
for (int i = 1; i < 10; i++) {
361+
write1.write(GenericRow.of(i, (long) i));
362+
}
363+
364+
// mock rescale
365+
String commitUser2 = UUID.randomUUID().toString();
366+
options = new Options(table.options());
367+
options.set(CoreOptions.BUCKET, 2);
368+
FileStoreTable rescaleTable = table.copy(tableSchema.copy(options.toMap()));
369+
try (TableWriteImpl<?> write = rescaleTable.newWrite(commitUser2);
370+
TableCommitImpl commit =
371+
rescaleTable.newCommit(commitUser2).withOverwrite(Collections.emptyMap())) {
372+
for (int i = 1; i < 10; i++) {
373+
write.write(GenericRow.of(i, (long) i));
374+
}
375+
commit.commit(1, write.prepareCommit(false, 1));
376+
}
377+
378+
if (checkAppend) {
379+
commit1.appendCommitCheckConflict(true);
380+
assertThatThrownBy(() -> commit1.commit(1, write1.prepareCommit(false, 1)))
381+
.isInstanceOf(RuntimeException.class)
382+
.hasMessageContaining("changed from 2 to 1 without overwrite");
383+
} else {
384+
// the committed result is incorrect, but here we only verify that no check happens if
385+
// appendCommitCheckConflict is not set
386+
assertThatCode(() -> commit1.commit(1, write1.prepareCommit(false, 1)))
387+
.doesNotThrowAnyException();
388+
}
389+
write1.close();
390+
commit1.close();
391+
}
392+
325393
@Test
326394
public void testStrictModeForCompact() throws Exception {
327395
String path = tempDir.toString();

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,16 @@ protected CommittableStateManager<ManifestCommittable> createCommittableStateMan
6868
@Override
6969
protected Committer.Factory<Committable, ManifestCommittable> createCommitterFactory() {
7070
if (overwritePartition == null) {
71-
// The table has copied bucket option outside, no need to change anything
72-
return super.createCommitterFactory();
71+
// The table's bucket option was already copied outside; nothing to change.
72+
return context ->
73+
new StoreCommitter(
74+
table,
75+
table.newCommit(context.commitUser())
76+
.withOverwrite(overwritePartition)
77+
.ignoreEmptyCommit(!context.streamingCheckpointEnabled())
78+
// Need to check conflict
79+
.appendCommitCheckConflict(true),
80+
context);
7381
} else {
7482
// When overwriting, the postpone bucket files need to be deleted, so using a postpone
7583
// bucket table commit here

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,8 @@ case class PaimonSparkWriter(
410410
writeBuilder
411411
.asInstanceOf[BatchWriteBuilderImpl]
412412
.copyWithNewTable(PostponeUtils.tableForCommit(table))
413+
// Need to check conflict
414+
.appendCommitCheckConflict(true)
413415
} else {
414416
writeBuilder
415417
}

0 commit comments

Comments
 (0)