Skip to content

Commit 5c229c8

Browse files
committed
fix
1 parent f185fd8 commit 5c229c8

File tree

1 file changed

+18
-13
lines changed

1 file changed

+18
-13
lines changed

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,18 @@
3131
import org.apache.paimon.table.FileStoreTable;
3232
import org.apache.paimon.table.sink.BatchTableCommit;
3333
import org.apache.paimon.table.sink.BatchTableWrite;
34+
import org.apache.paimon.types.DataField;
3435
import org.apache.paimon.types.DataTypes;
3536

3637
import org.junit.jupiter.api.BeforeEach;
3738
import org.junit.jupiter.api.Test;
3839
import org.junit.jupiter.api.io.TempDir;
3940

41+
import java.util.ArrayList;
4042
import java.util.Arrays;
4143
import java.util.Collections;
4244
import java.util.HashMap;
45+
import java.util.List;
4346
import java.util.Map;
4447
import java.util.Set;
4548
import java.util.stream.Collectors;
@@ -214,35 +217,26 @@ public void testRefreshWithNeedCompact() throws Exception {
214217
public void testRefreshForDataFiles() throws Exception {
215218
Map<String, String> options = new HashMap<>();
216219
createTable(options);
217-
218220
FileStoreTable table1 = getTable();
219221

220-
table1.schemaManager()
221-
.commitChanges(
222-
SchemaChange.setOption(
223-
CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), "round-robin"),
224-
SchemaChange.addColumn("c", DataTypes.INT()));
222+
table1.schemaManager().commitChanges(SchemaChange.addColumn("c", DataTypes.INT()));
225223
FileStoreTable table2 = getTable();
226224
try (BatchTableWrite write = table2.newBatchWriteBuilder().newWrite();
227225
BatchTableCommit commit = table2.newBatchWriteBuilder().newCommit()) {
228226
write.write(GenericRow.of(1, 1, 1));
229227
commit.commit(write.prepareCommit());
230228
}
231229

232-
Map<String, String> refreshedOptions = new HashMap<>();
230+
List<DataField> dataFields = new ArrayList<>();
233231
WriterRefresher writerRefresher =
234232
WriterRefresher.create(
235233
true,
236234
true,
237235
table1,
238-
new TestWriteRefresher(
239-
Collections.singleton("external-paths"), refreshedOptions));
236+
new TestWriteRefresher(null, Collections.emptyMap(), dataFields));
240237
writerRefresher.tryRefreshForDataFiles(
241238
table2.newSnapshotReader().read().dataSplits().get(0).dataFiles());
242-
assertThat(refreshedOptions)
243-
.isEqualTo(
244-
configGroups(
245-
Collections.singleton("external-paths"), table2.coreOptions()));
239+
assertThat(dataFields).isEqualTo(table2.schema().fields());
246240
}
247241

248242
private void createTable(Map<String, String> options) throws Exception {
@@ -264,10 +258,17 @@ private static class TestWriteRefresher implements WriterRefresher.Refresher {
264258

265259
private final Set<String> groups;
266260
private final Map<String, String> options;
261+
private final List<DataField> dataFields;
267262

268263
TestWriteRefresher(Set<String> groups, Map<String, String> options) {
264+
this(groups, options, null);
265+
}
266+
267+
TestWriteRefresher(
268+
Set<String> groups, Map<String, String> options, List<DataField> fields) {
269269
this.groups = groups;
270270
this.options = options;
271+
this.dataFields = fields;
271272
}
272273

273274
@Override
@@ -276,6 +277,10 @@ public void refresh(FileStoreTable table) {
276277
if (groups != null) {
277278
options.putAll(configGroups(groups, table.coreOptions()));
278279
}
280+
if (dataFields != null) {
281+
dataFields.clear();
282+
dataFields.addAll(table.schema().fields());
283+
}
279284
}
280285
}
281286
}

0 commit comments

Comments
 (0)