Changes from all commits
@@ -18,6 +18,8 @@

package org.apache.paimon.compression;

import java.util.Arrays;

/** Block Compression type. */
public enum BlockCompressionType {
NONE(0),
@@ -45,4 +47,16 @@ public static BlockCompressionType getCompressionTypeByPersistentId(int persistentId) {

throw new IllegalArgumentException("Unknown persistentId " + persistentId);
}

public static BlockCompressionType getCompressionTypeByValue(String value) {
BlockCompressionType[] types = values();
for (BlockCompressionType type : types) {
if (type.name().equalsIgnoreCase(value)) {
return type;
}
}

throw new IllegalArgumentException(
"Unknown type " + value + ", options are: " + Arrays.toString(types));
}
}
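For illustration (not part of this diff), a hypothetical call site for the new value-based lookup:

// Case-insensitive lookup of a compression type from an option value;
// the LZ4 constant is assumed from the elided enum body (only NONE is
// visible in this hunk).
BlockCompressionType type = BlockCompressionType.getCompressionTypeByValue("lz4");
// An unknown value fails fast and lists the available options, e.g.
// "Unknown type zip, options are: [NONE, ...]".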
@@ -150,11 +150,11 @@ public byte[] serializeToBytes(InternalRow record) {
return rowWriter.copyBuffer();
}

- public InternalRow deserialize(byte[] bytes) {
+ public InternalRow deserialize(MemorySlice memorySlice) {
if (rowReader == null) {
rowReader = new RowReader(calculateBitSetInBytes(getters.length));
}
- rowReader.pointTo(bytes);
+ rowReader.pointTo(memorySlice.segment(), memorySlice.offset());
GenericRow row = new GenericRow(readers.length);
row.setRowKind(rowReader.readRowKind());
for (int i = 0; i < readers.length; i++) {
@@ -163,6 +163,10 @@ public InternalRow deserialize(byte[] bytes) {
return row;
}

public InternalRow deserialize(byte[] bytes) {
return deserialize(MemorySlice.wrap(bytes));
}

public Comparator<MemorySlice> createSliceComparator() {
return new SliceComparator(rowType);
}
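A sketch of how the two deserialize overloads above relate; serializer and bytes are hypothetical placeholders:

// Deserializing directly from a MemorySlice avoids a defensive byte[] copy.
MemorySlice slice = MemorySlice.wrap(bytes); // zero-copy wrapper over bytes
InternalRow fromSlice = serializer.deserialize(slice);
// The byte[] overload is now a thin wrapper that delegates to the slice path.
InternalRow fromBytes = serializer.deserialize(bytes);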
@@ -74,9 +74,31 @@ public FormatReaderFactory createReaderFactory(
return createReaderFactory(dataSchemaRowType, projectedRowType, filters);
}

/**
* Same as {@link FileFormat#createReaderFactory(RowType, RowType, List)}, but for file formats
* which need to store keys and values separately.
*/
public FormatReaderFactory createReaderFactory(
RowType dataSchemaRowType,
RowType projectedRowType,
@Nullable List<Predicate> filters,
RowType keyType,
RowType valueType) {
return createReaderFactory(dataSchemaRowType, projectedRowType, filters);
}

/** Create a {@link FormatWriterFactory} from the type. */
public abstract FormatWriterFactory createWriterFactory(RowType type);

/**
* Same as {@link FileFormat#createWriterFactory(RowType)}, but for file formats which need to
* store keys and values separately.
*/
public FormatWriterFactory createWriterFactory(
RowType type, RowType keyType, RowType valueType) {
return createWriterFactory(type);
}
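As a sketch of the intended call pattern (caller-side variable names are placeholders): a key/value-aware format receives the key and value row types alongside the usual arguments, while other formats fall back to the existing factories through the default implementations above:

// Hypothetical caller: thread the key/value types through to the format.
FormatReaderFactory readerFactory =
        fileFormat.createReaderFactory(
                dataSchemaRowType, projectedRowType, filters, keyType, valueType);
FormatWriterFactory writerFactory =
        fileFormat.createWriterFactory(rowType, keyType, valueType);
// Formats that do not store keys and values separately inherit the defaults
// and simply ignore keyType/valueType; SstFileFormat below overrides both.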

/** Validate whether the data field types are supported. */
public abstract void validateDataFields(RowType rowType);

@@ -99,4 +99,8 @@ public MemorySlice readSlice(int length) {
position += length;
return newSlice;
}

public MemorySlice getSlice() {
return slice;
}
}
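For illustration, a hypothetical use of the new accessor (construction of the input is assumed):

// readSlice advances the read position; getSlice exposes the whole backing
// slice, e.g. to hand the raw region to another reader without copying.
MemorySlice header = input.readSlice(8);
MemorySlice backing = input.getSlice();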
@@ -32,6 +32,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.RangeHelper;
@@ -127,8 +128,14 @@ private boolean filterManifestByRowIds(ManifestFileMeta manifest) {

@Override
public FileStoreScan withReadType(RowType readType) {
- if (readType != null && !readType.getFields().isEmpty()) {
- this.readType = readType;
+ if (readType != null) {
+ List<DataField> nonSystemFields =
+ readType.getFields().stream()
+ .filter(f -> !SpecialFields.isSystemField(f.id()))
+ .collect(Collectors.toList());
+ if (!nonSystemFields.isEmpty()) {
+ this.readType = readType;
+ }
}
return this;
}
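A sketch of the intended semantics, inferred from the test added further below (scan is a placeholder):

// A projection containing only system fields, or no fields at all, must not
// replace the scan's read type; otherwise readers would be left with an
// effectively empty schema.
scan.withReadType(RowType.of(SpecialFields.ROW_ID)); // read type kept as-is
scan.withReadType(RowType.of());                     // read type kept as-is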
@@ -109,6 +109,34 @@ public void testBasic() throws Exception {
assertThat(r.getString(1).toString()).isEqualTo("a");
assertThat(r.getString(2).toString()).isEqualTo("c");
});

// projection with only special fields.
readBuilder = getTableDefault().newReadBuilder();
reader =
readBuilder
.withReadType(RowType.of(SpecialFields.ROW_ID))
.newRead()
.createReader(readBuilder.newScan().plan());
AtomicInteger cnt = new AtomicInteger(0);
reader.forEachRemaining(
r -> {
cnt.incrementAndGet();
});
assertThat(cnt.get()).isEqualTo(1);

// projection with an empty read type.
readBuilder = getTableDefault().newReadBuilder();
reader =
readBuilder
.withReadType(RowType.of())
.newRead()
.createReader(readBuilder.newScan().plan());
AtomicInteger cnt1 = new AtomicInteger(0);
reader.forEachRemaining(
r -> {
cnt1.incrementAndGet();
});
assertThat(cnt1.get()).isEqualTo(1);
}

@Test
@@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format.sst;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BloomFilter;
import org.apache.paimon.utils.RoaringBitmap32;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* A {@link FileFormat} for SST files. SST files are row-oriented and designed to serve frequent
* point queries and range queries by key. The SST file layout is as follows (for the layout of
* each block type, refer to the corresponding class):
*
* <pre>
* +-----------------------------------+------+
* | Footer | |
* +-----------------------------------+ |
* | File Info | |
* +-----------------------------------+ +--> Loaded on open
* | Bloom Filter Block | |
* +-----------------------------------+ |
* | Index Block | |
* +-----------------------------------+------+
* | Metadata Block | |
* +-----------------------------------+ |
* | Data Block | |
* +-----------------------------------+ +--> Loaded on need
* ...... |
* +-----------------------------------+ |
* | Data Block | |
* +-----------------------------------+------+
* </pre>
*/
public class SstFileFormat extends FileFormat {
private final Options options;
private final MemorySize writeBatchMemory;

public SstFileFormat(FormatContext context) {
super(SstFileFormatFactory.IDENTIFIER);
this.options = context.options();
this.writeBatchMemory = context.writeBatchMemory();
}

@Override
public FormatReaderFactory createReaderFactory(
RowType dataSchemaRowType,
RowType projectedRowType,
@Nullable List<Predicate> filters) {
throw new RuntimeException(
"SST files are a row-oriented KV store. Please specify the key type and value type when creating factories.");
}

@Override
public FormatWriterFactory createWriterFactory(RowType type) {
throw new RuntimeException(
"SST files are a row-oriented KV store. Please specify the key type and value type when creating factories.");
}

@Override
public FormatReaderFactory createReaderFactory(
RowType dataSchemaRowType,
RowType projectedRowType,
@Nullable List<Predicate> filters,
RowType keyType,
RowType valueType) {
return new SstFileFormatReaderFactory(projectedRowType, keyType, valueType);
}

@Override
public FormatWriterFactory createWriterFactory(
RowType rowType, RowType keyType, RowType valueType) {
return new SstFileFormatWriterFactory(
options, rowType, keyType, valueType, writeBatchMemory);
}

@Override
public void validateDataFields(RowType rowType) {
List<DataType> fieldTypes = rowType.getFieldTypes();
for (DataType dataType : fieldTypes) {
validateDataType(dataType);
}
}

private void validateDataType(DataType dataType) {
// SST files serialize values into bytes, so effectively all data types are
// supported.
// TODO: check that key types are comparable
DataTypeRoot typeRoot = dataType.getTypeRoot();
switch (typeRoot) {
case CHAR:
case VARCHAR:
case BOOLEAN:
case BINARY:
case VARBINARY:
case DECIMAL:
case TINYINT:
case SMALLINT:
case INTEGER:
case BIGINT:
case FLOAT:
case DOUBLE:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case ARRAY:
case MAP:
case ROW:
// All listed types are supported in SST files
break;
default:
throw new UnsupportedOperationException(
"Unsupported data type for SST format: " + dataType);
}
}

/** The {@link FormatReaderFactory} for SST Files. */
private static class SstFileFormatReaderFactory implements FormatReaderFactory {
private final RowType projectedRowType;
private final RowType keyType;
private final RowType valueType;

public SstFileFormatReaderFactory(
RowType projectedRowType, RowType keyType, RowType valueType) {
this.projectedRowType = projectedRowType;
this.keyType = keyType;
this.valueType = valueType;
}

@Override
public FileRecordReader<InternalRow> createReader(Context context) throws IOException {
return new SstFormatReader(
context.fileIO(),
context.filePath(),
context.fileSize(),
convertSelection(context.selection()),
projectedRowType,
keyType,
valueType);
}

private List<Integer> convertSelection(RoaringBitmap32 selection) {
if (selection == null) {
return null;
}
List<Integer> result = new ArrayList<>();
selection.iterator().forEachRemaining(result::add);
return result;
}
}

/** The {@link FormatWriterFactory} for SST Files. */
private static class SstFileFormatWriterFactory implements FormatWriterFactory {
private final Options options;
private final RowType dataType;
private final RowType keyType;
private final RowType valueType;
private final MemorySize writeBatchMemory;

public SstFileFormatWriterFactory(
Options options,
RowType dataType,
RowType keyType,
RowType valueType,
MemorySize writeBatchMemory) {
this.options = options;
this.keyType = keyType;
this.valueType = valueType;
this.dataType = dataType;
this.writeBatchMemory = writeBatchMemory;
}

@Override
public FormatWriter create(PositionOutputStream out, String compression)
throws IOException {
BloomFilter.Builder bloomFilter = null;
boolean enableBloomFilter = options.get(SstOptions.BLOOM_FILTER_ENABLED);
if (enableBloomFilter) {
double fpp = options.get(SstOptions.BLOOM_FILTER_FPP);
int estimatedEntryNum = options.get(SstOptions.BLOOM_FILTER_EXPECTED_ENTRY_NUM);
bloomFilter = BloomFilter.builder(estimatedEntryNum, fpp);
}

return new SstFormatWriter(
out,
compression,
writeBatchMemory.getBytes(),
bloomFilter,
dataType,
keyType,
valueType);
}
}
}
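To make the layout concrete, a hedged point-lookup sketch following the diagram above. Helper names such as findBlockHandle, loadDataBlock and seekTo are hypothetical; the real read path lives in SstFormatReader, which is not part of this excerpt:

// On open: footer -> file info, bloom filter block, index block (eager).
// On lookup: consult the bloom filter, then read at most one data block.
@Nullable
InternalRow pointGet(MemorySlice key) throws IOException {
    if (bloomFilter != null && !bloomFilter.testHash(key.hashCode())) {
        return null; // definitely absent; no data block is touched
    }
    BlockHandle handle = indexBlock.findBlockHandle(key); // binary search by key
    DataBlock block = loadDataBlock(handle);              // loaded on need
    return block.seekTo(key);                             // null if key not found
}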