
Commit 87ee257

[hive] fix splitting for bucket tables (#6594)
1 parent dedd313 commit 87ee257

2 files changed: +214 −11 lines


paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java

Lines changed: 19 additions & 11 deletions
@@ -158,7 +158,7 @@ private static Optional<Predicate> createPartitionPredicate(
         }
     }

-    private static List<DataSplit> packSplits(
+    public static List<DataSplit> packSplits(
             FileStoreTable table, JobConf jobConf, List<DataSplit> splits, int numSplits) {
         if (table.coreOptions().deletionVectorsEnabled()) {
             return splits;
@@ -201,8 +201,9 @@ private static List<DataSplit> packSplits(
                     numFilesAfterPacked += newSplit.dataFiles().size();
                     dataSplits.add(newSplit);
                 }
-                current = split;
                 bin.clear();
+                current = split;
+                bin.addAll(split.dataFiles());
             }
         }
         if (!bin.isEmpty()) {
@@ -235,16 +236,23 @@ private static Long computeSplitSize(
             JobConf jobConf, List<DataSplit> splits, int numSplits, long openCostInBytes) {
         long maxSize = HiveConf.getLongVar(jobConf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
         long minSize = HiveConf.getLongVar(jobConf, HiveConf.ConfVars.MAPREDMINSPLITSIZE);
-        long totalSize = 0;
-        for (DataSplit split : splits) {
-            totalSize +=
-                    split.dataFiles().stream()
-                            .map(f -> Math.max(f.fileSize(), openCostInBytes))
-                            .reduce(Long::sum)
-                            .orElse(0L);
+        long avgSize;
+        long splitSize;
+        if (numSplits > 0) {
+            long totalSize = 0;
+            for (DataSplit split : splits) {
+                totalSize +=
+                        split.dataFiles().stream()
+                                .map(f -> Math.max(f.fileSize(), openCostInBytes))
+                                .reduce(Long::sum)
+                                .orElse(0L);
+            }
+            avgSize = totalSize / numSplits;
+            splitSize = Math.min(maxSize, Math.max(avgSize, minSize));
+        } else {
+            avgSize = 0;
+            splitSize = Math.min(maxSize, minSize);
         }
-        long avgSize = totalSize / numSplits;
-        long splitSize = Math.min(maxSize, Math.max(avgSize, minSize));
         LOG.info(
                 "Currently, minSplitSize: {}, maxSplitSize: {}, avgSize: {}, finalSplitSize: {}.",
                 minSize,
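For orientation, a minimal, self-contained sketch of the split-size rule after this patch (SplitSizeSketch and targetSplitSize are made-up names, not part of the connector): the average size only enters the formula when numSplits > 0, so the numSplits = 0 case exercised by the new tests falls back to min(maxSize, minSize) instead of dividing by zero.

// Sketch only: mirrors the branch added to computeSplitSize above.
public class SplitSizeSketch {

    static long targetSplitSize(long maxSize, long minSize, long totalSize, int numSplits) {
        if (numSplits > 0) {
            long avgSize = totalSize / numSplits;                 // average bytes per requested split
            return Math.min(maxSize, Math.max(avgSize, minSize)); // clamp into [minSize, maxSize]
        }
        return Math.min(maxSize, minSize);                        // no requested splits: skip the average
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long total = 9 * 12 * mb; // nine 12 MB files, as in the new test
        System.out.println(targetSplitSize(256 * mb, 256 * mb, total, 3)); // 268435456
        System.out.println(targetSplitSize(256 * mb, 256 * mb, total, 0)); // 268435456, no ArithmeticException
    }
}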
HiveSplitGeneratorTest.java (new file)

Lines changed: 195 additions & 0 deletions

@@ -0,0 +1,195 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.hive;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.utils.HiveSplitGenerator;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.TraceableFileIO;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for {@link HiveSplitGenerator}. */
public class HiveSplitGeneratorTest {

    private static final List<DataField> SCHEMA_FIELDS =
            Arrays.asList(
                    new DataField(0, "id", new IntType()),
                    new DataField(1, "col", VarCharType.STRING_TYPE),
                    new DataField(2, "pt", VarCharType.STRING_TYPE));

    private static final TableSchema TABLE_SCHEMA =
            new TableSchema(
                    0,
                    SCHEMA_FIELDS,
                    2,
                    Collections.emptyList(),
                    Collections.singletonList("id"),
                    Collections.emptyMap(),
                    "");

    @TempDir java.nio.file.Path tempDir;

    protected Path tablePath;
    protected FileIO fileIO;
    protected String commitUser;
    protected final Options tableConfig = new Options();

    @BeforeEach
    public void before() throws Exception {
        tablePath = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString());
        fileIO = FileIO.get(tablePath, CatalogContext.create(new Options()));
        commitUser = UUID.randomUUID().toString();
        tableConfig.set(CoreOptions.PATH, tablePath.toString());
        tableConfig.set(CoreOptions.BUCKET, 1);
    }

    @Test
    public void testPackSplitsForNonBucketTable() throws Exception {
        JobConf jobConf = new JobConf();
        jobConf.set(HiveConf.ConfVars.MAPREDMAXSPLITSIZE.varname, "268435456"); // 256MB
        jobConf.set(HiveConf.ConfVars.MAPREDMINSPLITSIZE.varname, "268435456"); // 256MB

        FileStoreTable table = createFileStoreTable(TABLE_SCHEMA);

        List<DataSplit> dataSplits = new ArrayList<>();
        dataSplits.add(newDataSplit(4, 0, 12582912L)); // 12MB
        dataSplits.add(newDataSplit(2, 0, 12582912L));
        dataSplits.add(newDataSplit(3, 0, 12582912L));
        List<DataSplit> packed = HiveSplitGenerator.packSplits(table, jobConf, dataSplits, 0);

        assertThat(packed.size()).isEqualTo(1);
        int totalFiles = 0;
        for (DataSplit dataSplit : packed) {
            totalFiles += dataSplit.dataFiles().size();
        }
        assertThat(totalFiles).isEqualTo(9);
    }

    @Test
    public void testPackSplitsForBucketTable() throws Exception {
        JobConf jobConf = new JobConf();
        jobConf.set(HiveConf.ConfVars.MAPREDMAXSPLITSIZE.varname, "268435456");
        jobConf.set(HiveConf.ConfVars.MAPREDMINSPLITSIZE.varname, "268435456");

        FileStoreTable table = createFileStoreTable(TABLE_SCHEMA);

        List<DataSplit> dataSplits = new ArrayList<>();
        dataSplits.add(newDataSplit(4, 0, 12582912L));
        dataSplits.add(newDataSplit(2, 1, 12582912L));
        dataSplits.add(newDataSplit(1, 1, 12582912L));
        dataSplits.add(newDataSplit(3, 2, 12582912L));
        List<DataSplit> packed = HiveSplitGenerator.packSplits(table, jobConf, dataSplits, 0);

        assertThat(packed.size()).isEqualTo(3);
        int totalFiles = 0;
        for (DataSplit dataSplit : packed) {
            totalFiles += dataSplit.dataFiles().size();
        }
        assertThat(totalFiles).isEqualTo(10);
    }

    private FileStoreTable createFileStoreTable(TableSchema tableSchema) throws Exception {
        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
        schemaManager.commit(tableSchema);

        return new AppendOnlyFileStoreTable(
                fileIO, tablePath, tableSchema, CatalogEnvironment.empty()) {

            @Override
            public SchemaManager schemaManager() {
                return schemaManager;
            }
        };
    }

    private DataSplit newDataSplit(int numFiles, int bucket, long fileSize) {
        List<DataFileMeta> dataFiles = new ArrayList<>();

        for (int i = 0; i < numFiles; i++) {
            DataFileMeta fileMeta =
                    DataFileMeta.create(
                            "test-file-" + i + ".parquet",
                            fileSize,
                            100L,
                            createBinaryRow(1),
                            createBinaryRow(100),
                            null,
                            null,
                            0L,
                            0L,
                            0,
                            0,
                            Collections.emptyList(),
                            null,
                            null,
                            FileSource.APPEND,
                            null,
                            null,
                            null,
                            null);
            dataFiles.add(fileMeta);
        }

        DataSplit.Builder builder = DataSplit.builder();
        builder.withSnapshot(1)
                .withPartition(BinaryRow.EMPTY_ROW)
                .withBucket(bucket)
                .withBucketPath("bucket-" + bucket + "/")
                .rawConvertible(true)
                .withDataFiles(dataFiles);
        return builder.build();
    }

    private BinaryRow createBinaryRow(int value) {
        BinaryRow row = new BinaryRow(1);
        BinaryRowWriter writer = new BinaryRowWriter(row);
        writer.writeInt(0, value);
        writer.complete();
        return row;
    }
}
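Reading the expected counts (a hedged reading; the per-bucket grouping rule is inferred from the commit's intent rather than spelled out in the test): every file is 12 MB and the configured target split size is 256 MB. In testPackSplitsForNonBucketTable all 4 + 2 + 3 = 9 files sit in bucket 0 and total 108 MB, well under 256 MB, so they pack into a single split. In testPackSplitsForBucketTable the 4 + 2 + 1 + 3 = 10 files are spread over buckets 0, 1 and 2; since files from different buckets are not packed together, each bucket yields exactly one packed split, giving 3 splits and 10 files in total.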
