3434import org .apache .paimon .utils .BloomFilter ;
3535import org .apache .paimon .utils .FileIOUtils ;
3636import org .apache .paimon .utils .IOFunction ;
37+ import org .apache .paimon .utils .Pair ;
3738
3839import org .apache .paimon .shade .caffeine2 .com .github .benmanes .caffeine .cache .Cache ;
3940
4546import java .util .Comparator ;
4647import java .util .HashSet ;
4748import java .util .Map ;
49+ import java .util .Optional ;
4850import java .util .Set ;
4951import java .util .TreeSet ;
5052import java .util .concurrent .ConcurrentHashMap ;
@@ -68,8 +70,7 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
6870 private final Function <Long , BloomFilter .Builder > bfGenerator ;
6971 private final Cache <String , LookupFile > lookupFileCache ;
7072 private final Set <String > ownCachedFiles ;
71- private final String remoteSstSuffix ;
72- private final Map <Long , PersistProcessor <T >> schemaIdToProcessors ;
73+ private final Map <Pair <Long , String >, PersistProcessor <T >> schemaIdAndSerVersionToProcessors ;
7374
7475 @ Nullable private RemoteFileDownloader remoteFileDownloader ;
7576
@@ -99,13 +100,7 @@ public LookupLevels(
99100 this .bfGenerator = bfGenerator ;
100101 this .lookupFileCache = lookupFileCache ;
101102 this .ownCachedFiles = new HashSet <>();
102- this .remoteSstSuffix =
103- "."
104- + processorFactory .identifier ()
105- + "."
106- + serializerFactory .identifier ()
107- + REMOTE_LOOKUP_FILE_SUFFIX ;
108- this .schemaIdToProcessors = new ConcurrentHashMap <>();
103+ this .schemaIdAndSerVersionToProcessors = new ConcurrentHashMap <>();
109104 levels .addDropFileCallback (this );
110105 }
111106
@@ -170,17 +165,17 @@ private T lookup(InternalRow key, DataFileMeta file) throws IOException {
170165 return null ;
171166 }
172167
173- return getOrCreateProcessor (lookupFile .schemaId ())
168+ return getOrCreateProcessor (lookupFile .schemaId (), lookupFile . serVersion () )
174169 .readFromDisk (key , lookupFile .level (), valueBytes , file .fileName ());
175170 }
176171
177- private PersistProcessor <T > getOrCreateProcessor (long schemaId ) {
178- return schemaIdToProcessors .computeIfAbsent (
179- schemaId ,
172+ private PersistProcessor <T > getOrCreateProcessor (long schemaId , String serVersion ) {
173+ return schemaIdAndSerVersionToProcessors .computeIfAbsent (
174+ Pair . of ( schemaId , serVersion ) ,
180175 id -> {
181176 RowType fileSchema =
182177 schemaId == currentSchemaId ? null : schemaFunction .apply (schemaId );
183- return processorFactory .create (serializerFactory , fileSchema );
178+ return processorFactory .create (serVersion , serializerFactory , fileSchema );
184179 });
185180 }
186181
@@ -191,9 +186,12 @@ public LookupFile createLookupFile(DataFileMeta file) throws IOException {
191186 }
192187
193188 long schemaId = this .currentSchemaId ;
194- if (tryToDownloadRemoteSst (file , localFile )) {
189+ String fileSerVersion = serializerFactory .version ();
190+ Optional <String > downloadSerVersion = tryToDownloadRemoteSst (file , localFile );
191+ if (downloadSerVersion .isPresent ()) {
195192 // use schema id from remote file
196193 schemaId = file .schemaId ();
194+ fileSerVersion = downloadSerVersion .get ();
197195 } else {
198196 createSstFileFromDataFile (file , localFile );
199197 }
@@ -203,21 +201,35 @@ public LookupFile createLookupFile(DataFileMeta file) throws IOException {
203201 localFile ,
204202 file .level (),
205203 schemaId ,
204+ fileSerVersion ,
206205 lookupStoreFactory .createReader (localFile ),
207206 () -> ownCachedFiles .remove (file .fileName ()));
208207 }
209208
210- private boolean tryToDownloadRemoteSst (DataFileMeta file , File localFile ) {
209+ private Optional < String > tryToDownloadRemoteSst (DataFileMeta file , File localFile ) {
211210 if (remoteFileDownloader == null ) {
212- return false ;
211+ return Optional .empty ();
212+ }
213+ Optional <RemoteSstFile > remoteSstFile = remoteSst (file );
214+ if (!remoteSstFile .isPresent ()) {
215+ return Optional .empty ();
213216 }
217+
218+ RemoteSstFile remoteSst = remoteSstFile .get ();
219+
214220 // validate schema matched, no exception here
215221 try {
216- getOrCreateProcessor (file .schemaId ());
222+ getOrCreateProcessor (file .schemaId (), remoteSst . serVersion );
217223 } catch (UnsupportedOperationException e ) {
218- return false ;
224+ return Optional .empty ();
225+ }
226+ boolean success =
227+ remoteFileDownloader .tryToDownload (file , remoteSst .sstFileName , localFile );
228+ if (!success ) {
229+ return Optional .empty ();
219230 }
220- return remoteFileDownloader .tryToDownload (file , localFile );
231+
232+ return Optional .of (remoteSst .serVersion );
221233 }
222234
223235 public void addLocalFile (DataFileMeta file , LookupFile lookupFile ) {
@@ -229,7 +241,8 @@ private void createSstFileFromDataFile(DataFileMeta file, File localFile) throws
229241 lookupStoreFactory .createWriter (
230242 localFile , bfGenerator .apply (file .rowCount ()));
231243 RecordReader <KeyValue > reader = fileReaderFactory .apply (file )) {
232- PersistProcessor <T > processor = getOrCreateProcessor (currentSchemaId );
244+ PersistProcessor <T > processor =
245+ getOrCreateProcessor (currentSchemaId , serializerFactory .version ());
233246 KeyValue kv ;
234247 if (processor .withPosition ()) {
235248 FileRecordIterator <KeyValue > batch ;
@@ -258,8 +271,39 @@ private void createSstFileFromDataFile(DataFileMeta file, File localFile) throws
258271 }
259272 }
260273
261- public String remoteSstSuffix () {
262- return remoteSstSuffix ;
274+ public Optional <RemoteSstFile > remoteSst (DataFileMeta file ) {
275+ Optional <String > sstFile =
276+ file .extraFiles ().stream ()
277+ .filter (f -> f .endsWith (REMOTE_LOOKUP_FILE_SUFFIX ))
278+ .findFirst ();
279+ if (!sstFile .isPresent ()) {
280+ return Optional .empty ();
281+ }
282+
283+ String sstFileName = sstFile .get ();
284+ String [] split = sstFileName .split ("\\ ." );
285+ if (split .length < 3 ) {
286+ return Optional .empty ();
287+ }
288+
289+ String processorId = split [split .length - 3 ];
290+ if (!processorFactory .identifier ().equals (processorId )) {
291+ return Optional .empty ();
292+ }
293+
294+ String serVersion = split [split .length - 2 ];
295+ return Optional .of (new RemoteSstFile (sstFileName , serVersion ));
296+ }
297+
298+ public String newRemoteSst (DataFileMeta file , long length ) {
299+ return file .fileName ()
300+ + "."
301+ + length
302+ + "."
303+ + processorFactory .identifier ()
304+ + "."
305+ + serializerFactory .version ()
306+ + REMOTE_LOOKUP_FILE_SUFFIX ;
263307 }
264308
265309 @ Override
@@ -269,4 +313,16 @@ public void close() throws IOException {
269313 lookupFileCache .invalidate (cachedFile );
270314 }
271315 }
316+
317+ /** Remote sst file with serVersion. */
318+ public static class RemoteSstFile {
319+
320+ private final String sstFileName ;
321+ private final String serVersion ;
322+
323+ private RemoteSstFile (String sstFileName , String serVersion ) {
324+ this .sstFileName = sstFileName ;
325+ this .serVersion = serVersion ;
326+ }
327+ }
272328}
0 commit comments