Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,19 @@ public class LookupFile {

private final File localFile;
private final int level;
private final long schemaId;
private final LookupStoreReader reader;
private final Runnable callback;

private long requestCount;
private long hitCount;
private boolean isClosed = false;

public LookupFile(File localFile, int level, LookupStoreReader reader, Runnable callback) {
public LookupFile(
File localFile, int level, long schemaId, LookupStoreReader reader, Runnable callback) {
this.localFile = localFile;
this.level = level;
this.schemaId = schemaId;
this.reader = reader;
this.callback = callback;
}
Expand All @@ -67,6 +70,10 @@ public File localFile() {
return localFile;
}

/** Returns the schema id that was handed to this lookup file when it was constructed. */
public long schemaId() {
    return this.schemaId;
}

@Nullable
public byte[] get(byte[] key) throws IOException {
checkArgument(!isClosed);
Expand Down
253 changes: 56 additions & 197 deletions paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.lookup.LookupStoreWriter;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.mergetree.lookup.LookupSerializerFactory;
import org.apache.paimon.mergetree.lookup.PersistProcessor;
import org.apache.paimon.mergetree.lookup.RemoteFileDownloader;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BloomFilter;
import org.apache.paimon.utils.FileIOUtils;
Expand All @@ -41,51 +42,57 @@
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import static org.apache.paimon.utils.VarLengthIntUtils.MAX_VAR_LONG_SIZE;
import static org.apache.paimon.utils.VarLengthIntUtils.decodeLong;
import static org.apache.paimon.utils.VarLengthIntUtils.encodeLong;

/** Provide lookup by key. */
public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {

public static final String CURRENT_VERSION = "v1";
public static final String REMOTE_LOOKUP_FILE_SUFFIX = ".lookup";

private final Function<Long, RowType> schemaFunction;
private final long currentSchemaId;
private final Levels levels;
private final Comparator<InternalRow> keyComparator;
private final RowCompactedSerializer keySerializer;
private final PersistProcessor<T> persistProcessor;
private final PersistProcessor.Factory<T> processorFactory;
private final LookupSerializerFactory serializerFactory;
private final IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory;
private final Function<String, File> localFileFactory;
private final LookupStoreFactory lookupStoreFactory;
private final Function<Long, BloomFilter.Builder> bfGenerator;
private final Cache<String, LookupFile> lookupFileCache;
private final Set<String> ownCachedFiles;
private final String remoteSstSuffix;
private final Map<Long, PersistProcessor<T>> schemaIdToProcessors;

@Nullable private RemoteFileDownloader remoteFileDownloader;

public LookupLevels(
Function<Long, RowType> schemaFunction,
long currentSchemaId,
Levels levels,
Comparator<InternalRow> keyComparator,
RowType keyType,
PersistProcessor<T> persistProcessor,
PersistProcessor.Factory<T> processorFactory,
LookupSerializerFactory serializerFactory,
IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
Function<String, File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
Function<Long, BloomFilter.Builder> bfGenerator,
Cache<String, LookupFile> lookupFileCache) {
this.schemaFunction = schemaFunction;
this.currentSchemaId = currentSchemaId;
this.levels = levels;
this.keyComparator = keyComparator;
this.keySerializer = new RowCompactedSerializer(keyType);
this.persistProcessor = persistProcessor;
this.processorFactory = processorFactory;
this.serializerFactory = serializerFactory;
this.fileReaderFactory = fileReaderFactory;
this.localFileFactory = localFileFactory;
this.lookupStoreFactory = lookupStoreFactory;
Expand All @@ -94,10 +101,11 @@ public LookupLevels(
this.ownCachedFiles = new HashSet<>();
this.remoteSstSuffix =
"."
+ persistProcessor.identifier()
+ processorFactory.identifier()
+ "."
+ CURRENT_VERSION
+ serializerFactory.identifier()
+ REMOTE_LOOKUP_FILE_SUFFIX;
this.schemaIdToProcessors = new ConcurrentHashMap<>();
levels.addDropFileCallback(this);
}

Expand Down Expand Up @@ -162,7 +170,18 @@ private T lookup(InternalRow key, DataFileMeta file) throws IOException {
return null;
}

return persistProcessor.readFromDisk(key, lookupFile.level(), valueBytes, file.fileName());
return getOrCreateProcessor(lookupFile.schemaId())
.readFromDisk(key, lookupFile.level(), valueBytes, file.fileName());
}

/**
 * Returns the processor registered for {@code schemaId}, building and caching it on first use.
 *
 * <p>For the current schema id the factory is called with a {@code null} file schema; for any
 * other id the file schema is resolved through {@code schemaFunction}.
 */
private PersistProcessor<T> getOrCreateProcessor(long schemaId) {
    return schemaIdToProcessors.computeIfAbsent(
            schemaId,
            id -> {
                // Only a schema other than the current one needs to be looked up.
                RowType fileSchema = null;
                if (id.longValue() != currentSchemaId) {
                    fileSchema = schemaFunction.apply(id);
                }
                return processorFactory.create(serializerFactory, fileSchema);
            });
}

public LookupFile createLookupFile(DataFileMeta file) throws IOException {
Expand All @@ -171,18 +190,36 @@ public LookupFile createLookupFile(DataFileMeta file) throws IOException {
throw new IOException("Can not create new file: " + localFile);
}

if (remoteFileDownloader == null || !remoteFileDownloader.tryToDownload(file, localFile)) {
long schemaId = this.currentSchemaId;
if (tryToDownloadRemoteSst(file, localFile)) {
// use schema id from remote file
schemaId = file.schemaId();
} else {
createSstFileFromDataFile(file, localFile);
}

ownCachedFiles.add(file.fileName());
return new LookupFile(
localFile,
file.level(),
schemaId,
lookupStoreFactory.createReader(localFile),
() -> ownCachedFiles.remove(file.fileName()));
}

/**
 * Attempts to fetch the lookup file for {@code file} into {@code localFile} through the remote
 * downloader.
 *
 * @return {@code false} when no downloader is configured or when building the processor for the
 *     file's schema id throws {@link UnsupportedOperationException}; otherwise whatever the
 *     downloader reports
 */
private boolean tryToDownloadRemoteSst(DataFileMeta file, File localFile) {
    if (remoteFileDownloader != null) {
        // Build (or fetch the cached) processor for the file's schema first; an unsupported
        // schema is reported as "not downloaded" instead of propagating the exception.
        try {
            getOrCreateProcessor(file.schemaId());
        } catch (UnsupportedOperationException e) {
            return false;
        }
        return remoteFileDownloader.tryToDownload(file, localFile);
    }
    return false;
}

/** Stores {@code lookupFile} in the lookup file cache, keyed by the data file's name. */
public void addLocalFile(DataFileMeta file, LookupFile lookupFile) {
    String cacheKey = file.fileName();
    lookupFileCache.put(cacheKey, lookupFile);
}
Expand All @@ -192,14 +229,14 @@ private void createSstFileFromDataFile(DataFileMeta file, File localFile) throws
lookupStoreFactory.createWriter(
localFile, bfGenerator.apply(file.rowCount()));
RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {
PersistProcessor<T> processor = getOrCreateProcessor(currentSchemaId);
KeyValue kv;
if (persistProcessor.withPosition()) {
if (processor.withPosition()) {
FileRecordIterator<KeyValue> batch;
while ((batch = (FileRecordIterator<KeyValue>) reader.readBatch()) != null) {
while ((kv = batch.next()) != null) {
byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
byte[] valueBytes =
persistProcessor.persistToDisk(kv, batch.returnedPosition());
byte[] valueBytes = processor.persistToDisk(kv, batch.returnedPosition());
kvWriter.put(keyBytes, valueBytes);
}
batch.releaseBatch();
Expand All @@ -209,7 +246,7 @@ private void createSstFileFromDataFile(DataFileMeta file, File localFile) throws
while ((batch = reader.readBatch()) != null) {
while ((kv = batch.next()) != null) {
byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
byte[] valueBytes = persistProcessor.persistToDisk(kv);
byte[] valueBytes = processor.persistToDisk(kv);
kvWriter.put(keyBytes, valueBytes);
}
batch.releaseBatch();
Expand All @@ -232,182 +269,4 @@ public void close() throws IOException {
lookupFileCache.invalidate(cachedFile);
}
}

/**
 * Processor that turns a {@link KeyValue} into the value bytes written to a lookup file and turns
 * those bytes back into a lookup result of type {@code T}.
 */
public interface PersistProcessor<T> {

    /** Name of this processor; it is embedded in the remote lookup file suffix. */
    String identifier();

    /**
     * Whether records are persisted through {@link #persistToDisk(KeyValue, long)} (with a row
     * position) rather than {@link #persistToDisk(KeyValue)}.
     */
    boolean withPosition();

    /** Serializes {@code kv} into the value bytes to store. */
    byte[] persistToDisk(KeyValue kv);

    /**
     * Serializes {@code kv} together with its row position.
     *
     * @throws UnsupportedOperationException by default; implementations override this method
     */
    default byte[] persistToDisk(KeyValue kv, long rowPosition) {
        throw new UnsupportedOperationException();
    }

    /** Rebuilds the lookup result for {@code key} from the stored {@code valueBytes}. */
    T readFromDisk(InternalRow key, int level, byte[] valueBytes, String fileName);
}

/**
 * A {@link PersistProcessor} whose lookup result is the full {@link KeyValue}.
 *
 * <p>Stored layout: serialized value bytes, followed by the 8-byte sequence number, followed by
 * the 1-byte row kind.
 */
public static class PersistValueProcessor implements PersistProcessor<KeyValue> {

    private final RowCompactedSerializer valueSerializer;

    public PersistValueProcessor(RowType valueType) {
        this.valueSerializer = new RowCompactedSerializer(valueType);
    }

    @Override
    public String identifier() {
        return "value";
    }

    @Override
    public boolean withPosition() {
        return false;
    }

    @Override
    public byte[] persistToDisk(KeyValue kv) {
        byte[] valueBytes = valueSerializer.serializeToBytes(kv.value());
        // The sequence number starts right after the value; the row kind is the last byte.
        int sequenceOffset = valueBytes.length;
        int kindOffset = sequenceOffset + 8;
        byte[] result = new byte[kindOffset + 1];
        MemorySegment target = MemorySegment.wrap(result);
        target.put(0, valueBytes);
        target.putLong(sequenceOffset, kv.sequenceNumber());
        target.put(kindOffset, kv.valueKind().toByteValue());
        return result;
    }

    @Override
    public KeyValue readFromDisk(InternalRow key, int level, byte[] bytes, String fileName) {
        InternalRow value = valueSerializer.deserialize(bytes);
        // Trailer is read relative to the end: 8-byte sequence number, then 1-byte row kind.
        int kindOffset = bytes.length - 1;
        int sequenceOffset = kindOffset - 8;
        long sequenceNumber = MemorySegment.wrap(bytes).getLong(sequenceOffset);
        RowKind rowKind = RowKind.fromByteValue(bytes[kindOffset]);
        KeyValue restored = new KeyValue().replace(key, sequenceNumber, rowKind, value);
        return restored.setLevel(level);
    }
}

/**
 * A {@link PersistProcessor} that stores no value payload: every record is persisted as an empty
 * byte array and every read yields {@link Boolean#TRUE}.
 */
public static class PersistEmptyProcessor implements PersistProcessor<Boolean> {

    /** Shared zero-length payload returned for every persisted record. */
    private static final byte[] NO_PAYLOAD = new byte[0];

    @Override
    public String identifier() {
        return "empty";
    }

    @Override
    public boolean withPosition() {
        return false;
    }

    @Override
    public byte[] persistToDisk(KeyValue kv) {
        return NO_PAYLOAD;
    }

    @Override
    public Boolean readFromDisk(InternalRow key, int level, byte[] bytes, String fileName) {
        return Boolean.TRUE;
    }
}

/**
 * A {@link PersistProcessor} whose lookup result is a {@link PositionedKeyValue}.
 *
 * <p>When {@code persistValue} is set, the stored layout is: serialized value bytes, 8-byte row
 * position, 8-byte sequence number, 1-byte row kind. Otherwise only the row position is stored, as
 * a variable-length encoded long.
 */
public static class PersistPositionProcessor implements PersistProcessor<PositionedKeyValue> {

    private final boolean persistValue;
    private final RowCompactedSerializer valueSerializer;

    public PersistPositionProcessor(RowType valueType, boolean persistValue) {
        this.persistValue = persistValue;
        // A value serializer is only needed when the value itself is persisted.
        if (persistValue) {
            this.valueSerializer = new RowCompactedSerializer(valueType);
        } else {
            this.valueSerializer = null;
        }
    }

    @Override
    public String identifier() {
        if (persistValue) {
            return "position-and-value";
        }
        return "position";
    }

    @Override
    public boolean withPosition() {
        return true;
    }

    /** Not supported: this processor always needs the row position. */
    @Override
    public byte[] persistToDisk(KeyValue kv) {
        throw new UnsupportedOperationException();
    }

    @Override
    public byte[] persistToDisk(KeyValue kv, long rowPosition) {
        if (!persistValue) {
            // Position only: encode into a max-size buffer, then trim to the encoded length.
            byte[] buffer = new byte[MAX_VAR_LONG_SIZE];
            int encodedLength = encodeLong(buffer, rowPosition);
            return Arrays.copyOf(buffer, encodedLength);
        }

        byte[] valueBytes = valueSerializer.serializeToBytes(kv.value());
        int positionOffset = valueBytes.length;
        int sequenceOffset = positionOffset + 8;
        int kindOffset = sequenceOffset + 8;
        byte[] result = new byte[kindOffset + 1];
        MemorySegment target = MemorySegment.wrap(result);
        target.put(0, valueBytes);
        target.putLong(positionOffset, rowPosition);
        target.putLong(sequenceOffset, kv.sequenceNumber());
        target.put(kindOffset, kv.valueKind().toByteValue());
        return result;
    }

    @Override
    public PositionedKeyValue readFromDisk(
            InternalRow key, int level, byte[] bytes, String fileName) {
        if (!persistValue) {
            long positionOnly = decodeLong(bytes, 0);
            return new PositionedKeyValue(null, fileName, positionOnly);
        }

        InternalRow value = valueSerializer.deserialize(bytes);
        MemorySegment source = MemorySegment.wrap(bytes);
        // Trailer is read relative to the end: row position, sequence number, row kind.
        int kindOffset = bytes.length - 1;
        int sequenceOffset = kindOffset - 8;
        int positionOffset = sequenceOffset - 8;
        long rowPosition = source.getLong(positionOffset);
        long sequenceNumber = source.getLong(sequenceOffset);
        RowKind rowKind = RowKind.fromByteValue(bytes[kindOffset]);
        KeyValue restored =
                new KeyValue().replace(key, sequenceNumber, rowKind, value).setLevel(level);
        return new PositionedKeyValue(restored, fileName, rowPosition);
    }
}

/**
 * An optional {@link KeyValue} paired with the name of the file it came from and its row position
 * in that file, for DeletionVector.
 */
public static class PositionedKeyValue {

    @Nullable private final KeyValue keyValue;
    private final String fileName;
    private final long rowPosition;

    public PositionedKeyValue(@Nullable KeyValue keyValue, String fileName, long rowPosition) {
        this.keyValue = keyValue;
        this.fileName = fileName;
        this.rowPosition = rowPosition;
    }

    /** Returns the key-value, or {@code null} when none was supplied. */
    @Nullable
    public KeyValue keyValue() {
        return this.keyValue;
    }

    /** Returns the file name supplied at construction. */
    public String fileName() {
        return this.fileName;
    }

    /** Returns the row position supplied at construction. */
    public long rowPosition() {
        return this.rowPosition;
    }
}

/**
 * Downloader that tries to fetch the remote lookup file of a data file into a local file.
 *
 * <p>NOTE(review): presumably {@code true} means the local file was populated; callers in this
 * file fall back to building the lookup file locally when the result is {@code false}.
 */
@FunctionalInterface
public interface RemoteFileDownloader {

    /**
     * @param dataFile data file whose remote lookup file is requested
     * @param localFile destination file on local disk
     * @return whether the download attempt succeeded
     */
    boolean tryToDownload(DataFileMeta dataFile, File localFile);
}
}
Loading
Loading