
[Log] Improve internals of LogScanner by lazily deserialising from record stream / arrow buffer #2041

@leekeiabstraction

Description

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

Reduce the memory footprint of LogScannerImpl.poll(Duration) by addressing the TODO in CompletedFetch: https://github.com/apache/fluss/blob/main/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java#L96-L97

Solution

The proposed changes are as follows:

Change the signatures and implementations of methods that currently return List<ScanRecord> (directly or as map values) so that they return CloseableIterator<ScanRecord> instead. Affected methods include (non-exhaustive):

  1. CompletedFetch's List<ScanRecord> fetchRecords(int) to CloseableIterator<ScanRecord> fetchRecords(). The current int argument sets the maximum number of records to fetch and deserialise; I propose removing it and letting the eventual consumer of the iterator decide how many to fetch. An alternative is to make CompletedFetch implement Iterable<ScanRecord>.
  2. LogFetchCollector's Map<TableBucket, List<ScanRecord>> collectFetch(LogFetchBuffer) to Map<TableBucket, CloseableIterator<ScanRecord>> collectFetch(LogFetchBuffer)
  3. LogFetcher's Map<TableBucket, List<ScanRecord>> collectFetch(LogFetchBuffer) to Map<TableBucket, CloseableIterator<ScanRecord>> collectFetch(LogFetchBuffer)
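
A minimal sketch of the lazy fetchRecords() shape from item 1, assuming a simplified CloseableIterator interface (Fluss's actual interface may differ). LazyCompletedFetch and ScanRecord here are hypothetical stand-ins, and a List<byte[]> takes the place of the real record stream or Arrow buffer. Each record is deserialised only when next() is called, so the consumer, rather than a max-records argument, controls how much work is done.

```java
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

public class LazyFetchSketch {

    /** Hypothetical stand-in for Fluss's CloseableIterator; the real interface may differ. */
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close(); // no checked exception, so callers need not catch
    }

    /** Hypothetical, simplified stand-in for ScanRecord. */
    record ScanRecord(long offset, String value) {}

    /**
     * Hypothetical CompletedFetch-like holder. Records stay serialised in rawRecords
     * (standing in for the record stream / Arrow buffer) until next() is called.
     */
    static final class LazyCompletedFetch {
        private final List<byte[]> rawRecords;
        private final long baseOffset;
        private final Function<byte[], String> deserializer;
        private int deserializedCount = 0; // tracked only to demonstrate laziness

        LazyCompletedFetch(List<byte[]> rawRecords, long baseOffset, Function<byte[], String> deserializer) {
            this.rawRecords = rawRecords;
            this.baseOffset = baseOffset;
            this.deserializer = deserializer;
        }

        int deserializedCount() {
            return deserializedCount;
        }

        /** No max-records argument: the consumer decides how far to iterate. */
        CloseableIterator<ScanRecord> fetchRecords() {
            return new CloseableIterator<>() {
                private int position = 0;
                private boolean closed = false;

                @Override
                public boolean hasNext() {
                    return !closed && position < rawRecords.size();
                }

                @Override
                public ScanRecord next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    // Deserialise exactly one record, on demand.
                    String value = deserializer.apply(rawRecords.get(position));
                    deserializedCount++;
                    return new ScanRecord(baseOffset + position++, value);
                }

                @Override
                public void close() {
                    // A real implementation would release the underlying buffer here.
                    closed = true;
                }
            };
        }
    }

    public static void main(String[] args) {
        List<byte[]> raw = List.of(
                "a".getBytes(StandardCharsets.UTF_8),
                "b".getBytes(StandardCharsets.UTF_8),
                "c".getBytes(StandardCharsets.UTF_8));
        LazyCompletedFetch fetch =
                new LazyCompletedFetch(raw, 100L, bytes -> new String(bytes, StandardCharsets.UTF_8));

        try (CloseableIterator<ScanRecord> it = fetch.fetchRecords()) {
            ScanRecord first = it.next(); // only one record deserialised so far
            System.out.println(first + " deserialised=" + fetch.deserializedCount());
        }
    }
}
```

Because the iterator is AutoCloseable, a caller can scope it with try-with-resources; the Iterable<ScanRecord> alternative would change who creates the iterator, not how deserialisation is deferred.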

Resources are closed in two places:

  1. In ScanRecords.ConcatenatedIterable, where each CloseableIterator is closed as soon as it has no next element.
  2. In LogScannerImpl.close(), where LogScannerImpl tracks any CloseableIterators that are still open and closes them.
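
The two closing paths can be sketched as follows, again assuming a simplified CloseableIterator interface. ConcatenatedIterator is a hypothetical Iterator-based stand-in for ScanRecords.ConcatenatedIterable, and OpenIterators is a hypothetical registry standing in for the bookkeeping LogScannerImpl would keep; neither mirrors actual Fluss code. Exhausted parts are closed and deregistered immediately (path 1), and anything the consumer never drained is closed when the registry is closed (path 2).

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;

public class ClosingSketch {

    /** Hypothetical stand-in for Fluss's CloseableIterator; the real interface may differ. */
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close();
    }

    /** Demo iterator over an in-memory list that remembers whether close() ran. */
    static final class TrackedListIterator<T> implements CloseableIterator<T> {
        private final Iterator<T> delegate;
        private boolean closed = false;

        TrackedListIterator(List<T> items) { this.delegate = items.iterator(); }
        boolean isClosed() { return closed; }
        @Override public boolean hasNext() { return !closed && delegate.hasNext(); }
        @Override public void close() { closed = true; }

        @Override public T next() {
            if (!hasNext()) throw new NoSuchElementException();
            return delegate.next();
        }
    }

    /** Hypothetical registry standing in for LogScannerImpl's tracking of open iterators. */
    static final class OpenIterators implements AutoCloseable {
        private final Set<CloseableIterator<?>> open = Collections.newSetFromMap(new IdentityHashMap<>());

        void register(CloseableIterator<?> it) { open.add(it); }
        int openCount() { return open.size(); }

        void closeAndForget(CloseableIterator<?> it) {
            it.close();
            open.remove(it);
        }

        /** Path 2: close whatever the consumer never drained. */
        @Override public void close() {
            open.forEach(it -> it.close());
            open.clear();
        }
    }

    /** Hypothetical stand-in for path 1: closes each part as soon as it is exhausted. */
    static final class ConcatenatedIterator<T> implements Iterator<T> {
        private final Deque<CloseableIterator<T>> parts;
        private final OpenIterators registry;

        ConcatenatedIterator(List<? extends CloseableIterator<T>> parts, OpenIterators registry) {
            this.parts = new ArrayDeque<>(parts);
            this.registry = registry;
        }

        @Override public boolean hasNext() {
            while (!parts.isEmpty()) {
                if (parts.peekFirst().hasNext()) {
                    return true;
                }
                // Exhausted: release it now instead of waiting for the scanner to close.
                registry.closeAndForget(parts.pollFirst());
            }
            return false;
        }

        @Override public T next() {
            if (!hasNext()) throw new NoSuchElementException();
            return parts.peekFirst().next();
        }
    }

    public static void main(String[] args) {
        OpenIterators registry = new OpenIterators();
        TrackedListIterator<String> a = new TrackedListIterator<>(List.of("a1", "a2"));
        TrackedListIterator<String> b = new TrackedListIterator<>(List.of("b1", "b2"));
        registry.register(a);
        registry.register(b);
        List<CloseableIterator<String>> parts = List.of(a, b);
        ConcatenatedIterator<String> all = new ConcatenatedIterator<>(parts, registry);

        // The consumer stops early, after three records.
        for (int i = 0; i < 3 && all.hasNext(); i++) {
            System.out.println(all.next());
        }
        System.out.println("a closed=" + a.isClosed() + ", b closed=" + b.isClosed());
        registry.close(); // scanner close: releases b, which was never drained
        System.out.println("b closed=" + b.isClosed());
    }
}
```

The registry uses an identity-based set so iterators are tracked by reference rather than by equals(), which matters if an iterator implementation ever overrides equality.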

Anything else?

Feedback and suggestions on alternative approaches are welcome!

Willingness to contribute

  • I'm willing to submit a PR!