2525import org .apache .paimon .io .DataFileMeta ;
2626import org .apache .paimon .lookup .LookupStoreFactory ;
2727import org .apache .paimon .lookup .LookupStoreWriter ;
28- import org .apache .paimon .memory .MemorySegment ;
28+ import org .apache .paimon .mergetree .lookup .LookupSerializerFactory ;
29+ import org .apache .paimon .mergetree .lookup .PersistProcessor ;
30+ import org .apache .paimon .mergetree .lookup .RemoteFileDownloader ;
2931import org .apache .paimon .reader .FileRecordIterator ;
3032import org .apache .paimon .reader .RecordReader ;
31- import org .apache .paimon .types .RowKind ;
3233import org .apache .paimon .types .RowType ;
3334import org .apache .paimon .utils .BloomFilter ;
3435import org .apache .paimon .utils .FileIOUtils ;
4142import java .io .Closeable ;
4243import java .io .File ;
4344import java .io .IOException ;
44- import java .util .Arrays ;
4545import java .util .Comparator ;
4646import java .util .HashSet ;
47+ import java .util .Map ;
4748import java .util .Set ;
4849import java .util .TreeSet ;
50+ import java .util .concurrent .ConcurrentHashMap ;
4951import java .util .function .Function ;
5052
51- import static org .apache .paimon .utils .VarLengthIntUtils .MAX_VAR_LONG_SIZE ;
52- import static org .apache .paimon .utils .VarLengthIntUtils .decodeLong ;
53- import static org .apache .paimon .utils .VarLengthIntUtils .encodeLong ;
54-
5553/** Provide lookup by key. */
5654public class LookupLevels <T > implements Levels .DropFileCallback , Closeable {
5755
58- public static final String CURRENT_VERSION = "v1" ;
5956 public static final String REMOTE_LOOKUP_FILE_SUFFIX = ".lookup" ;
6057
58+ private final Function <Long , RowType > schemaFunction ;
59+ private final long currentSchemaId ;
6160 private final Levels levels ;
6261 private final Comparator <InternalRow > keyComparator ;
6362 private final RowCompactedSerializer keySerializer ;
64- private final PersistProcessor <T > persistProcessor ;
63+ private final PersistProcessor .Factory <T > processorFactory ;
64+ private final LookupSerializerFactory serializerFactory ;
6565 private final IOFunction <DataFileMeta , RecordReader <KeyValue >> fileReaderFactory ;
6666 private final Function <String , File > localFileFactory ;
6767 private final LookupStoreFactory lookupStoreFactory ;
6868 private final Function <Long , BloomFilter .Builder > bfGenerator ;
6969 private final Cache <String , LookupFile > lookupFileCache ;
7070 private final Set <String > ownCachedFiles ;
7171 private final String remoteSstSuffix ;
72+ private final Map <Long , PersistProcessor <T >> schemaIdToProcessors ;
7273
7374 @ Nullable private RemoteFileDownloader remoteFileDownloader ;
7475
7576 public LookupLevels (
77+ Function <Long , RowType > schemaFunction ,
78+ long currentSchemaId ,
7679 Levels levels ,
7780 Comparator <InternalRow > keyComparator ,
7881 RowType keyType ,
79- PersistProcessor <T > persistProcessor ,
82+ PersistProcessor .Factory <T > processorFactory ,
83+ LookupSerializerFactory serializerFactory ,
8084 IOFunction <DataFileMeta , RecordReader <KeyValue >> fileReaderFactory ,
8185 Function <String , File > localFileFactory ,
8286 LookupStoreFactory lookupStoreFactory ,
8387 Function <Long , BloomFilter .Builder > bfGenerator ,
8488 Cache <String , LookupFile > lookupFileCache ) {
89+ this .schemaFunction = schemaFunction ;
90+ this .currentSchemaId = currentSchemaId ;
8591 this .levels = levels ;
8692 this .keyComparator = keyComparator ;
8793 this .keySerializer = new RowCompactedSerializer (keyType );
88- this .persistProcessor = persistProcessor ;
94+ this .processorFactory = processorFactory ;
95+ this .serializerFactory = serializerFactory ;
8996 this .fileReaderFactory = fileReaderFactory ;
9097 this .localFileFactory = localFileFactory ;
9198 this .lookupStoreFactory = lookupStoreFactory ;
@@ -94,10 +101,11 @@ public LookupLevels(
94101 this .ownCachedFiles = new HashSet <>();
95102 this .remoteSstSuffix =
96103 "."
97- + persistProcessor .identifier ()
104+ + processorFactory .identifier ()
98105 + "."
99- + CURRENT_VERSION
106+ + serializerFactory . identifier ()
100107 + REMOTE_LOOKUP_FILE_SUFFIX ;
108+ this .schemaIdToProcessors = new ConcurrentHashMap <>();
101109 levels .addDropFileCallback (this );
102110 }
103111
@@ -162,7 +170,18 @@ private T lookup(InternalRow key, DataFileMeta file) throws IOException {
162170 return null ;
163171 }
164172
165- return persistProcessor .readFromDisk (key , lookupFile .level (), valueBytes , file .fileName ());
173+ return getOrCreateProcessor (lookupFile .schemaId ())
174+ .readFromDisk (key , lookupFile .level (), valueBytes , file .fileName ());
175+ }
176+
177+ private PersistProcessor <T > getOrCreateProcessor (long schemaId ) {
178+ return schemaIdToProcessors .computeIfAbsent (
179+ schemaId ,
180+ id -> {
181+ RowType fileSchema =
182+ schemaId == currentSchemaId ? null : schemaFunction .apply (schemaId );
183+ return processorFactory .create (serializerFactory , fileSchema );
184+ });
166185 }
167186
168187 public LookupFile createLookupFile (DataFileMeta file ) throws IOException {
@@ -171,18 +190,36 @@ public LookupFile createLookupFile(DataFileMeta file) throws IOException {
171190 throw new IOException ("Can not create new file: " + localFile );
172191 }
173192
174- if (remoteFileDownloader == null || !remoteFileDownloader .tryToDownload (file , localFile )) {
193+ long schemaId = this .currentSchemaId ;
194+ if (tryToDownloadRemoteSst (file , localFile )) {
195+ // use schema id from remote file
196+ schemaId = file .schemaId ();
197+ } else {
175198 createSstFileFromDataFile (file , localFile );
176199 }
177200
178201 ownCachedFiles .add (file .fileName ());
179202 return new LookupFile (
180203 localFile ,
181204 file .level (),
205+ schemaId ,
182206 lookupStoreFactory .createReader (localFile ),
183207 () -> ownCachedFiles .remove (file .fileName ()));
184208 }
185209
210+ private boolean tryToDownloadRemoteSst (DataFileMeta file , File localFile ) {
211+ if (remoteFileDownloader == null ) {
212+ return false ;
213+ }
214+ // validate schema matched, no exception here
215+ try {
216+ getOrCreateProcessor (file .schemaId ());
217+ } catch (UnsupportedOperationException e ) {
218+ return false ;
219+ }
220+ return remoteFileDownloader .tryToDownload (file , localFile );
221+ }
222+
186223 public void addLocalFile (DataFileMeta file , LookupFile lookupFile ) {
187224 lookupFileCache .put (file .fileName (), lookupFile );
188225 }
@@ -192,14 +229,14 @@ private void createSstFileFromDataFile(DataFileMeta file, File localFile) throws
192229 lookupStoreFactory .createWriter (
193230 localFile , bfGenerator .apply (file .rowCount ()));
194231 RecordReader <KeyValue > reader = fileReaderFactory .apply (file )) {
232+ PersistProcessor <T > processor = getOrCreateProcessor (currentSchemaId );
195233 KeyValue kv ;
196- if (persistProcessor .withPosition ()) {
234+ if (processor .withPosition ()) {
197235 FileRecordIterator <KeyValue > batch ;
198236 while ((batch = (FileRecordIterator <KeyValue >) reader .readBatch ()) != null ) {
199237 while ((kv = batch .next ()) != null ) {
200238 byte [] keyBytes = keySerializer .serializeToBytes (kv .key ());
201- byte [] valueBytes =
202- persistProcessor .persistToDisk (kv , batch .returnedPosition ());
239+ byte [] valueBytes = processor .persistToDisk (kv , batch .returnedPosition ());
203240 kvWriter .put (keyBytes , valueBytes );
204241 }
205242 batch .releaseBatch ();
@@ -209,7 +246,7 @@ private void createSstFileFromDataFile(DataFileMeta file, File localFile) throws
209246 while ((batch = reader .readBatch ()) != null ) {
210247 while ((kv = batch .next ()) != null ) {
211248 byte [] keyBytes = keySerializer .serializeToBytes (kv .key ());
212- byte [] valueBytes = persistProcessor .persistToDisk (kv );
249+ byte [] valueBytes = processor .persistToDisk (kv );
213250 kvWriter .put (keyBytes , valueBytes );
214251 }
215252 batch .releaseBatch ();
@@ -232,182 +269,4 @@ public void close() throws IOException {
232269 lookupFileCache .invalidate (cachedFile );
233270 }
234271 }
235-
236- /** Processor to process value. */
237- public interface PersistProcessor <T > {
238-
239- String identifier ();
240-
241- boolean withPosition ();
242-
243- byte [] persistToDisk (KeyValue kv );
244-
245- default byte [] persistToDisk (KeyValue kv , long rowPosition ) {
246- throw new UnsupportedOperationException ();
247- }
248-
249- T readFromDisk (InternalRow key , int level , byte [] valueBytes , String fileName );
250- }
251-
252- /** A {@link PersistProcessor} to return {@link KeyValue}. */
253- public static class PersistValueProcessor implements PersistProcessor <KeyValue > {
254-
255- private final RowCompactedSerializer valueSerializer ;
256-
257- public PersistValueProcessor (RowType valueType ) {
258- this .valueSerializer = new RowCompactedSerializer (valueType );
259- }
260-
261- @ Override
262- public String identifier () {
263- return "value" ;
264- }
265-
266- @ Override
267- public boolean withPosition () {
268- return false ;
269- }
270-
271- @ Override
272- public byte [] persistToDisk (KeyValue kv ) {
273- byte [] vBytes = valueSerializer .serializeToBytes (kv .value ());
274- byte [] bytes = new byte [vBytes .length + 8 + 1 ];
275- MemorySegment segment = MemorySegment .wrap (bytes );
276- segment .put (0 , vBytes );
277- segment .putLong (bytes .length - 9 , kv .sequenceNumber ());
278- segment .put (bytes .length - 1 , kv .valueKind ().toByteValue ());
279- return bytes ;
280- }
281-
282- @ Override
283- public KeyValue readFromDisk (InternalRow key , int level , byte [] bytes , String fileName ) {
284- InternalRow value = valueSerializer .deserialize (bytes );
285- long sequenceNumber = MemorySegment .wrap (bytes ).getLong (bytes .length - 9 );
286- RowKind rowKind = RowKind .fromByteValue (bytes [bytes .length - 1 ]);
287- return new KeyValue ().replace (key , sequenceNumber , rowKind , value ).setLevel (level );
288- }
289- }
290-
291- /** A {@link PersistProcessor} to return {@link Boolean} only. */
292- public static class PersistEmptyProcessor implements PersistProcessor <Boolean > {
293-
294- private static final byte [] EMPTY_BYTES = new byte [0 ];
295-
296- @ Override
297- public String identifier () {
298- return "empty" ;
299- }
300-
301- @ Override
302- public boolean withPosition () {
303- return false ;
304- }
305-
306- @ Override
307- public byte [] persistToDisk (KeyValue kv ) {
308- return EMPTY_BYTES ;
309- }
310-
311- @ Override
312- public Boolean readFromDisk (InternalRow key , int level , byte [] bytes , String fileName ) {
313- return Boolean .TRUE ;
314- }
315- }
316-
317- /** A {@link PersistProcessor} to return {@link PositionedKeyValue}. */
318- public static class PersistPositionProcessor implements PersistProcessor <PositionedKeyValue > {
319-
320- private final boolean persistValue ;
321- private final RowCompactedSerializer valueSerializer ;
322-
323- public PersistPositionProcessor (RowType valueType , boolean persistValue ) {
324- this .persistValue = persistValue ;
325- this .valueSerializer = persistValue ? new RowCompactedSerializer (valueType ) : null ;
326- }
327-
328- @ Override
329- public String identifier () {
330- return persistValue ? "position-and-value" : "position" ;
331- }
332-
333- @ Override
334- public boolean withPosition () {
335- return true ;
336- }
337-
338- @ Override
339- public byte [] persistToDisk (KeyValue kv ) {
340- throw new UnsupportedOperationException ();
341- }
342-
343- @ Override
344- public byte [] persistToDisk (KeyValue kv , long rowPosition ) {
345- if (persistValue ) {
346- byte [] vBytes = valueSerializer .serializeToBytes (kv .value ());
347- byte [] bytes = new byte [vBytes .length + 8 + 8 + 1 ];
348- MemorySegment segment = MemorySegment .wrap (bytes );
349- segment .put (0 , vBytes );
350- segment .putLong (bytes .length - 17 , rowPosition );
351- segment .putLong (bytes .length - 9 , kv .sequenceNumber ());
352- segment .put (bytes .length - 1 , kv .valueKind ().toByteValue ());
353- return bytes ;
354- } else {
355- byte [] bytes = new byte [MAX_VAR_LONG_SIZE ];
356- int len = encodeLong (bytes , rowPosition );
357- return Arrays .copyOf (bytes , len );
358- }
359- }
360-
361- @ Override
362- public PositionedKeyValue readFromDisk (
363- InternalRow key , int level , byte [] bytes , String fileName ) {
364- if (persistValue ) {
365- InternalRow value = valueSerializer .deserialize (bytes );
366- MemorySegment segment = MemorySegment .wrap (bytes );
367- long rowPosition = segment .getLong (bytes .length - 17 );
368- long sequenceNumber = segment .getLong (bytes .length - 9 );
369- RowKind rowKind = RowKind .fromByteValue (bytes [bytes .length - 1 ]);
370- return new PositionedKeyValue (
371- new KeyValue ().replace (key , sequenceNumber , rowKind , value ).setLevel (level ),
372- fileName ,
373- rowPosition );
374- } else {
375- long rowPosition = decodeLong (bytes , 0 );
376- return new PositionedKeyValue (null , fileName , rowPosition );
377- }
378- }
379- }
380-
381- /** {@link KeyValue} with file name and row position for DeletionVector. */
382- public static class PositionedKeyValue {
383-
384- private final @ Nullable KeyValue keyValue ;
385- private final String fileName ;
386- private final long rowPosition ;
387-
388- public PositionedKeyValue (@ Nullable KeyValue keyValue , String fileName , long rowPosition ) {
389- this .keyValue = keyValue ;
390- this .fileName = fileName ;
391- this .rowPosition = rowPosition ;
392- }
393-
394- public String fileName () {
395- return fileName ;
396- }
397-
398- public long rowPosition () {
399- return rowPosition ;
400- }
401-
402- @ Nullable
403- public KeyValue keyValue () {
404- return keyValue ;
405- }
406- }
407-
408- /** Downloader to try to download remote lookup file to local. */
409- public interface RemoteFileDownloader {
410-
411- boolean tryToDownload (DataFileMeta dataFile , File localFile );
412- }
413272}
0 commit comments