Skip to content

Commit 7135189

Browse files
author
喆宇
committed
[format] introduce SST file format
1 parent fc46d39 commit 7135189

25 files changed

+2932
-2
lines changed

paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionType.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.paimon.compression;
2020

21+
import java.util.Arrays;
22+
2123
/** Block Compression type. */
2224
public enum BlockCompressionType {
2325
NONE(0),
@@ -45,4 +47,16 @@ public static BlockCompressionType getCompressionTypeByPersistentId(int persiste
4547

4648
throw new IllegalArgumentException("Unknown persistentId " + persistentId);
4749
}
50+
51+
public static BlockCompressionType getCompressionTypeByValue(String value) {
52+
BlockCompressionType[] types = values();
53+
for (BlockCompressionType type : types) {
54+
if (type.name().equalsIgnoreCase(value)) {
55+
return type;
56+
}
57+
}
58+
59+
throw new IllegalArgumentException(
60+
"Unknown type " + value + ", options are: " + Arrays.toString(types));
61+
}
4862
}

paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,11 @@ public byte[] serializeToBytes(InternalRow record) {
150150
return rowWriter.copyBuffer();
151151
}
152152

153-
public InternalRow deserialize(byte[] bytes) {
153+
public InternalRow deserialize(MemorySlice memorySlice) {
154154
if (rowReader == null) {
155155
rowReader = new RowReader(calculateBitSetInBytes(getters.length));
156156
}
157-
rowReader.pointTo(bytes);
157+
rowReader.pointTo(memorySlice.segment(), memorySlice.offset());
158158
GenericRow row = new GenericRow(readers.length);
159159
row.setRowKind(rowReader.readRowKind());
160160
for (int i = 0; i < readers.length; i++) {
@@ -163,6 +163,10 @@ public InternalRow deserialize(byte[] bytes) {
163163
return row;
164164
}
165165

166+
public InternalRow deserialize(byte[] bytes) {
167+
return deserialize(MemorySlice.wrap(bytes));
168+
}
169+
166170
public Comparator<MemorySlice> createSliceComparator() {
167171
return new SliceComparator(rowType);
168172
}

paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,31 @@ public FormatReaderFactory createReaderFactory(
7474
return createReaderFactory(dataSchemaRowType, projectedRowType, filters);
7575
}
7676

77+
/**
78+
* Same as {@link FileFormat#createReaderFactory(RowType, RowType, List)}, but for file formats
79+
* which need to store keys and values separately.
80+
*/
81+
public FormatReaderFactory createReaderFactory(
82+
RowType dataSchemaRowType,
83+
RowType projectedRowType,
84+
@Nullable List<Predicate> filters,
85+
RowType keyType,
86+
RowType valueType) {
87+
return createReaderFactory(dataSchemaRowType, projectedRowType, filters);
88+
}
89+
7790
/** Create a {@link FormatWriterFactory} from the type. */
7891
public abstract FormatWriterFactory createWriterFactory(RowType type);
7992

93+
/**
94+
* Same as {@link FileFormat#createWriterFactory(RowType)}, but for file formats which need to
95+
* store keys and values separately.
96+
*/
97+
public FormatWriterFactory createWriterFactory(
98+
RowType type, RowType keyType, RowType valueType) {
99+
return createWriterFactory(type);
100+
}
101+
80102
/** Validate data field type supported or not. */
81103
public abstract void validateDataFields(RowType rowType);
82104

paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceInput.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,4 +99,8 @@ public MemorySlice readSlice(int length) {
9999
position += length;
100100
return newSlice;
101101
}
102+
103+
public MemorySlice getSlice() {
104+
return slice;
105+
}
102106
}
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.format.sst;
20+
21+
import org.apache.paimon.data.InternalRow;
22+
import org.apache.paimon.format.FileFormat;
23+
import org.apache.paimon.format.FileFormatFactory.FormatContext;
24+
import org.apache.paimon.format.FormatReaderFactory;
25+
import org.apache.paimon.format.FormatWriter;
26+
import org.apache.paimon.format.FormatWriterFactory;
27+
import org.apache.paimon.fs.PositionOutputStream;
28+
import org.apache.paimon.options.MemorySize;
29+
import org.apache.paimon.options.Options;
30+
import org.apache.paimon.predicate.Predicate;
31+
import org.apache.paimon.reader.FileRecordReader;
32+
import org.apache.paimon.types.DataType;
33+
import org.apache.paimon.types.DataTypeRoot;
34+
import org.apache.paimon.types.RowType;
35+
import org.apache.paimon.utils.BloomFilter;
36+
import org.apache.paimon.utils.RoaringBitmap32;
37+
38+
import javax.annotation.Nullable;
39+
40+
import java.io.IOException;
41+
import java.util.ArrayList;
42+
import java.util.List;
43+
44+
/**
45+
* A {@link FileFormat} for SST Files. SST Files are row-oriented and designed to serve frequent
46+
* point queries and range queries by key. The SST File layout is as below: (For layouts of each
47+
* block type, please refer to corresponding classes)
48+
*
49+
* <pre>
50+
* +-----------------------------------+------+
51+
* | Footer | |
52+
* +-----------------------------------+ |
53+
* | File Info | |
54+
* +-----------------------------------+ +--> Loaded on open
55+
* | Bloom Filter Block | |
56+
* +-----------------------------------+ |
57+
* | Index Block | |
58+
* +-----------------------------------+------+
59+
* | Metadata Block | |
60+
* +-----------------------------------+ |
61+
* | Data Block | |
62+
* +-----------------------------------+ +--> Loaded on need
63+
* ...... |
64+
* +-----------------------------------+ |
65+
* | Data Block | |
66+
* +-----------------------------------+------+
67+
* </pre>
68+
*/
69+
public class SstFileFormat extends FileFormat {
70+
private final Options options;
71+
private final MemorySize writeBatchMemory;
72+
73+
public SstFileFormat(FormatContext context) {
74+
super(SstFileFormatFactory.IDENTIFIER);
75+
this.options = context.options();
76+
this.writeBatchMemory = context.writeBatchMemory();
77+
}
78+
79+
@Override
80+
public FormatReaderFactory createReaderFactory(
81+
RowType dataSchemaRowType,
82+
RowType projectedRowType,
83+
@Nullable List<Predicate> filters) {
84+
throw new RuntimeException(
85+
"SST files are row-oriented kv store, please specify key type and value type on creating factories.");
86+
}
87+
88+
@Override
89+
public FormatWriterFactory createWriterFactory(RowType type) {
90+
throw new RuntimeException(
91+
"SST files are row-oriented kv store, please specify key type and value type on creating factories.");
92+
}
93+
94+
@Override
95+
public FormatReaderFactory createReaderFactory(
96+
RowType dataSchemaRowType,
97+
RowType projectedRowType,
98+
@Nullable List<Predicate> filters,
99+
RowType keyType,
100+
RowType valueType) {
101+
return new SstFileFormatReaderFactory(projectedRowType, keyType, valueType);
102+
}
103+
104+
@Override
105+
public FormatWriterFactory createWriterFactory(
106+
RowType rowType, RowType keyType, RowType valueType) {
107+
return new SstFileFormatWriterFactory(
108+
options, rowType, keyType, valueType, writeBatchMemory);
109+
}
110+
111+
@Override
112+
public void validateDataFields(RowType rowType) {
113+
List<DataType> fieldTypes = rowType.getFieldTypes();
114+
for (DataType dataType : fieldTypes) {
115+
validateDataType(dataType);
116+
}
117+
}
118+
119+
private void validateDataType(DataType dataType) {
120+
// SST Files will serialize values into bytes, so that actually all data types are
121+
// supported.
122+
// todo: check key types are comparable
123+
DataTypeRoot typeRoot = dataType.getTypeRoot();
124+
switch (typeRoot) {
125+
case CHAR:
126+
case VARCHAR:
127+
case BOOLEAN:
128+
case BINARY:
129+
case VARBINARY:
130+
case DECIMAL:
131+
case TINYINT:
132+
case SMALLINT:
133+
case INTEGER:
134+
case BIGINT:
135+
case FLOAT:
136+
case DOUBLE:
137+
case DATE:
138+
case TIME_WITHOUT_TIME_ZONE:
139+
case TIMESTAMP_WITHOUT_TIME_ZONE:
140+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
141+
case ARRAY:
142+
case MAP:
143+
case ROW:
144+
// All types are supported in SST Files
145+
break;
146+
default:
147+
throw new UnsupportedOperationException(
148+
"Unsupported data type for SST format: " + dataType);
149+
}
150+
}
151+
152+
/** The {@link FormatReaderFactory} for SST Files. */
153+
private static class SstFileFormatReaderFactory implements FormatReaderFactory {
154+
private final RowType projectedRowType;
155+
private final RowType keyType;
156+
private final RowType valueType;
157+
158+
public SstFileFormatReaderFactory(
159+
RowType projectedRowType, RowType keyType, RowType valueType) {
160+
this.projectedRowType = projectedRowType;
161+
this.keyType = keyType;
162+
this.valueType = valueType;
163+
}
164+
165+
@Override
166+
public FileRecordReader<InternalRow> createReader(Context context) throws IOException {
167+
return new SstFormatReader(
168+
context.fileIO(),
169+
context.filePath(),
170+
context.fileSize(),
171+
convertSelection(context.selection()),
172+
projectedRowType,
173+
keyType,
174+
valueType);
175+
}
176+
177+
private List<Integer> convertSelection(RoaringBitmap32 selection) {
178+
if (selection == null) {
179+
return null;
180+
}
181+
List<Integer> result = new ArrayList<>();
182+
selection.iterator().forEachRemaining(result::add);
183+
return result;
184+
}
185+
}
186+
187+
/** The {@link FormatWriterFactory} for SST Files. */
188+
private static class SstFileFormatWriterFactory implements FormatWriterFactory {
189+
private final Options options;
190+
private final RowType dataType;
191+
private final RowType keyType;
192+
private final RowType valueType;
193+
private final MemorySize writeBatchMemory;
194+
195+
public SstFileFormatWriterFactory(
196+
Options options,
197+
RowType dataType,
198+
RowType keyType,
199+
RowType valueType,
200+
MemorySize writeBatchMemory) {
201+
this.options = options;
202+
this.keyType = keyType;
203+
this.valueType = valueType;
204+
this.dataType = dataType;
205+
this.writeBatchMemory = writeBatchMemory;
206+
}
207+
208+
@Override
209+
public FormatWriter create(PositionOutputStream out, String compression)
210+
throws IOException {
211+
BloomFilter.Builder bloomFilter = null;
212+
boolean enableBloomFilter = options.get(SstOptions.BLOOM_FILTER_ENABLED);
213+
if (enableBloomFilter) {
214+
double fpp = options.get(SstOptions.BLOOM_FILTER_FPP);
215+
int estimatedEntryNum = options.get(SstOptions.BLOOM_FILTER_EXPECTED_ENTRY_NUM);
216+
bloomFilter = BloomFilter.builder(estimatedEntryNum, fpp);
217+
}
218+
219+
return new SstFormatWriter(
220+
out,
221+
compression,
222+
writeBatchMemory.getBytes(),
223+
bloomFilter,
224+
dataType,
225+
keyType,
226+
valueType);
227+
}
228+
}
229+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.format.sst;
20+
21+
import org.apache.paimon.format.FileFormat;
22+
import org.apache.paimon.format.FileFormatFactory;
23+
24+
/** The {@link FileFormatFactory} for SST Files. */
25+
public class SstFileFormatFactory implements FileFormatFactory {
26+
public static final String IDENTIFIER = "sst";
27+
28+
@Override
29+
public String identifier() {
30+
return IDENTIFIER;
31+
}
32+
33+
@Override
34+
public FileFormat create(FormatContext formatContext) {
35+
return new SstFileFormat(formatContext);
36+
}
37+
}

0 commit comments

Comments
 (0)