Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions include/paimon/reader/file_batch_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,55 +46,12 @@ class PAIMON_EXPORT FileBatchReader : public BatchReader {
using BatchReader::NextBatch;
using BatchReader::NextBatchWithBitmap;

/// Seeks to a specific row in the file.
/// @param row_number The row number to seek to.
/// @return The status of the operation.
virtual Status SeekToRow(uint64_t row_number) = 0;

/// Get the row number of the first row in the previously read batch.
virtual uint64_t GetPreviousBatchFirstRowNumber() const = 0;

/// Get the number of rows in the file.
virtual uint64_t GetNumberOfRows() const = 0;

/// Retrieves the row number of the next row to be read.
/// This method indicates the current read position within the file.
/// @return The row number of the next row to read.
virtual uint64_t GetNextRowToRead() const = 0;

/// Generates a list of row ranges to be read in batches.
/// Each range specifies the start and end row numbers for a batch,
/// allowing for efficient batch processing.
///
/// The underlying format layer (e.g., parquet) is responsible for determining
/// the most effective way to split the data. This could be by row groups, stripes,
/// or other internal data structures. The key principle is to split the data
/// into contiguous, seekable ranges to minimize read amplification.
///
/// For example:
/// - A parquet format could split by RowGroup directly, ensuring each range aligns
/// with a single RowGroup.
///
/// The smallest splittable unit must be seekable to its start position, and the
/// splitting strategy should aim to avoid read amplification.
///
/// @param need_prefetch A pointer to a boolean. The format layer sets this to indicate whether
/// prefetching is beneficial for the current scenario, to avoid performance regression in
/// certain cases.
/// @return A vector of pairs, where each pair represents a range with a start and end row
/// number.
virtual Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
bool* need_prefetch) const = 0;

/// Sets the specific row ranges as a hint to be read from format file.
///
/// If the specific file format does not support explicit range-based reads, implementations may
/// gracefully ignore this hint and provide an empty (no-op) implementation.
///
/// @param read_ranges A vector of pairs, where each pair defines a half-open interval
/// `[start_row, end_row)`. The `start_row` is inclusive, and the `end_row` is exclusive.
virtual Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) = 0;

/// Get whether or not support read precisely while bitmap pushed down.
virtual bool SupportPreciseBitmapSelection() const = 0;
};
Expand Down
73 changes: 73 additions & 0 deletions include/paimon/reader/prefetch_file_batch_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2025-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include <utility>
#include <vector>

#include "paimon/reader/file_batch_reader.h"

namespace paimon {

class PAIMON_EXPORT PrefetchFileBatchReader : public FileBatchReader {
public:
/// Seeks to a specific row in the file.
/// @param row_number The row number to seek to.
/// @return The status of the operation.
virtual Status SeekToRow(uint64_t row_number) = 0;

/// Retrieves the row number of the next row to be read.
/// This method indicates the current read position within the file.
/// @return The row number of the next row to read.
virtual uint64_t GetNextRowToRead() const = 0;

/// Generates a list of row ranges to be read in batches.
/// Each range specifies the start and end row numbers for a batch,
/// allowing for efficient batch processing.
///
/// The underlying format layer (e.g., parquet) is responsible for determining
/// the most effective way to split the data. This could be by row groups, stripes,
/// or other internal data structures. The key principle is to split the data
/// into contiguous, seekable ranges to minimize read amplification.
///
/// For example:
/// - A parquet format could split by RowGroup directly, ensuring each range aligns
/// with a single RowGroup.
///
/// The smallest splittable unit must be seekable to its start position, and the
/// splitting strategy should aim to avoid read amplification.
///
/// @param need_prefetch A pointer to a boolean. The format layer sets this to indicate whether
/// prefetching is beneficial for the current scenario, to avoid performance regression in
/// certain cases.
/// @return A vector of pairs, where each pair represents a range with a start and end row
/// number.
virtual Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
bool* need_prefetch) const = 0;

/// Sets the specific row ranges as a hint to be read from format file.
///
/// If the specific file format does not support explicit range-based reads, implementations may
/// gracefully ignore this hint and provide an empty (no-op) implementation.
///
/// @param read_ranges A vector of pairs, where each pair defines a half-open interval
/// `[start_row, end_row)`. The `start_row` is inclusive, and the `end_row` is exclusive.
virtual Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) = 0;
};

} // namespace paimon
4 changes: 2 additions & 2 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ set(PAIMON_COMMON_SRCS
common/reader/batch_reader.cpp
common/reader/concat_batch_reader.cpp
common/reader/predicate_batch_reader.cpp
common/reader/prefetch_file_batch_reader.cpp
common/reader/prefetch_file_batch_reader_impl.cpp
common/reader/reader_utils.cpp
common/reader/complete_row_kind_batch_reader.cpp
common/reader/data_evolution_file_reader.cpp
Expand Down Expand Up @@ -351,7 +351,7 @@ if(PAIMON_BUILD_TESTS)
common/predicate/predicate_validator_test.cpp
common/reader/concat_batch_reader_test.cpp
common/reader/predicate_batch_reader_test.cpp
common/reader/prefetch_file_batch_reader_test.cpp
common/reader/prefetch_file_batch_reader_impl_test.cpp
common/reader/reader_utils_test.cpp
common/reader/complete_row_kind_batch_reader_test.cpp
common/reader/data_evolution_file_reader_test.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include "fmt/format.h"
#include "fmt/ranges.h"
#include "gtest/gtest.h"
#include "paimon/common/reader/prefetch_file_batch_reader.h"
#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
#include "paimon/common/utils/date_time_utils.h"
#include "paimon/executor.h"
#include "paimon/memory/memory_pool.h"
Expand Down Expand Up @@ -88,7 +88,7 @@ class ApplyBitmapIndexBatchReaderTest : public ::testing::Test,
if (enable_prefetch) {
MockFormatReaderBuilder reader_builder(data, target_type_, batch_size);
ASSERT_OK_AND_ASSIGN(file_batch_reader,
PrefetchFileBatchReader::Create(
PrefetchFileBatchReaderImpl::Create(
/*data_file_path=*/"DUMMY", &reader_builder, fs_,
prefetch_batch_count, batch_size, prefetch_batch_count * 2,
/*enable_adaptive_prefetch_strategy=*/false, executor_,
Expand Down
26 changes: 6 additions & 20 deletions src/paimon/common/reader/delegating_prefetch_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@

#include "arrow/c/bridge.h"
#include "arrow/type.h"
#include "paimon/common/reader/prefetch_file_batch_reader.h"
#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
#include "paimon/reader/file_batch_reader.h"

namespace paimon {

class DelegatingPrefetchReader : public FileBatchReader {
public:
explicit DelegatingPrefetchReader(std::unique_ptr<PrefetchFileBatchReader> prefetch_reader)
explicit DelegatingPrefetchReader(std::unique_ptr<PrefetchFileBatchReaderImpl> prefetch_reader)
: prefetch_reader_(std::move(prefetch_reader)) {}

Result<ReadBatch> NextBatch() override {
Expand All @@ -48,38 +48,24 @@ class DelegatingPrefetchReader : public FileBatchReader {
Result<std::unique_ptr<::ArrowSchema>> GetFileSchema() const override {
return GetReader()->GetFileSchema();
}

Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
const std::optional<RoaringBitmap32>& selection_bitmap) override {
return prefetch_reader_->SetReadSchema(read_schema, predicate, selection_bitmap);
}

Status SeekToRow(uint64_t row_number) override {
assert(false);
return Status::NotImplemented("not support seek to row for delegate reader");
}
uint64_t GetPreviousBatchFirstRowNumber() const override {
return GetReader()->GetPreviousBatchFirstRowNumber();
}

uint64_t GetNumberOfRows() const override {
return GetReader()->GetNumberOfRows();
}
uint64_t GetNextRowToRead() const override {
return GetReader()->GetNextRowToRead();
}

Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
bool* need_prefetch) const override {
assert(false);
return Status::NotImplemented("gen read ranges not implemented");
}

void Close() override {
return prefetch_reader_->Close();
}
Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) override {
assert(false);
return Status::NotImplemented("not support set read ranges for delegate reader");
}

bool SupportPreciseBitmapSelection() const override {
return GetReader()->SupportPreciseBitmapSelection();
}
Expand All @@ -94,7 +80,7 @@ class DelegatingPrefetchReader : public FileBatchReader {
}
}

std::unique_ptr<PrefetchFileBatchReader> prefetch_reader_;
std::unique_ptr<PrefetchFileBatchReaderImpl> prefetch_reader_;
};

} // namespace paimon
Loading
Loading