diff --git a/include/paimon/reader/file_batch_reader.h b/include/paimon/reader/file_batch_reader.h index 88e0335d..21c780a1 100644 --- a/include/paimon/reader/file_batch_reader.h +++ b/include/paimon/reader/file_batch_reader.h @@ -46,55 +46,12 @@ class PAIMON_EXPORT FileBatchReader : public BatchReader { using BatchReader::NextBatch; using BatchReader::NextBatchWithBitmap; - /// Seeks to a specific row in the file. - /// @param row_number The row number to seek to. - /// @return The status of the operation. - virtual Status SeekToRow(uint64_t row_number) = 0; - /// Get the row number of the first row in the previously read batch. virtual uint64_t GetPreviousBatchFirstRowNumber() const = 0; /// Get the number of rows in the file. virtual uint64_t GetNumberOfRows() const = 0; - /// Retrieves the row number of the next row to be read. - /// This method indicates the current read position within the file. - /// @return The row number of the next row to read. - virtual uint64_t GetNextRowToRead() const = 0; - - /// Generates a list of row ranges to be read in batches. - /// Each range specifies the start and end row numbers for a batch, - /// allowing for efficient batch processing. - /// - /// The underlying format layer (e.g., parquet) is responsible for determining - /// the most effective way to split the data. This could be by row groups, stripes, - /// or other internal data structures. The key principle is to split the data - /// into contiguous, seekable ranges to minimize read amplification. - /// - /// For example: - /// - A parquet format could split by RowGroup directly, ensuring each range aligns - /// with a single RowGroup. - /// - /// The smallest splittable unit must be seekable to its start position, and the - /// splitting strategy should aim to avoid read amplification. - /// - /// @param need_prefetch A pointer to a boolean. 
The format layer sets this to indicate whether - /// prefetching is beneficial for the current scenario, to avoid performance regression in - /// certain cases. - /// @return A vector of pairs, where each pair represents a range with a start and end row - /// number. - virtual Result>> GenReadRanges( - bool* need_prefetch) const = 0; - - /// Sets the specific row ranges as a hint to be read from format file. - /// - /// If the specific file format does not support explicit range-based reads, implementations may - /// gracefully ignore this hint and provide an empty (no-op) implementation. - /// - /// @param read_ranges A vector of pairs, where each pair defines a half-open interval - /// `[start_row, end_row)`. The `start_row` is inclusive, and the `end_row` is exclusive. - virtual Status SetReadRanges(const std::vector>& read_ranges) = 0; - /// Get whether or not support read precisely while bitmap pushed down. virtual bool SupportPreciseBitmapSelection() const = 0; }; diff --git a/include/paimon/reader/prefetch_file_batch_reader.h b/include/paimon/reader/prefetch_file_batch_reader.h new file mode 100644 index 00000000..e2137fe5 --- /dev/null +++ b/include/paimon/reader/prefetch_file_batch_reader.h @@ -0,0 +1,73 @@ +/* + * Copyright 2025-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include + +#include "paimon/reader/file_batch_reader.h" + +namespace paimon { + +class PAIMON_EXPORT PrefetchFileBatchReader : public FileBatchReader { + public: + /// Seeks to a specific row in the file. + /// @param row_number The row number to seek to. + /// @return The status of the operation. + virtual Status SeekToRow(uint64_t row_number) = 0; + + /// Retrieves the row number of the next row to be read. + /// This method indicates the current read position within the file. + /// @return The row number of the next row to read. + virtual uint64_t GetNextRowToRead() const = 0; + + /// Generates a list of row ranges to be read in batches. + /// Each range specifies the start and end row numbers for a batch, + /// allowing for efficient batch processing. + /// + /// The underlying format layer (e.g., parquet) is responsible for determining + /// the most effective way to split the data. This could be by row groups, stripes, + /// or other internal data structures. The key principle is to split the data + /// into contiguous, seekable ranges to minimize read amplification. + /// + /// For example: + /// - A parquet format could split by RowGroup directly, ensuring each range aligns + /// with a single RowGroup. + /// + /// The smallest splittable unit must be seekable to its start position, and the + /// splitting strategy should aim to avoid read amplification. + /// + /// @param need_prefetch A pointer to a boolean. The format layer sets this to indicate whether + /// prefetching is beneficial for the current scenario, to avoid performance regression in + /// certain cases. + /// @return A vector of pairs, where each pair represents a range with a start and end row + /// number. + virtual Result>> GenReadRanges( + bool* need_prefetch) const = 0; + + /// Sets the specific row ranges as a hint to be read from format file. 
+ /// + /// If the specific file format does not support explicit range-based reads, implementations may + /// gracefully ignore this hint and provide an empty (no-op) implementation. + /// + /// @param read_ranges A vector of pairs, where each pair defines a half-open interval + /// `[start_row, end_row)`. The `start_row` is inclusive, and the `end_row` is exclusive. + virtual Status SetReadRanges(const std::vector>& read_ranges) = 0; +}; + +} // namespace paimon diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 82d516c0..4a11f181 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -86,7 +86,7 @@ set(PAIMON_COMMON_SRCS common/reader/batch_reader.cpp common/reader/concat_batch_reader.cpp common/reader/predicate_batch_reader.cpp - common/reader/prefetch_file_batch_reader.cpp + common/reader/prefetch_file_batch_reader_impl.cpp common/reader/reader_utils.cpp common/reader/complete_row_kind_batch_reader.cpp common/reader/data_evolution_file_reader.cpp @@ -351,7 +351,7 @@ if(PAIMON_BUILD_TESTS) common/predicate/predicate_validator_test.cpp common/reader/concat_batch_reader_test.cpp common/reader/predicate_batch_reader_test.cpp - common/reader/prefetch_file_batch_reader_test.cpp + common/reader/prefetch_file_batch_reader_impl_test.cpp common/reader/reader_utils_test.cpp common/reader/complete_row_kind_batch_reader_test.cpp common/reader/data_evolution_file_reader_test.cpp diff --git a/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader_test.cpp b/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader_test.cpp index 6b621671..0ca7ed56 100644 --- a/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader_test.cpp +++ b/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader_test.cpp @@ -26,7 +26,7 @@ #include "fmt/format.h" #include "fmt/ranges.h" #include "gtest/gtest.h" -#include "paimon/common/reader/prefetch_file_batch_reader.h" +#include 
"paimon/common/reader/prefetch_file_batch_reader_impl.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/executor.h" #include "paimon/memory/memory_pool.h" @@ -88,7 +88,7 @@ class ApplyBitmapIndexBatchReaderTest : public ::testing::Test, if (enable_prefetch) { MockFormatReaderBuilder reader_builder(data, target_type_, batch_size); ASSERT_OK_AND_ASSIGN(file_batch_reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"DUMMY", &reader_builder, fs_, prefetch_batch_count, batch_size, prefetch_batch_count * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, diff --git a/src/paimon/common/reader/delegating_prefetch_reader.h b/src/paimon/common/reader/delegating_prefetch_reader.h index 78b61d4b..99dd5c62 100644 --- a/src/paimon/common/reader/delegating_prefetch_reader.h +++ b/src/paimon/common/reader/delegating_prefetch_reader.h @@ -22,14 +22,14 @@ #include "arrow/c/bridge.h" #include "arrow/type.h" -#include "paimon/common/reader/prefetch_file_batch_reader.h" +#include "paimon/common/reader/prefetch_file_batch_reader_impl.h" #include "paimon/reader/file_batch_reader.h" namespace paimon { class DelegatingPrefetchReader : public FileBatchReader { public: - explicit DelegatingPrefetchReader(std::unique_ptr prefetch_reader) + explicit DelegatingPrefetchReader(std::unique_ptr prefetch_reader) : prefetch_reader_(std::move(prefetch_reader)) {} Result NextBatch() override { @@ -48,38 +48,24 @@ class DelegatingPrefetchReader : public FileBatchReader { Result> GetFileSchema() const override { return GetReader()->GetFileSchema(); } + Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) override { return prefetch_reader_->SetReadSchema(read_schema, predicate, selection_bitmap); } - Status SeekToRow(uint64_t row_number) override { - assert(false); - return Status::NotImplemented("not support seek to row for delegate reader"); - } uint64_t 
GetPreviousBatchFirstRowNumber() const override { return GetReader()->GetPreviousBatchFirstRowNumber(); } + uint64_t GetNumberOfRows() const override { return GetReader()->GetNumberOfRows(); } - uint64_t GetNextRowToRead() const override { - return GetReader()->GetNextRowToRead(); - } - - Result>> GenReadRanges( - bool* need_prefetch) const override { - assert(false); - return Status::NotImplemented("gen read ranges not implemented"); - } void Close() override { return prefetch_reader_->Close(); } - Status SetReadRanges(const std::vector>& read_ranges) override { - assert(false); - return Status::NotImplemented("not support set read ranges for delegate reader"); - } + bool SupportPreciseBitmapSelection() const override { return GetReader()->SupportPreciseBitmapSelection(); } @@ -94,7 +80,7 @@ class DelegatingPrefetchReader : public FileBatchReader { } } - std::unique_ptr prefetch_reader_; + std::unique_ptr prefetch_reader_; }; } // namespace paimon diff --git a/src/paimon/common/reader/prefetch_file_batch_reader.cpp b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp similarity index 87% rename from src/paimon/common/reader/prefetch_file_batch_reader.cpp rename to src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp index 0dcb3228..2d2d337e 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader.cpp +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -#include "paimon/common/reader/prefetch_file_batch_reader.h" +#include "paimon/common/reader/prefetch_file_batch_reader_impl.h" #include #include @@ -38,7 +38,7 @@ class Schema; namespace paimon { -Result> PrefetchFileBatchReader::Create( +Result> PrefetchFileBatchReaderImpl::Create( const std::string& data_file_path, const ReaderBuilder* reader_builder, const std::shared_ptr& fs, uint32_t prefetch_max_parallel_num, int32_t batch_size, uint32_t prefetch_batch_count, bool enable_adaptive_prefetch_strategy, @@ -63,7 +63,6 @@ Result> PrefetchFileBatchReader::Create } std::vector>>> futures; - std::vector> readers; for (uint32_t i = 0; i < prefetch_max_parallel_num; i++) { futures.push_back(Via( executor.get(), @@ -72,20 +71,27 @@ Result> PrefetchFileBatchReader::Create return reader_builder->Build(std::move(input_stream)); })); } + std::vector> readers; for (auto& file_batch_reader : CollectAll(futures)) { if (!file_batch_reader.ok()) { return file_batch_reader.status(); } - readers.emplace_back(std::move(file_batch_reader).value()); + std::shared_ptr reader = std::move(file_batch_reader).value(); + auto prefetch_file_batch_reader = + std::dynamic_pointer_cast(reader); + if (prefetch_file_batch_reader == nullptr) { + return Status::Invalid( + "failed to cast to prefetch file batch reader. 
file format not support prefetch"); + } + readers.emplace_back(prefetch_file_batch_reader); } if (prefetch_batch_count < readers.size()) { prefetch_batch_count = readers.size(); } uint32_t prefetch_queue_capacity = prefetch_batch_count / readers.size(); - auto reader = std::unique_ptr( - new PrefetchFileBatchReader(std::move(readers), batch_size, prefetch_queue_capacity, - enable_adaptive_prefetch_strategy, executor)); + auto reader = std::unique_ptr(new PrefetchFileBatchReaderImpl( + readers, batch_size, prefetch_queue_capacity, enable_adaptive_prefetch_strategy, executor)); if (initialize_read_ranges) { // normally initialize read ranges should be false, as set read schema will refresh read // ranges, and set read schema will always be called before read. @@ -94,8 +100,8 @@ Result> PrefetchFileBatchReader::Create return reader; } -PrefetchFileBatchReader::PrefetchFileBatchReader( - std::vector>&& readers, int32_t batch_size, +PrefetchFileBatchReaderImpl::PrefetchFileBatchReaderImpl( + const std::vector>& readers, int32_t batch_size, uint32_t prefetch_queue_capacity, bool enable_adaptive_prefetch_strategy, const std::shared_ptr& executor) : readers_(std::move(readers)), @@ -111,11 +117,11 @@ PrefetchFileBatchReader::PrefetchFileBatchReader( parallel_num_ = readers_.size(); } -PrefetchFileBatchReader::~PrefetchFileBatchReader() { +PrefetchFileBatchReaderImpl::~PrefetchFileBatchReaderImpl() { (void)CleanUp(); } -Status PrefetchFileBatchReader::SetReadSchema( +Status PrefetchFileBatchReaderImpl::SetReadSchema( ::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) { PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr schema, @@ -129,7 +135,7 @@ Status PrefetchFileBatchReader::SetReadSchema( return RefreshReadRanges(); } -Status PrefetchFileBatchReader::RefreshReadRanges() { +Status PrefetchFileBatchReaderImpl::RefreshReadRanges() { PAIMON_RETURN_NOT_OK(CleanUp()); bool need_prefetch; PAIMON_ASSIGN_OR_RAISE(auto read_ranges, 
readers_[0]->GenReadRanges(&need_prefetch)); @@ -150,7 +156,7 @@ Status PrefetchFileBatchReader::RefreshReadRanges() { return Status::OK(); } -std::vector> PrefetchFileBatchReader::FilterReadRanges( +std::vector> PrefetchFileBatchReaderImpl::FilterReadRanges( const std::vector>& read_ranges, const std::optional& selection_bitmap) { if (!selection_bitmap) { @@ -165,7 +171,7 @@ std::vector> PrefetchFileBatchReader::FilterReadRa return result; } -Status PrefetchFileBatchReader::SetReadRanges( +Status PrefetchFileBatchReaderImpl::SetReadRanges( const std::vector>& read_ranges) { // push down read ranges for reducing IO amplification read_ranges_in_group_ = DispatchReadRanges(read_ranges, readers_.size()); @@ -194,7 +200,8 @@ Status PrefetchFileBatchReader::SetReadRanges( return Status::OK(); } -std::vector>> PrefetchFileBatchReader::DispatchReadRanges( +std::vector>> +PrefetchFileBatchReaderImpl::DispatchReadRanges( const std::vector>& read_ranges, size_t group_count) { std::vector>> read_ranges_in_group; read_ranges_in_group.resize(group_count); @@ -204,7 +211,7 @@ std::vector>> PrefetchFileBatchReader: return read_ranges_in_group; } -Status PrefetchFileBatchReader::CleanUp() { +Status PrefetchFileBatchReaderImpl::CleanUp() { auto clean_prefetch_queue = [this]() { for (auto& prefetch_queue : prefetch_queues_) { while (true) { @@ -248,7 +255,7 @@ Status PrefetchFileBatchReader::CleanUp() { return Status::OK(); } -void PrefetchFileBatchReader::Workloop() { +void PrefetchFileBatchReaderImpl::Workloop() { std::vector> futures; futures.resize(readers_.size()); while (true) { @@ -312,14 +319,14 @@ void PrefetchFileBatchReader::Workloop() { Wait(futures); } -void PrefetchFileBatchReader::ReadBatch(size_t reader_idx) { +void PrefetchFileBatchReaderImpl::ReadBatch(size_t reader_idx) { Status status = DoReadBatch(reader_idx); if (!status.ok()) { SetReadStatus(status); } } -std::optional> PrefetchFileBatchReader::GetCurrentReadRange( +std::optional> 
PrefetchFileBatchReaderImpl::GetCurrentReadRange( size_t reader_idx) const { const auto& read_ranges = read_ranges_in_group_[reader_idx]; const auto& current_pos = readers_pos_[reader_idx]; @@ -333,7 +340,7 @@ std::optional> PrefetchFileBatchReader::GetCurrent return std::nullopt; } -Status PrefetchFileBatchReader::EnsureReaderPosition( +Status PrefetchFileBatchReaderImpl::EnsureReaderPosition( size_t reader_idx, const std::pair& current_read_range) const { uint64_t pos = std::max(readers_pos_[reader_idx]->load(), current_read_range.first); if (readers_[reader_idx]->GetNextRowToRead() != pos) { @@ -342,9 +349,9 @@ Status PrefetchFileBatchReader::EnsureReaderPosition( return Status::OK(); } -Status PrefetchFileBatchReader::HandleReadResult(size_t reader_idx, - const std::pair& read_range, - ReadBatchWithBitmap&& read_batch_with_bitmap) { +Status PrefetchFileBatchReaderImpl::HandleReadResult( + size_t reader_idx, const std::pair& read_range, + ReadBatchWithBitmap&& read_batch_with_bitmap) { uint64_t first_row_number = readers_[reader_idx]->GetPreviousBatchFirstRowNumber(); auto& prefetch_queue = prefetch_queues_[reader_idx]; if (!BatchReader::IsEofBatch(read_batch_with_bitmap)) { @@ -383,7 +390,7 @@ Status PrefetchFileBatchReader::HandleReadResult(size_t reader_idx, return Status::OK(); } -Status PrefetchFileBatchReader::DoReadBatch(size_t reader_idx) { +Status PrefetchFileBatchReaderImpl::DoReadBatch(size_t reader_idx) { PAIMON_RETURN_NOT_OK(GetReadStatus()); if (is_shutdown_) { return Status::OK(); @@ -414,13 +421,13 @@ Status PrefetchFileBatchReader::DoReadBatch(size_t reader_idx) { return HandleReadResult(reader_idx, read_range, std::move(read_batch_with_bitmap)); } -Result PrefetchFileBatchReader::NextBatchWithBitmap() { +Result PrefetchFileBatchReaderImpl::NextBatchWithBitmap() { if (!read_ranges_freshed_) { return Status::Invalid("prefetch reader read ranges are not initialized"); } if (!background_thread_) { background_thread_ = - 
std::make_unique(&PrefetchFileBatchReader::Workloop, this); + std::make_unique(&PrefetchFileBatchReaderImpl::Workloop, this); } while (true) { PAIMON_RETURN_NOT_OK(GetReadStatus()); @@ -489,51 +496,52 @@ Result PrefetchFileBatchReader::NextBatchWithB } } -Status PrefetchFileBatchReader::SeekToRow(uint64_t row_number) { +Status PrefetchFileBatchReaderImpl::SeekToRow(uint64_t row_number) { return Status::NotImplemented("not support seek to row for prefetch reader"); } -std::shared_ptr PrefetchFileBatchReader::GetReaderMetrics() const { +std::shared_ptr PrefetchFileBatchReaderImpl::GetReaderMetrics() const { return MetricsImpl::CollectReadMetrics(readers_); } -Result> PrefetchFileBatchReader::GetFileSchema() const { +Result> PrefetchFileBatchReaderImpl::GetFileSchema() const { assert(!readers_.empty()); return readers_[0]->GetFileSchema(); } -uint64_t PrefetchFileBatchReader::GetPreviousBatchFirstRowNumber() const { +uint64_t PrefetchFileBatchReaderImpl::GetPreviousBatchFirstRowNumber() const { return previous_batch_first_row_num_; } -uint64_t PrefetchFileBatchReader::GetNumberOfRows() const { +uint64_t PrefetchFileBatchReaderImpl::GetNumberOfRows() const { assert(!readers_.empty()); return readers_[0]->GetNumberOfRows(); } -uint64_t PrefetchFileBatchReader::GetNextRowToRead() const { +uint64_t PrefetchFileBatchReaderImpl::GetNextRowToRead() const { assert(false); return -1; } -void PrefetchFileBatchReader::SetReadStatus(const Status& status) { +void PrefetchFileBatchReaderImpl::SetReadStatus(const Status& status) { std::unique_lock lock(rw_mutex_); read_status_ = status; } -Status PrefetchFileBatchReader::GetReadStatus() const { +Status PrefetchFileBatchReaderImpl::GetReadStatus() const { std::shared_lock lock(rw_mutex_); return read_status_; } -bool PrefetchFileBatchReader::IsEofRange(const std::pair& read_range) const { +bool PrefetchFileBatchReaderImpl::IsEofRange( + const std::pair& read_range) const { return read_range.first >= GetNumberOfRows(); } -std::pair 
PrefetchFileBatchReader::EofRange() const { +std::pair PrefetchFileBatchReaderImpl::EofRange() const { return {GetNumberOfRows(), GetNumberOfRows() + 1}; } -void PrefetchFileBatchReader::Close() { +void PrefetchFileBatchReaderImpl::Close() { (void)CleanUp(); for (const auto& reader : readers_) { reader->Close(); diff --git a/src/paimon/common/reader/prefetch_file_batch_reader.h b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h similarity index 86% rename from src/paimon/common/reader/prefetch_file_batch_reader.h rename to src/paimon/common/reader/prefetch_file_batch_reader_impl.h index c4132d8e..1ede5cab 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader.h +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h @@ -35,7 +35,7 @@ #include "arrow/c/abi.h" #include "paimon/common/utils/threadsafe_queue.h" #include "paimon/reader/batch_reader.h" -#include "paimon/reader/file_batch_reader.h" +#include "paimon/reader/prefetch_file_batch_reader.h" #include "paimon/result.h" #include "paimon/status.h" #include "paimon/utils/roaring_bitmap32.h" @@ -50,21 +50,21 @@ class Executor; class Predicate; class Metrics; -class PrefetchFileBatchReader : public FileBatchReader { +class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader { public: - static Result> Create( + static Result> Create( const std::string& data_file_path, const ReaderBuilder* reader_builder, const std::shared_ptr& fs, uint32_t prefetch_max_parallel_num, int32_t batch_size, uint32_t prefetch_batch_count, bool enable_adaptive_prefetch_strategy, const std::shared_ptr& executor, bool initialize_read_ranges); - ~PrefetchFileBatchReader() override; + ~PrefetchFileBatchReaderImpl() override; - Result NextBatch() override { + Result NextBatch() override { return Status::Invalid( "paimon inner reader PrefetchFileBatchReader should use NextBatchWithBitmap"); } - Result NextBatchWithBitmap() override; + Result NextBatchWithBitmap() override; std::shared_ptr GetReaderMetrics() 
const override; @@ -90,7 +90,7 @@ class PrefetchFileBatchReader : public FileBatchReader { Status RefreshReadRanges(); - inline FileBatchReader* GetFirstReader() const { + inline PrefetchFileBatchReader* GetFirstReader() const { return readers_[0].get(); } @@ -105,10 +105,10 @@ class PrefetchFileBatchReader : public FileBatchReader { uint64_t previous_batch_first_row_num; }; - PrefetchFileBatchReader(std::vector>&& readers, - int32_t batch_size, uint32_t prefetch_queue_capacity, - bool enable_adaptive_prefetch_strategy, - const std::shared_ptr& executor); + PrefetchFileBatchReaderImpl( + const std::vector>& readers, int32_t batch_size, + uint32_t prefetch_queue_capacity, bool enable_adaptive_prefetch_strategy, + const std::shared_ptr& executor); Status CleanUp(); void Workloop(); @@ -130,10 +130,10 @@ class PrefetchFileBatchReader : public FileBatchReader { Status EnsureReaderPosition(size_t reader_idx, const std::pair& read_range) const; Status HandleReadResult(size_t reader_idx, const std::pair& read_range, - ReadBatchWithBitmap&& read_batch_with_bitmap); + FileBatchReader::ReadBatchWithBitmap&& read_batch_with_bitmap); private: - std::vector> readers_; + std::vector> readers_; // The meaning of readers_pos_ is: all data before this pos has been filtered out or effectively // consumed, and the data after this pos may need to be read in the next round of reading. std::vector>> readers_pos_; diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_test.cpp b/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp similarity index 87% rename from src/paimon/common/reader/prefetch_file_batch_reader_test.cpp rename to src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp index a05f5046..1f100835 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader_test.cpp +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -#include "paimon/common/reader/prefetch_file_batch_reader.h" +#include "paimon/common/reader/prefetch_file_batch_reader_impl.h" #include @@ -37,8 +37,8 @@ namespace paimon::test { -class PrefetchFileBatchReaderTest : public ::testing::Test, - public ::testing::WithParamInterface { +class PrefetchFileBatchReaderImplTest : public ::testing::Test, + public ::testing::WithParamInterface { public: void SetUp() override { fields_ = {arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int64()), @@ -81,8 +81,10 @@ class PrefetchFileBatchReaderTest : public ::testing::Test, ASSERT_TRUE(arrow::ExportSchema(schema, &c_schema).ok()); ASSERT_OK_AND_ASSIGN( std::unique_ptr file_format, - FileFormatFactory::Get(file_format_str, {{"parquet.write.max-row-group-length", - std::to_string(row_index_stride)}})); + FileFormatFactory::Get( + file_format_str, + {{"parquet.write.max-row-group-length", std::to_string(row_index_stride)}, + {"orc.row.index.stride", std::to_string(row_index_stride)}})); ASSERT_OK_AND_ASSIGN(auto writer_builder, file_format->CreateWriterBuilder(&c_schema, 1024)); @@ -107,7 +109,7 @@ class PrefetchFileBatchReaderTest : public ::testing::Test, ASSERT_OK(out->Close()); } - std::unique_ptr PreparePrefetchReader( + std::unique_ptr PreparePrefetchReader( const std::string& file_format_str, const arrow::Schema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap, int32_t batch_size, @@ -116,8 +118,8 @@ class PrefetchFileBatchReaderTest : public ::testing::Test, FileFormatFactory::Get(file_format_str, {})); EXPECT_OK_AND_ASSIGN(auto reader_builder, file_format->CreateReaderBuilder(batch_size)); EXPECT_OK_AND_ASSIGN( - std::unique_ptr reader, - PrefetchFileBatchReader::Create( + std::unique_ptr reader, + PrefetchFileBatchReaderImpl::Create( PathUtil::JoinPath(dir_->Str(), "file." 
+ file_format->Identifier()), reader_builder.get(), local_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, @@ -130,9 +132,9 @@ class PrefetchFileBatchReaderTest : public ::testing::Test, return reader; } - bool HasValue( - const std::vector>>& - prefetch_queues) { + bool HasValue(const std::vector< + std::unique_ptr>>& + prefetch_queues) { for (const auto& queue : prefetch_queues) { if (!queue->empty()) { return true; @@ -166,19 +168,19 @@ class PrefetchFileBatchReaderTest : public ::testing::Test, }; std::vector GetTestValues() { - return {"parquet"}; + return {"parquet", "orc"}; } -INSTANTIATE_TEST_SUITE_P(FileFormat, PrefetchFileBatchReaderTest, +INSTANTIATE_TEST_SUITE_P(FileFormat, PrefetchFileBatchReaderImplTest, ::testing::ValuesIn(GetTestValues())); -TEST_F(PrefetchFileBatchReaderTest, TestSimple) { +TEST_F(PrefetchFileBatchReaderImplTest, TestSimple) { auto data_array = PrepareArray(101); int32_t batch_size = 10; for (auto prefetch_max_parallel_num : {1, 2, 3, 5, 8, 10}) { MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( - auto reader, PrefetchFileBatchReader::Create( + auto reader, PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, @@ -193,7 +195,7 @@ TEST_F(PrefetchFileBatchReaderTest, TestSimple) { } } -TEST_F(PrefetchFileBatchReaderTest, TestReadWithLimits) { +TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLimits) { auto data_array = PrepareArray(101); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 12; @@ -201,7 +203,7 @@ TEST_F(PrefetchFileBatchReaderTest, TestReadWithLimits) { MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + 
PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); @@ -221,7 +223,7 @@ TEST_F(PrefetchFileBatchReaderTest, TestReadWithLimits) { ASSERT_TRUE(read_metrics); } -TEST_F(PrefetchFileBatchReaderTest, TestReadWithoutInitializeReadRanges) { +TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithoutInitializeReadRanges) { auto data_array = PrepareArray(101); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 12; @@ -229,7 +231,7 @@ TEST_F(PrefetchFileBatchReaderTest, TestReadWithoutInitializeReadRanges) { MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/false)); @@ -239,24 +241,24 @@ TEST_F(PrefetchFileBatchReaderTest, TestReadWithoutInitializeReadRanges) { reader->Close(); } -TEST_F(PrefetchFileBatchReaderTest, FilterReadRangesWithoutBitmap) { +TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithoutBitmap) { std::vector> read_ranges = { {0, 1000}, {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000}, {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}}; - auto filtered_ranges = PrefetchFileBatchReader::FilterReadRanges(read_ranges, std::nullopt); + auto filtered_ranges = PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, std::nullopt); ASSERT_EQ(filtered_ranges, read_ranges); } -TEST_F(PrefetchFileBatchReaderTest, FilterReadRangesWithAllZeroBitmap) { +TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithAllZeroBitmap) { std::vector> read_ranges = { {0, 1000}, {1000, 2000}, {2000, 3000}, {3000, 
4000}, {4000, 5000}, {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}}; auto bitmap = RoaringBitmap32::From({}); - auto filtered_ranges = PrefetchFileBatchReader::FilterReadRanges(read_ranges, bitmap); + auto filtered_ranges = PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, bitmap); ASSERT_TRUE(filtered_ranges.empty()); } -TEST_F(PrefetchFileBatchReaderTest, FilterReadRangesWithBitmap) { +TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithBitmap) { auto data_array = PrepareArray(10000); std::set valid_row_ids; for (int32_t i = 1000; i < 2000; i++) { @@ -270,25 +272,25 @@ TEST_F(PrefetchFileBatchReaderTest, FilterReadRangesWithBitmap) { std::vector> read_ranges = { {0, 1000}, {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000}, {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}}; - auto filtered_ranges = PrefetchFileBatchReader::FilterReadRanges(read_ranges, bitmap); + auto filtered_ranges = PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, bitmap); std::vector> expected_filtered_ranges = { {1000, 2000}, {3000, 4000}, {4000, 5000}, {5000, 6000}, {6000, 7000}}; ASSERT_EQ(expected_filtered_ranges, filtered_ranges); } -TEST_F(PrefetchFileBatchReaderTest, DispatchReadRangesEmpty) { +TEST_F(PrefetchFileBatchReaderImplTest, DispatchReadRangesEmpty) { std::vector> read_ranges; - auto read_ranges_in_group = PrefetchFileBatchReader::DispatchReadRanges(read_ranges, 3); + auto read_ranges_in_group = PrefetchFileBatchReaderImpl::DispatchReadRanges(read_ranges, 3); ASSERT_EQ(read_ranges_in_group.size(), 3); ASSERT_TRUE(read_ranges_in_group[0].empty()); ASSERT_TRUE(read_ranges_in_group[1].empty()); ASSERT_TRUE(read_ranges_in_group[2].empty()); } -TEST_F(PrefetchFileBatchReaderTest, DispatchReadRanges) { +TEST_F(PrefetchFileBatchReaderImplTest, DispatchReadRanges) { std::vector> read_ranges = { {0, 10000}, {10000, 20000}, {20000, 30000}, {30000, 40000}}; - auto read_ranges_in_group = 
PrefetchFileBatchReader::DispatchReadRanges(read_ranges, 3); + auto read_ranges_in_group = PrefetchFileBatchReaderImpl::DispatchReadRanges(read_ranges, 3); std::vector> expected_group_0 = {{0, 10000}, {30000, 40000}}; ASSERT_EQ(read_ranges_in_group[0], expected_group_0); std::vector> expected_group_1 = {{10000, 20000}}; @@ -297,18 +299,18 @@ TEST_F(PrefetchFileBatchReaderTest, DispatchReadRanges) { ASSERT_EQ(read_ranges_in_group[2], expected_group_2); } -TEST_F(PrefetchFileBatchReaderTest, RefreshReadRanges) { +TEST_F(PrefetchFileBatchReaderImplTest, RefreshReadRanges) { auto data_array = PrepareArray(101); int32_t batch_size = 30; int32_t prefetch_max_parallel_num = 3; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/false)); - auto prefetch_reader = dynamic_cast(reader.get()); + auto prefetch_reader = dynamic_cast(reader.get()); ASSERT_OK(prefetch_reader->RefreshReadRanges()); std::vector> read_ranges_0 = {{0, 30}, {90, 101}}; auto mock_reader_0 = dynamic_cast(prefetch_reader->readers_[0].get()); @@ -321,18 +323,18 @@ TEST_F(PrefetchFileBatchReaderTest, RefreshReadRanges) { ASSERT_EQ(mock_reader_2->GetReadRanges(), read_ranges_2); } -TEST_F(PrefetchFileBatchReaderTest, SetReadRanges) { +TEST_F(PrefetchFileBatchReaderImplTest, SetReadRanges) { auto data_array = PrepareArray(400); int32_t batch_size = 30; int32_t prefetch_max_parallel_num = 3; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, 
prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/false)); - auto prefetch_reader = dynamic_cast(reader.get()); + auto prefetch_reader = dynamic_cast(reader.get()); ASSERT_FALSE(prefetch_reader->need_prefetch_); prefetch_reader->need_prefetch_ = true; std::vector> ranges = { @@ -357,14 +359,14 @@ TEST_F(PrefetchFileBatchReaderTest, SetReadRanges) { ASSERT_EQ(mock_reader_2->GetReadRanges(), read_ranges_2); } -TEST_F(PrefetchFileBatchReaderTest, TestReadWithLargeBatchSize) { +TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLargeBatchSize) { auto data_array = PrepareArray(101); int32_t batch_size = 150; int32_t prefetch_max_parallel_num = 3; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); @@ -377,18 +379,18 @@ TEST_F(PrefetchFileBatchReaderTest, TestReadWithLargeBatchSize) { ASSERT_TRUE(result_array->Equals(expected_array)); } -TEST_F(PrefetchFileBatchReaderTest, TestPartialReaderSuccessRead) { +TEST_F(PrefetchFileBatchReaderImplTest, TestPartialReaderSuccessRead) { auto data_array = PrepareArray(101); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 3; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); - auto prefetch_reader = dynamic_cast(reader.get()); + auto prefetch_reader = dynamic_cast(reader.get()); for 
(int32_t i = 0; i < prefetch_max_parallel_num; i++) { dynamic_cast(prefetch_reader->readers_[i].get()) ->EnableRandomizeBatchSize(false); @@ -421,19 +423,19 @@ TEST_F(PrefetchFileBatchReaderTest, TestPartialReaderSuccessRead) { ReaderUtils::ReleaseReadBatch(std::move(batch_with_bitmap.first)); } -TEST_F(PrefetchFileBatchReaderTest, TestAllReaderFailedWithIOError) { +TEST_F(PrefetchFileBatchReaderImplTest, TestAllReaderFailedWithIOError) { auto data_array = PrepareArray(101); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 3; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); - auto prefetch_reader = dynamic_cast(reader.get()); + auto prefetch_reader = dynamic_cast(reader.get()); for (int32_t i = 0; i < prefetch_max_parallel_num; i++) { dynamic_cast(prefetch_reader->readers_[i].get()) ->SetNextBatchStatus(Status::IOError("mock error")); @@ -455,14 +457,14 @@ TEST_F(PrefetchFileBatchReaderTest, TestAllReaderFailedWithIOError) { ASSERT_TRUE(batch_result2.status().IsIOError()); } -TEST_F(PrefetchFileBatchReaderTest, TestPrefetchWithEmptyData) { +TEST_F(PrefetchFileBatchReaderImplTest, TestPrefetchWithEmptyData) { auto data_array = PrepareArray(0); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 3; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); @@ -474,14 +476,14 
@@ TEST_F(PrefetchFileBatchReaderTest, TestPrefetchWithEmptyData) { ASSERT_FALSE(result_array); } -TEST_F(PrefetchFileBatchReaderTest, TestCallNextBatchAfterReadingEof) { +TEST_F(PrefetchFileBatchReaderImplTest, TestCallNextBatchAfterReadingEof) { auto data_array = PrepareArray(10); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 6; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); @@ -498,54 +500,54 @@ TEST_F(PrefetchFileBatchReaderTest, TestCallNextBatchAfterReadingEof) { ASSERT_TRUE(BatchReader::IsEofBatch(batch_with_bitmap)); } -TEST_F(PrefetchFileBatchReaderTest, TestCreateReaderWithoutNextBatch) { +TEST_F(PrefetchFileBatchReaderImplTest, TestCreateReaderWithoutNextBatch) { auto data_array = PrepareArray(101); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 3; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); } -TEST_F(PrefetchFileBatchReaderTest, TestInvalidCase) { +TEST_F(PrefetchFileBatchReaderImplTest, TestInvalidCase) { auto data_array = PrepareArray(101); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 3; std::string data_file_path = ""; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); { - ASSERT_NOK(PrefetchFileBatchReader::Create(data_file_path, &reader_builder, mock_fs_, - /*prefetch_max_parallel_num=*/0, 
batch_size, 2, - /*enable_adaptive_prefetch_strategy=*/false, - executor_, - /*initialize_read_ranges=*/true)); + ASSERT_NOK(PrefetchFileBatchReaderImpl::Create( + data_file_path, &reader_builder, mock_fs_, + /*prefetch_max_parallel_num=*/0, batch_size, 2, + /*enable_adaptive_prefetch_strategy=*/false, executor_, + /*initialize_read_ranges=*/true)); } { - ASSERT_NOK(PrefetchFileBatchReader::Create( + ASSERT_NOK(PrefetchFileBatchReaderImpl::Create( data_file_path, &reader_builder, mock_fs_, prefetch_max_parallel_num, /*batch_size=*/-1, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); } { - ASSERT_NOK(PrefetchFileBatchReader::Create( + ASSERT_NOK(PrefetchFileBatchReaderImpl::Create( data_file_path, &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, /*executor=*/nullptr, /*initialize_read_ranges=*/true)); } { - ASSERT_NOK(PrefetchFileBatchReader::Create( + ASSERT_NOK(PrefetchFileBatchReaderImpl::Create( data_file_path, /*reader_builder=*/nullptr, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); } { - ASSERT_NOK(PrefetchFileBatchReader::Create( + ASSERT_NOK(PrefetchFileBatchReaderImpl::Create( data_file_path, &reader_builder, /*fs=*/nullptr, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, @@ -553,7 +555,7 @@ TEST_F(PrefetchFileBatchReaderTest, TestInvalidCase) { } { ASSERT_OK_AND_ASSIGN( - auto reader, PrefetchFileBatchReader::Create( + auto reader, PrefetchFileBatchReaderImpl::Create( data_file_path, &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, @@ -566,7 +568,7 @@ TEST_F(PrefetchFileBatchReaderTest, 
TestInvalidCase) { /// There are three stripes: [0,30), [30,60), [60,90). After predicate pushdown, the stripe /// [30,60) will be filtered out. /// The read range is [0,30), [30,60), [60,90). So, expected results is [0,30), [60,90) -TEST_P(PrefetchFileBatchReaderTest, TestPrefetchWithPredicatePushdownWithCompleteFiltering) { +TEST_P(PrefetchFileBatchReaderImplTest, TestPrefetchWithPredicatePushdownWithCompleteFiltering) { auto file_format = GetParam(); auto data_array = PrepareArray(90); PrepareTestData(file_format, data_array, /*stripe_row_count=*/30, /*row_index_stride=*/30); @@ -598,7 +600,8 @@ TEST_P(PrefetchFileBatchReaderTest, TestPrefetchWithPredicatePushdownWithComplet /// There are three stripes: [0,30), [30,60), [60,90). Each stripe has 3 row groups. /// After predicate pushdown, the row group [0, 20), [70, 90) will be remained. /// The read range is [0,30), [30,60), [60,90). -TEST_P(PrefetchFileBatchReaderTest, TestPrefetchWithOrcPredicatePushdownWithRowGroupGranularity) { +TEST_P(PrefetchFileBatchReaderImplTest, + TestPrefetchWithOrcPredicatePushdownWithRowGroupGranularity) { auto file_format = GetParam(); auto data_array = PrepareArray(90); PrepareTestData(file_format, data_array, /*stripe_row_count=*/30, /*row_index_stride=*/10); @@ -629,7 +632,7 @@ TEST_P(PrefetchFileBatchReaderTest, TestPrefetchWithOrcPredicatePushdownWithRowG ASSERT_TRUE(CheckEqual(expected_array, result_array)); } -TEST_F(PrefetchFileBatchReaderTest, TestPrefetchWithBitmap) { +TEST_F(PrefetchFileBatchReaderImplTest, TestPrefetchWithBitmap) { auto data_array = PrepareArray(10000); std::set valid_row_ids; for (int32_t i = 0; i < 5120; i++) { @@ -640,7 +643,7 @@ TEST_F(PrefetchFileBatchReaderTest, TestPrefetchWithBitmap) { MockFormatReaderBuilder reader_builder(data_array, data_type_, bitmap, /*read_batch_size=*/100); int32_t prefetch_max_parallel_num = 3; - ASSERT_OK_AND_ASSIGN(auto reader, PrefetchFileBatchReader::Create( + ASSERT_OK_AND_ASSIGN(auto reader, 
PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, /*batch_size=*/100, prefetch_max_parallel_num * 2, diff --git a/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp b/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp index ef050711..c2db0745 100644 --- a/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp +++ b/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp @@ -23,7 +23,7 @@ #include "arrow/array/array_nested.h" #include "arrow/ipc/json_simple.h" #include "gtest/gtest.h" -#include "paimon/common/reader/prefetch_file_batch_reader.h" +#include "paimon/common/reader/prefetch_file_batch_reader_impl.h" #include "paimon/executor.h" #include "paimon/testing/mock/mock_file_batch_reader.h" #include "paimon/testing/mock/mock_file_system.h" @@ -78,7 +78,7 @@ class ApplyDeletionVectorBatchReaderTest : public ::testing::Test, if (enable_prefetch) { MockFormatReaderBuilder reader_builder(data, target_type_, batch_size); ASSERT_OK_AND_ASSIGN(file_batch_reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"DUMMY", &reader_builder, fs_, prefetch_batch_count, batch_size, prefetch_batch_count * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, diff --git a/src/paimon/core/io/complete_row_tracking_fields_reader.h b/src/paimon/core/io/complete_row_tracking_fields_reader.h index 9fb986a8..eed2f4a9 100644 --- a/src/paimon/core/io/complete_row_tracking_fields_reader.h +++ b/src/paimon/core/io/complete_row_tracking_fields_reader.h @@ -60,10 +60,6 @@ class CompleteRowTrackingFieldsBatchReader : public FileBatchReader { reader_->Close(); } - Status SeekToRow(uint64_t row_number) override { - return Status::Invalid("CompleteRowTrackingFieldsBatchReader does not support SeekToRow"); - } - uint64_t GetPreviousBatchFirstRowNumber() const override { return 
reader_->GetPreviousBatchFirstRowNumber(); } @@ -72,19 +68,6 @@ class CompleteRowTrackingFieldsBatchReader : public FileBatchReader { return reader_->GetNumberOfRows(); } - uint64_t GetNextRowToRead() const override { - return reader_->GetNextRowToRead(); - } - - Result>> GenReadRanges( - bool* need_prefetch) const override { - return Status::Invalid("CompleteRowTrackingFieldsBatchReader do not support GenReadRanges"); - } - - Status SetReadRanges(const std::vector>& read_ranges) override { - return Status::Invalid("CompleteRowTrackingFieldsBatchReader do not support SetReadRanges"); - } - bool SupportPreciseBitmapSelection() const override { return reader_->SupportPreciseBitmapSelection(); } diff --git a/src/paimon/core/operation/abstract_split_read.cpp b/src/paimon/core/operation/abstract_split_read.cpp index 504011f2..f9dfcbb8 100644 --- a/src/paimon/core/operation/abstract_split_read.cpp +++ b/src/paimon/core/operation/abstract_split_read.cpp @@ -23,7 +23,7 @@ #include "arrow/type.h" #include "paimon/common/reader/delegating_prefetch_reader.h" #include "paimon/common/reader/predicate_batch_reader.h" -#include "paimon/common/reader/prefetch_file_batch_reader.h" +#include "paimon/common/reader/prefetch_file_batch_reader_impl.h" #include "paimon/common/table/special_fields.h" #include "paimon/common/types/data_field.h" #include "paimon/common/utils/object_utils.h" @@ -147,11 +147,9 @@ Result> AbstractSplitRead::CreateFileBatchReade // lance do not support stream build with input stream return reader_builder->Build(data_file_path); } - // TODO(zhanyu.fyh): orc format support prefetch - if (context_->EnablePrefetch() && file_format_identifier != "blob" && - file_format_identifier != "orc") { - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr prefetch_reader, - PrefetchFileBatchReader::Create( + if (context_->EnablePrefetch() && file_format_identifier != "blob") { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr prefetch_reader, + PrefetchFileBatchReaderImpl::Create( data_file_path, 
reader_builder, options_.GetFileSystem(), context_->GetPrefetchMaxParallelNum(), options_.GetReadBatchSize(), context_->GetPrefetchBatchCount(), diff --git a/src/paimon/format/avro/avro_file_batch_reader.h b/src/paimon/format/avro/avro_file_batch_reader.h index 5874910b..08645b92 100644 --- a/src/paimon/format/avro/avro_file_batch_reader.h +++ b/src/paimon/format/avro/avro_file_batch_reader.h @@ -43,11 +43,6 @@ class AvroFileBatchReader : public FileBatchReader { Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) override; - Status SeekToRow(uint64_t row_number) override { - assert(false); - return Status::NotImplemented("not implemented"); - } - uint64_t GetPreviousBatchFirstRowNumber() const override { assert(false); return -1; @@ -58,27 +53,11 @@ class AvroFileBatchReader : public FileBatchReader { return -1; } - uint64_t GetNextRowToRead() const override { - assert(false); - return -1; - } - std::shared_ptr GetReaderMetrics() const override { assert(false); return nullptr; } - Result>> GenReadRanges( - bool* need_prefetch) const override { - assert(false); - return Status::NotImplemented("not implemented"); - } - - Status SetReadRanges(const std::vector>& read_ranges) override { - assert(false); - return Status::NotImplemented("not implemented"); - } - void Close() override { DoClose(); } diff --git a/src/paimon/format/blob/blob_file_batch_reader.cpp b/src/paimon/format/blob/blob_file_batch_reader.cpp index 9e3ac39d..b1e8b672 100644 --- a/src/paimon/format/blob/blob_file_batch_reader.cpp +++ b/src/paimon/format/blob/blob_file_batch_reader.cpp @@ -278,11 +278,6 @@ Result> BlobFileBatchReader::ToArrowArray( return array; } -Status BlobFileBatchReader::SeekToRow(uint64_t row_number) { - assert(false); - return Status::NotImplemented("blob file batch reader seek to row is not supported."); -} - int32_t BlobFileBatchReader::GetIndexLength(const int8_t* bytes, int32_t offset) { return 
(bytes[offset + 3] << 24) | ((bytes[offset + 2] & 0xff) << 16) | ((bytes[offset + 1] & 0xff) << 8) | (bytes[offset] & 0xff); diff --git a/src/paimon/format/blob/blob_file_batch_reader.h b/src/paimon/format/blob/blob_file_batch_reader.h index a4ad6981..29ecf68e 100644 --- a/src/paimon/format/blob/blob_file_batch_reader.h +++ b/src/paimon/format/blob/blob_file_batch_reader.h @@ -96,8 +96,6 @@ class BlobFileBatchReader : public FileBatchReader { Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) override; - Status SeekToRow(uint64_t row_number) override; - Result NextBatch() override; uint64_t GetPreviousBatchFirstRowNumber() const override { @@ -108,26 +106,10 @@ class BlobFileBatchReader : public FileBatchReader { return all_blob_lengths_.size(); } - uint64_t GetNextRowToRead() const override { - if (current_pos_ < target_blob_row_indexes_.size()) { - return target_blob_row_indexes_[current_pos_]; - } - return GetNumberOfRows(); - } - std::shared_ptr GetReaderMetrics() const override { return metrics_; } - Status SetReadRanges(const std::vector>& read_ranges) override { - return Status::NotImplemented("set read ranges not implemented"); - } - - Result>> GenReadRanges( - bool* need_prefetch) const override { - return Status::NotImplemented("gen read ranges not implemented"); - } - void Close() override { closed_ = true; } diff --git a/src/paimon/format/blob/blob_file_batch_reader_test.cpp b/src/paimon/format/blob/blob_file_batch_reader_test.cpp index a45ac930..d827c83e 100644 --- a/src/paimon/format/blob/blob_file_batch_reader_test.cpp +++ b/src/paimon/format/blob/blob_file_batch_reader_test.cpp @@ -169,20 +169,16 @@ TEST_F(BlobFileBatchReaderTest, TestRowNumbers) { ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); ASSERT_EQ(3, reader->GetNumberOfRows()); ASSERT_EQ(std::numeric_limits::max(), reader->GetPreviousBatchFirstRowNumber()); - ASSERT_EQ(0, reader->GetNextRowToRead()); 
ASSERT_OK_AND_ASSIGN(auto batch1, reader->NextBatch()); ArrowArrayRelease(batch1.first.get()); ArrowSchemaRelease(batch1.second.get()); ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber()); - ASSERT_EQ(1, reader->GetNextRowToRead()); ASSERT_OK_AND_ASSIGN(auto batch2, reader->NextBatch()); ASSERT_EQ(1, reader->GetPreviousBatchFirstRowNumber()); - ASSERT_EQ(2, reader->GetNextRowToRead()); ArrowArrayRelease(batch2.first.get()); ArrowSchemaRelease(batch2.second.get()); ASSERT_OK_AND_ASSIGN(auto batch3, reader->NextBatch()); ASSERT_EQ(2, reader->GetPreviousBatchFirstRowNumber()); - ASSERT_EQ(3, reader->GetNextRowToRead()); ArrowArrayRelease(batch3.first.get()); ArrowSchemaRelease(batch3.second.get()); ASSERT_OK_AND_ASSIGN(auto batch4, reader->NextBatch()); @@ -212,10 +208,8 @@ TEST_F(BlobFileBatchReaderTest, TestRowNumbersWithBitmap) { ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, roaring)); ASSERT_EQ(3, reader->GetNumberOfRows()); ASSERT_EQ(std::numeric_limits::max(), reader->GetPreviousBatchFirstRowNumber()); - ASSERT_EQ(1, reader->GetNextRowToRead()); ASSERT_OK_AND_ASSIGN(auto batch1, reader->NextBatch()); ASSERT_EQ(1, reader->GetPreviousBatchFirstRowNumber()); - ASSERT_EQ(3, reader->GetNextRowToRead()); ArrowArrayRelease(batch1.first.get()); ArrowSchemaRelease(batch1.second.get()); ASSERT_OK_AND_ASSIGN(auto batch2, reader->NextBatch()); @@ -254,7 +248,6 @@ TEST_F(BlobFileBatchReaderTest, InvalidScenario) { /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); ASSERT_NOK_WITH_MSG(reader->GetFileSchema(), "blob file has no self-describing file schema"); - ASSERT_NOK_WITH_MSG(reader->GenReadRanges({}), "gen read ranges not implemented"); ASSERT_TRUE(reader->GetReaderMetrics()); ASSERT_NOK_WITH_MSG(reader->NextBatch(), "target type is nullptr, call SetReadSchema first"); @@ -292,7 +285,6 @@ TEST_P(BlobFileBatchReaderTest, EmptyFile) { ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); ASSERT_EQ(0, reader->GetNumberOfRows()); 
ASSERT_EQ(std::numeric_limits::max(), reader->GetPreviousBatchFirstRowNumber()); - ASSERT_EQ(0, reader->GetNextRowToRead()); ASSERT_OK_AND_ASSIGN(auto batch, reader->NextBatch()); ASSERT_TRUE(BatchReader::IsEofBatch(batch)); } diff --git a/src/paimon/format/lance/lance_file_batch_reader.h b/src/paimon/format/lance/lance_file_batch_reader.h index 4667e4b5..ab8019ac 100644 --- a/src/paimon/format/lance/lance_file_batch_reader.h +++ b/src/paimon/format/lance/lance_file_batch_reader.h @@ -39,10 +39,6 @@ class LanceFileBatchReader : public FileBatchReader { Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) override; - Status SeekToRow(uint64_t row_number) override { - return Status::Invalid("do not support seek to specific row in lance format"); - } - Result NextBatch() override; uint64_t GetPreviousBatchFirstRowNumber() const override { @@ -55,25 +51,11 @@ class LanceFileBatchReader : public FileBatchReader { return num_rows_; } - uint64_t GetNextRowToRead() const override { - assert(false); - return -1; - } - std::shared_ptr GetReaderMetrics() const override { // TODO(xinyu.lxy): support metrics in reader return metrics_; } - Result>> GenReadRanges( - bool* need_prefetch) const override { - return Status::Invalid("do not support generating read ranges in lance format"); - } - - Status SetReadRanges(const std::vector>& read_ranges) override { - return Status::Invalid("do not support setting read ranges in lance format"); - } - void Close() override { return DoClose(); } diff --git a/src/paimon/format/orc/CMakeLists.txt b/src/paimon/format/orc/CMakeLists.txt index 91319a1e..8d670523 100644 --- a/src/paimon/format/orc/CMakeLists.txt +++ b/src/paimon/format/orc/CMakeLists.txt @@ -21,8 +21,10 @@ if(PAIMON_ENABLE_ORC) orc_input_stream_impl.cpp orc_output_stream_impl.cpp orc_adapter.cpp + orc_reader_wrapper.cpp orc_stats_extractor.cpp - orc_format_writer.cpp) + orc_format_writer.cpp + 
read_range_generator.cpp) add_paimon_lib(paimon_orc_file_format SOURCES @@ -56,6 +58,8 @@ if(PAIMON_ENABLE_ORC) orc_stats_extractor_test.cpp orc_format_writer_test.cpp orc_file_batch_reader_test.cpp + orc_reader_wrapper_test.cpp + read_range_generator_test.cpp EXTRA_INCLUDES ${ORC_INCLUDE_DIR} STATIC_LINK_LIBS diff --git a/src/paimon/format/orc/orc_file_batch_reader.cpp b/src/paimon/format/orc/orc_file_batch_reader.cpp index d0907ada..f3fba769 100644 --- a/src/paimon/format/orc/orc_file_batch_reader.cpp +++ b/src/paimon/format/orc/orc_file_batch_reader.cpp @@ -38,17 +38,16 @@ #include "paimon/format/orc/orc_memory_pool.h" #include "paimon/format/orc/orc_metrics.h" #include "paimon/format/orc/predicate_converter.h" + namespace paimon::orc { -OrcFileBatchReader::OrcFileBatchReader(const std::string& file_name, int32_t batch_size, - std::unique_ptr<::orc::ReaderMetrics>&& reader_metrics, - std::unique_ptr<::orc::Reader>&& reader, + +OrcFileBatchReader::OrcFileBatchReader(std::unique_ptr<::orc::ReaderMetrics>&& reader_metrics, + std::unique_ptr&& reader, const std::map& options, - std::unique_ptr&& arrow_pool, + const std::shared_ptr& arrow_pool, const std::shared_ptr<::orc::MemoryPool>& orc_pool) - : file_name_(file_name), - batch_size_(batch_size), - options_(options), - arrow_pool_(std::move(arrow_pool)), + : options_(options), + arrow_pool_(arrow_pool), orc_pool_(orc_pool), reader_metrics_(std::move(reader_metrics)), reader_(std::move(reader)), @@ -64,7 +63,9 @@ Result> OrcFileBatchReader::Create( if (pool == nullptr) { return Status::Invalid("memory pool is nullptr"); } + uint64_t natural_read_size = input_stream->getNaturalReadSize(); auto orc_pool = std::make_shared(pool); + std::shared_ptr arrow_pool = GetArrowPool(pool); reader_options.setMemoryPool(*orc_pool); std::unique_ptr<::orc::ReaderMetrics> reader_metrics; @@ -81,9 +82,13 @@ Result> OrcFileBatchReader::Create( } std::unique_ptr<::orc::Reader> reader = ::orc::createReader(std::move(input_stream), 
reader_options); - auto orc_file_batch_reader = std::unique_ptr( - new OrcFileBatchReader(file_name, batch_size, std::move(reader_metrics), - std::move(reader), options, GetArrowPool(pool), orc_pool)); + + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr reader_wrapper, + OrcReaderWrapper::Create(std::move(reader), file_name, batch_size, natural_read_size, + options, arrow_pool, orc_pool)); + auto orc_file_batch_reader = std::unique_ptr(new OrcFileBatchReader( + std::move(reader_metrics), std::move(reader_wrapper), options, arrow_pool, orc_pool)); PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::ArrowSchema> file_schema, orc_file_batch_reader->GetFileSchema()); PAIMON_RETURN_NOT_OK(orc_file_batch_reader->SetReadSchema( @@ -100,7 +105,7 @@ Result> OrcFileBatchReader::Create( Result> OrcFileBatchReader::GetFileSchema() const { assert(reader_); - const auto& orc_file_type = reader_->getType(); + const auto& orc_file_type = reader_->GetOrcType(); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr arrow_file_type, OrcAdapter::GetArrowType(&orc_file_type)); auto c_schema = std::make_unique<::ArrowSchema>(); @@ -130,72 +135,26 @@ Status OrcFileBatchReader::SetReadSchema(::ArrowSchema* read_schema, } } PAIMON_ASSIGN_OR_RAISE(auto orc_target_type, OrcAdapter::GetOrcType(*arrow_schema)); - const auto& orc_src_type = reader_->getType(); + const auto& orc_src_type = reader_->GetOrcType(); PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::orc::SearchArgument> search_arg, PredicateConverter::Convert(orc_src_type, predicate)); - target_type_ = arrow::struct_(arrow_schema->fields()); - PAIMON_ASSIGN_OR_RAISE(::orc::RowReaderOptions row_reader_options, - CreateRowReaderOptions(&orc_src_type, orc_target_type.get(), - std::move(search_arg), options_)); - try { - row_reader_ = reader_->createRowReader(row_reader_options); - } catch (const std::exception& e) { - return Status::Invalid( - fmt::format("orc file batch reader create row reader failed for file {}, with {} error", - file_name_, e.what())); - } catch (...) 
{ - return Status::UnknownError(fmt::format( - "orc file batch reader create row reader failed for file {}, with unknown error", - file_name_)); - } - return Status::OK(); + auto target_type = arrow::struct_(arrow_schema->fields()); + std::vector target_column_ids; + PAIMON_ASSIGN_OR_RAISE( + ::orc::RowReaderOptions row_reader_options, + CreateRowReaderOptions(&orc_src_type, orc_target_type.get(), std::move(search_arg), + options_, &target_column_ids)); + + target_column_ids_ = target_column_ids; + return reader_->SetReadSchema(target_type, row_reader_options); } Status OrcFileBatchReader::SeekToRow(uint64_t row_number) { - try { - row_reader_->seekToRow(row_number); - } catch (const std::exception& e) { - return Status::Invalid( - fmt::format("orc file batch reader seek to row {} failed for file {}, with {} error", - row_number, file_name_, e.what())); - } catch (...) { - return Status::UnknownError(fmt::format( - "orc file batch reader seek to row {} failed for file {}, with unknown error", - row_number, file_name_)); - } - return Status::OK(); + return reader_->SeekToRow(row_number); } Result OrcFileBatchReader::NextBatch() { - if (has_error_) { - return Status::Invalid(fmt::format( - "Since an error has occurred, next batch has been prohibited. 
file '{}'", file_name_)); - } - std::unique_ptr c_array = std::make_unique(); - std::unique_ptr c_schema = std::make_unique(); - try { - auto orc_batch = row_reader_->createRowBatch(batch_size_); - bool eof = !row_reader_->next(*orc_batch); - if (eof) { - return BatchReader::MakeEofBatch(); - } - ScopeGuard guard([this]() { has_error_ = true; }); - assert(orc_batch->numElements > 0); - PAIMON_ASSIGN_OR_RAISE( - std::shared_ptr array, - OrcAdapter::AppendBatch(target_type_, orc_batch.get(), arrow_pool_.get())); - PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, c_array.get(), c_schema.get())); - guard.Release(); - } catch (const std::exception& e) { - return Status::Invalid( - fmt::format("orc file batch reader get next batch failed for file {}, with {} error", - file_name_, e.what())); - } catch (...) { - return Status::UnknownError(fmt::format( - "orc file batch reader get next batch failed for file {}, with unknown error", - file_name_)); - } - return make_pair(std::move(c_array), std::move(c_schema)); + return reader_->Next(); } std::shared_ptr OrcFileBatchReader::GetReaderMetrics() const { @@ -207,11 +166,9 @@ std::shared_ptr OrcFileBatchReader::GetReaderMetrics() const { return metrics_; } -Result<::orc::RowReaderOptions> OrcFileBatchReader::CreateRowReaderOptions( +Result> OrcFileBatchReader::GetAndCheckIncludedFields( const ::orc::Type* src_type, const ::orc::Type* target_type, - std::unique_ptr<::orc::SearchArgument>&& search_arg, - const std::map& options) { - ::orc::RowReaderOptions row_reader_options; + std::vector* target_column_ids) { std::list include_fields; std::unordered_map src_type_map; for (uint64_t i = 0; i < src_type->getSubtypeCount(); i++) { @@ -232,14 +189,32 @@ Result<::orc::RowReaderOptions> OrcFileBatchReader::CreateRowReaderOptions( target_type->toString(), src_type->toString(), field_name)); } int64_t target_field_col_id = iter->second->getColumnId(); + GetSubColumnIds(iter->second, target_column_ids); if 
(prev_target_field_col_id >= target_field_col_id) { return Status::Invalid( - "The column id of the target field should be monotonically increasing in format " - "reader"); + "The column id of the target field should be monotonically increasing in " + "format reader"); } prev_target_field_col_id = target_field_col_id; include_fields.push_back(field_name); } + return include_fields; +} + +void OrcFileBatchReader::GetSubColumnIds(const ::orc::Type* type, std::vector* col_ids) { + col_ids->push_back(type->getColumnId()); + for (uint64_t i = 0; i < type->getSubtypeCount(); i++) { + GetSubColumnIds(type->getSubtype(i), col_ids); + } +} + +Result<::orc::RowReaderOptions> OrcFileBatchReader::CreateRowReaderOptions( + const ::orc::Type* src_type, const ::orc::Type* target_type, + std::unique_ptr<::orc::SearchArgument>&& search_arg, + const std::map& options, std::vector* target_column_ids) { + PAIMON_ASSIGN_OR_RAISE(std::list include_fields, + GetAndCheckIncludedFields(src_type, target_type, target_column_ids)); + ::orc::RowReaderOptions row_reader_options; row_reader_options.include(include_fields); row_reader_options.searchArgument(std::move(search_arg)); diff --git a/src/paimon/format/orc/orc_file_batch_reader.h b/src/paimon/format/orc/orc_file_batch_reader.h index 0201d02b..286f9e13 100644 --- a/src/paimon/format/orc/orc_file_batch_reader.h +++ b/src/paimon/format/orc/orc_file_batch_reader.h @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -27,16 +28,18 @@ #include "arrow/type.h" #include "orc/OrcFile.hh" #include "orc/Reader.hh" +#include "paimon/format/orc/orc_reader_wrapper.h" #include "paimon/memory/memory_pool.h" #include "paimon/predicate/predicate.h" -#include "paimon/reader/file_batch_reader.h" +#include "paimon/reader/prefetch_file_batch_reader.h" namespace orc { class InputStream; } // namespace orc namespace paimon::orc { -class OrcFileBatchReader : public FileBatchReader { + +class OrcFileBatchReader : public PrefetchFileBatchReader { 
public: static Result> Create( std::unique_ptr<::orc::InputStream>&& input_stream, const std::shared_ptr& pool, @@ -51,8 +54,7 @@ class OrcFileBatchReader : public FileBatchReader { Status SeekToRow(uint64_t row_number) override; Status SetReadRanges(const std::vector>& read_ranges) override { - assert(false); - return Status::NotImplemented("set read ranges not implemented"); + return reader_->SetReadRanges(read_ranges); } // Important: output ArrowArray is allocated on arrow_pool_ whose lifecycle holds in @@ -60,29 +62,26 @@ class OrcFileBatchReader : public FileBatchReader { Result NextBatch() override; uint64_t GetPreviousBatchFirstRowNumber() const override { - return row_reader_->getRowNumber(); + return reader_->GetRowNumber(); } uint64_t GetNumberOfRows() const override { - return reader_->getNumberOfRows(); + return reader_->GetNumberOfRows(); } uint64_t GetNextRowToRead() const override { - assert(false); - return -1; + return reader_->GetNextRowToRead(); } std::shared_ptr GetReaderMetrics() const override; Result>> GenReadRanges( bool* need_prefetch) const override { - assert(false); - return Status::NotImplemented("gen read ranges not implemented"); + return reader_->GenReadRanges(target_column_ids_, 0, GetNumberOfRows(), need_prefetch); } void Close() override { metrics_ = GetReaderMetrics(); - row_reader_.reset(); reader_.reset(); reader_metrics_.reset(); } @@ -92,29 +91,33 @@ class OrcFileBatchReader : public FileBatchReader { } private: - OrcFileBatchReader(const std::string& file_name, int32_t batch_size, - std::unique_ptr<::orc::ReaderMetrics>&& reader_metrics, - std::unique_ptr<::orc::Reader>&& reader, + OrcFileBatchReader(std::unique_ptr<::orc::ReaderMetrics>&& reader_metrics, + std::unique_ptr&& reader, const std::map& options, - std::unique_ptr&& arrow_pool, + const std::shared_ptr& arrow_pool, const std::shared_ptr<::orc::MemoryPool>& orc_pool); + static void GetSubColumnIds(const ::orc::Type* type, std::vector* col_ids); + static 
Result<::orc::RowReaderOptions> CreateRowReaderOptions( const ::orc::Type* src_type, const ::orc::Type* target_type, std::unique_ptr<::orc::SearchArgument>&& search_arg, - const std::map& options); + const std::map& options, + std::vector* target_column_ids); + + static Result> GetAndCheckIncludedFields( + const ::orc::Type* src_type, const ::orc::Type* target_type, + std::vector* target_column_ids); - private: - std::string file_name_; - int32_t batch_size_; std::map options_; - std::unique_ptr arrow_pool_; + + std::shared_ptr arrow_pool_; std::shared_ptr<::orc::MemoryPool> orc_pool_; + std::unique_ptr<::orc::ReaderMetrics> reader_metrics_; - std::unique_ptr<::orc::Reader> reader_; - std::unique_ptr<::orc::RowReader> row_reader_; - std::shared_ptr target_type_; + std::unique_ptr reader_; std::shared_ptr metrics_; - bool has_error_ = false; + std::vector target_column_ids_; }; + } // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_file_batch_reader_test.cpp b/src/paimon/format/orc/orc_file_batch_reader_test.cpp index 0308dc04..47c1a8e0 100644 --- a/src/paimon/format/orc/orc_file_batch_reader_test.cpp +++ b/src/paimon/format/orc/orc_file_batch_reader_test.cpp @@ -256,9 +256,11 @@ TEST_F(OrcFileBatchReaderTest, TestCreateRowReaderOptions) { std::string orc_schema = "struct"; std::unique_ptr<::orc::Type> src_type = ::orc::Type::buildTypeFromString(orc_schema); std::unique_ptr<::orc::Type> target_type = ::orc::Type::buildTypeFromString(orc_schema); - ASSERT_OK_AND_ASSIGN(auto row_reader_option, OrcFileBatchReader::CreateRowReaderOptions( - src_type.get(), target_type.get(), - /*search_arg=*/nullptr, options)); + std::vector target_column_ids; + ASSERT_OK_AND_ASSIGN(auto row_reader_option, + OrcFileBatchReader::CreateRowReaderOptions( + src_type.get(), target_type.get(), + /*search_arg=*/nullptr, options, &target_column_ids)); ASSERT_EQ(std::list({"col1", "col2", "col3"}), row_reader_option.getIncludeNames()); 
ASSERT_EQ(row_reader_option.getEnableLazyDecoding(), false); @@ -271,9 +273,11 @@ TEST_F(OrcFileBatchReaderTest, TestCreateRowReaderOptions) { std::string target_orc_schema = "struct"; std::unique_ptr<::orc::Type> target_type = ::orc::Type::buildTypeFromString(target_orc_schema); - ASSERT_OK_AND_ASSIGN(auto row_reader_option, OrcFileBatchReader::CreateRowReaderOptions( - src_type.get(), target_type.get(), - /*search_arg=*/nullptr, options)); + std::vector target_column_ids; + ASSERT_OK_AND_ASSIGN(auto row_reader_option, + OrcFileBatchReader::CreateRowReaderOptions( + src_type.get(), target_type.get(), + /*search_arg=*/nullptr, options, &target_column_ids)); ASSERT_EQ(std::list({"col1", "col3"}), row_reader_option.getIncludeNames()); ASSERT_EQ(row_reader_option.getEnableLazyDecoding(), true); } @@ -285,9 +289,11 @@ TEST_F(OrcFileBatchReaderTest, TestCreateRowReaderOptions) { std::string target_orc_schema = "struct"; std::unique_ptr<::orc::Type> target_type = ::orc::Type::buildTypeFromString(target_orc_schema); + std::vector target_column_ids; ASSERT_NOK_WITH_MSG( OrcFileBatchReader::CreateRowReaderOptions(src_type.get(), target_type.get(), - /*search_arg=*/nullptr, options), + /*search_arg=*/nullptr, options, + &target_column_ids), "The column id of the target field should be monotonically increasing in format " "reader"); } @@ -299,10 +305,11 @@ TEST_F(OrcFileBatchReaderTest, TestCreateRowReaderOptions) { std::string target_orc_schema = "struct"; std::unique_ptr<::orc::Type> target_type = ::orc::Type::buildTypeFromString(target_orc_schema); - ASSERT_NOK_WITH_MSG( - OrcFileBatchReader::CreateRowReaderOptions(src_type.get(), target_type.get(), - /*search_arg=*/nullptr, options), - "field non_exist_col not in file schema"); + std::vector target_column_ids; + ASSERT_NOK_WITH_MSG(OrcFileBatchReader::CreateRowReaderOptions( + src_type.get(), target_type.get(), + /*search_arg=*/nullptr, options, &target_column_ids), + "field non_exist_col not in file schema"); } { 
std::map options; @@ -314,9 +321,11 @@ TEST_F(OrcFileBatchReaderTest, TestCreateRowReaderOptions) { "struct>,sub3:int>,col3:map>"; std::unique_ptr<::orc::Type> target_type = ::orc::Type::buildTypeFromString(target_orc_schema); - ASSERT_OK_AND_ASSIGN(auto row_reader_option, OrcFileBatchReader::CreateRowReaderOptions( - src_type.get(), target_type.get(), - /*search_arg=*/nullptr, options)); + std::vector target_column_ids; + ASSERT_OK_AND_ASSIGN(auto row_reader_option, + OrcFileBatchReader::CreateRowReaderOptions( + src_type.get(), target_type.get(), + /*search_arg=*/nullptr, options, &target_column_ids)); ASSERT_EQ(std::list({"col1", "col3"}), row_reader_option.getIncludeNames()); } { @@ -330,9 +339,11 @@ TEST_F(OrcFileBatchReaderTest, TestCreateRowReaderOptions) { std::unique_ptr<::orc::Type> target_type = ::orc::Type::buildTypeFromString(target_orc_schema); + std::vector target_column_ids; ASSERT_NOK_WITH_MSG( OrcFileBatchReader::CreateRowReaderOptions(src_type.get(), target_type.get(), - /*search_arg=*/nullptr, options), + /*search_arg=*/nullptr, options, + &target_column_ids), "target_type " "struct,col2:double,col3:string> not match " "src_type struct,col2:double,col3:string>, " diff --git a/src/paimon/format/orc/orc_format_defs.h b/src/paimon/format/orc/orc_format_defs.h index fe4cdcb4..ad44ae49 100644 --- a/src/paimon/format/orc/orc_format_defs.h +++ b/src/paimon/format/orc/orc_format_defs.h @@ -46,4 +46,12 @@ static inline const char ORC_NATURAL_READ_SIZE[] = "orc.read.natural-read-size"; static constexpr uint64_t DEFAULT_NATURAL_READ_SIZE = 1024 * 1024; // default value of ORC_READ_ENABLE_METRICS is false static inline const char ORC_READ_ENABLE_METRICS[] = "orc.read.enable-metrics"; + +static constexpr uint64_t MIN_ROW_GROUP_COUNT_IN_ONE_NATURAL_READ = 1; +static inline const char ENABLE_PREFETCH_READ_SIZE_THRESHOLD[] = + "orc.read.enable-prefetch-read-size-threshold"; +// Prefetching will not be enabled if the total amount of data queried is below this 
threshold, as +// prefetching for very small data sets is not beneficial. +static constexpr uint64_t DEFAULT_ENABLE_PREFETCH_READ_SIZE_THRESHOLD = 10ull * 1024 * 1024; + } // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_reader_wrapper.cpp b/src/paimon/format/orc/orc_reader_wrapper.cpp new file mode 100644 index 00000000..5d0b3469 --- /dev/null +++ b/src/paimon/format/orc/orc_reader_wrapper.cpp @@ -0,0 +1,98 @@ +/* + * Copyright 2025-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/orc/orc_reader_wrapper.h" + +#include +#include +#include +#include +#include + +#include "fmt/format.h" +#include "orc/OrcFile.hh" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/scope_guard.h" + +namespace paimon::orc { + +Status OrcReaderWrapper::SeekToRow(uint64_t row_number) { + try { + row_reader_->seekToRow(row_number); + next_row_ = row_number; + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("orc file batch reader seek to row {} failed for file {}, with {} error", + row_number, file_name_, e.what())); + } catch (...) 
{ + return Status::UnknownError(fmt::format( + "orc file batch reader seek to row {} failed for file {}, with unknown error", + row_number, file_name_)); + } + return Status::OK(); +} + +Status OrcReaderWrapper::SetReadSchema(const std::shared_ptr& target_type, + const ::orc::RowReaderOptions& row_reader_options) { + try { + row_reader_ = reader_->createRowReader(row_reader_options); + target_type_ = target_type; + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("orc file batch reader create row reader failed for file {}, with {} error", + file_name_, e.what())); + } catch (...) { + return Status::UnknownError(fmt::format( + "orc file batch reader create row reader failed for file {}, with unknown error", + file_name_)); + } + return Status::OK(); +} + +Result OrcReaderWrapper::Next() { + if (has_error_) { + return Status::Invalid(fmt::format( + "Since an error has occurred, next batch has been prohibited. file '{}'", file_name_)); + } + std::unique_ptr c_array = std::make_unique(); + std::unique_ptr c_schema = std::make_unique(); + try { + auto orc_batch = row_reader_->createRowBatch(batch_size_); + bool eof = !row_reader_->next(*orc_batch); + if (eof) { + return BatchReader::MakeEofBatch(); + } + ScopeGuard guard([this]() { has_error_ = true; }); + assert(orc_batch->numElements > 0); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr array, + OrcAdapter::AppendBatch(target_type_, orc_batch.get(), arrow_pool_.get())); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, c_array.get(), c_schema.get())); + next_row_ = GetRowNumber() + orc_batch->numElements; + guard.Release(); + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("orc file batch reader get next batch failed for file {}, with {} error", + file_name_, e.what())); + } catch (...) 
{ + return Status::UnknownError(fmt::format( + "orc file batch reader get next batch failed for file {}, with unknown error", + file_name_)); + } + return make_pair(std::move(c_array), std::move(c_schema)); +} + +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_reader_wrapper.h b/src/paimon/format/orc/orc_reader_wrapper.h new file mode 100644 index 00000000..ac4872bb --- /dev/null +++ b/src/paimon/format/orc/orc_reader_wrapper.h @@ -0,0 +1,125 @@ +/* + * Copyright 2025-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/c/bridge.h" +#include "arrow/memory_pool.h" +#include "fmt/format.h" +#include "paimon/format/orc/orc_adapter.h" +#include "paimon/format/orc/read_range_generator.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/reader/batch_reader.h" + +namespace paimon::orc { + +// The OrcReaderWrapper is a decorator class designed to support GetNextRowToRead. 
+class OrcReaderWrapper { + public: + ~OrcReaderWrapper() { + row_reader_.reset(); + reader_.reset(); + } + + static Result> Create( + std::unique_ptr<::orc::Reader> reader, const std::string& file_name, int32_t batch_size, + uint64_t natural_read_size, const std::map& options, + const std::shared_ptr& arrow_pool, + const std::shared_ptr<::orc::MemoryPool>& orc_pool) { + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr range_generator, + ReadRangeGenerator::Create(reader.get(), natural_read_size, options)); + auto reader_wrapper = std::unique_ptr( + new OrcReaderWrapper(std::move(reader), std::move(range_generator), file_name, + batch_size, arrow_pool, orc_pool)); + return reader_wrapper; + } + + Status SeekToRow(uint64_t row_number); + Result Next(); + Status SetReadSchema(const std::shared_ptr& target_type, + const ::orc::RowReaderOptions& row_reader_options); + + uint64_t GetNextRowToRead() const { + return next_row_; + } + + uint64_t GetRowNumber() const { + return row_reader_->getRowNumber(); + } + + uint64_t GetNumberOfRows() const { + return reader_->getNumberOfRows(); + } + + Status SetReadRanges(const std::vector>& read_ranges) { + // Intentionally a no-op: SetReadRanges is a best-effort hint only. 
+ return Status::OK(); + } + + const ::orc::Type& GetOrcType() const { + return reader_->getType(); + } + + Result>> GenReadRanges( + std::vector target_column_ids, uint64_t begin_row_num, uint64_t end_row_num, + bool* need_prefetch) const { + return range_generator_->GenReadRanges(target_column_ids, begin_row_num, end_row_num, + need_prefetch); + } + + private: + OrcReaderWrapper(std::unique_ptr<::orc::Reader> reader, + std::unique_ptr range_generator, + const std::string& file_name, int32_t batch_size, + const std::shared_ptr& arrow_pool, + const std::shared_ptr<::orc::MemoryPool>& orc_pool) + : reader_(std::move(reader)), + range_generator_(std::move(range_generator)), + file_name_(file_name), + batch_size_(batch_size), + arrow_pool_(arrow_pool), + orc_pool_(orc_pool) {} + + std::unique_ptr<::orc::Reader> reader_; + std::unique_ptr<::orc::RowReader> row_reader_; + + std::unique_ptr range_generator_; + + const std::string file_name_; + const int32_t batch_size_; + + std::shared_ptr arrow_pool_; + std::shared_ptr<::orc::MemoryPool> orc_pool_; + + std::shared_ptr target_type_; + + // The next absolute row index to read. + uint64_t next_row_ = 0; + bool has_error_ = false; +}; + +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_reader_wrapper_test.cpp b/src/paimon/format/orc/orc_reader_wrapper_test.cpp new file mode 100644 index 00000000..3eb8cedc --- /dev/null +++ b/src/paimon/format/orc/orc_reader_wrapper_test.cpp @@ -0,0 +1,95 @@ +/* + * Copyright 2025-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/orc/orc_reader_wrapper.h" + +#include "arrow/api.h" +#include "arrow/io/api.h" +#include "gtest/gtest.h" +#include "orc/OrcFile.hh" +#include "paimon/common/reader/reader_utils.h" +#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::orc::test { + +class OrcReaderWrapperTest : public ::testing::Test { + protected: + void SetUp() override {} + + void TearDown() override {} +}; + +TEST_F(OrcReaderWrapperTest, NextRowToRead) { + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string file_path = dir->Str() + "/file.orc"; + { + std::unique_ptr<::orc::OutputStream> outStream = ::orc::writeLocalFile(file_path); + ::orc::WriterOptions options; + std::unique_ptr<::orc::Type> schema = + ::orc::Type::buildTypeFromString("struct"); + std::unique_ptr<::orc::Writer> writer = createWriter(*schema, outStream.get(), options); + auto col_batch = writer->createRowBatch(3); + ::orc::StructVectorBatch* batch = dynamic_cast<::orc::StructVectorBatch*>(col_batch.get()); + auto* col1 = dynamic_cast<::orc::LongVectorBatch*>(batch->fields[0]); + auto* col2 = dynamic_cast<::orc::StringVectorBatch*>(batch->fields[1]); + batch->numElements = 3; + col1->numElements = 3; + col2->numElements = 3; + col1->data[0] = 1; + col1->data[1] = 2; + col1->data[2] = 3; + col2->data[0] = const_cast("a"); + col2->length[0] = 1; + col2->data[1] = const_cast("b"); + col2->length[1] = 1; + col2->data[2] = const_cast("c"); + col2->length[2] = 1; + writer->add(*batch); + writer->close(); + } + + ::orc::ReaderOptions reader_opts; + std::unique_ptr<::orc::Reader> reader = + ::orc::createReader(::orc::readLocalFile(file_path), reader_opts); + std::map options; + ASSERT_OK_AND_ASSIGN(auto wrapper, OrcReaderWrapper::Create( + /*reader=*/std::move(reader), + /*file_name=*/file_path, + /*batch_size=*/2, + 
/*natural_read_size=*/0, + /*options=*/options, + /*arrow_pool=*/GetArrowPool(GetDefaultPool()), + /*orc_pool=*/nullptr)); + auto data_types = + arrow::struct_({arrow::field("col1", arrow::int64()), arrow::field("col2", arrow::utf8())}); + ::orc::RowReaderOptions row_opts; + ASSERT_TRUE(wrapper->SetReadSchema(data_types, row_opts).ok()); + + ASSERT_OK_AND_ASSIGN(auto batch1, wrapper->Next()); + EXPECT_EQ(wrapper->GetNextRowToRead(), 2u); // batch_size=2 + ReaderUtils::ReleaseReadBatch(std::move(batch1)); + + ASSERT_OK_AND_ASSIGN(auto batch2, wrapper->Next()); + EXPECT_EQ(wrapper->GetNextRowToRead(), 3u); // only 1 row left + ReaderUtils::ReleaseReadBatch(std::move(batch2)); + + ASSERT_OK_AND_ASSIGN(auto batch3, wrapper->Next()); + EXPECT_EQ(wrapper->GetNextRowToRead(), 3u); + ReaderUtils::ReleaseReadBatch(std::move(batch3)); +} + +} // namespace paimon::orc::test diff --git a/src/paimon/format/orc/read_range_generator.cpp b/src/paimon/format/orc/read_range_generator.cpp new file mode 100644 index 00000000..22c7b1a4 --- /dev/null +++ b/src/paimon/format/orc/read_range_generator.cpp @@ -0,0 +1,228 @@ +/* + * Copyright 2025-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/format/orc/read_range_generator.h" + +#include +#include +#include +#include +#include +#include + +#include "fmt/format.h" +#include "orc/Common.hh" +#include "orc/Reader.hh" +#include "orc/orc-config.hh" +#include "paimon/common/utils/options_utils.h" +#include "paimon/format/orc/orc_format_defs.h" +#include "paimon/status.h" + +namespace paimon::orc { + +Result> ReadRangeGenerator::Create( + const ::orc::Reader* reader, uint64_t natural_read_size, + const std::map& options) { + try { + if (reader == nullptr) { + return Status::Invalid("create read range generator failed, orc reader is nullptr."); + } + std::map> column_length_map; + uint64_t stripe_num = reader->getNumberOfStripes(); + std::vector> stripe_infos; + stripe_infos.reserve(stripe_num); + for (uint64_t i = 0; i < stripe_num; i++) { + stripe_infos.emplace_back(reader->getStripe(i)); + } + if (stripe_num > 0) { + const auto& stripe = stripe_infos[0].get(); + for (uint64_t s = 0; s < stripe->getNumberOfStreams(); ++s) { + std::unique_ptr<::orc::StreamInformation> stream = stripe->getStreamInformation(s); + uint64_t column_id = stream->getColumnId(); + uint64_t length = stream->getLength(); + column_length_map[column_id][stream->getKind()] = length; + } + } + return std::unique_ptr(new ReadRangeGenerator( + reader, natural_read_size, std::move(stripe_infos), column_length_map, options)); + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("create read range generator failed, with {} error", e.what())); + } catch (...) 
{ + return Status::UnknownError("create read range generator failed, with unknown error"); + } +} + +Result>> ReadRangeGenerator::GenReadRanges( + std::vector target_column_ids, uint64_t begin_row_num, uint64_t end_row_num, + bool* need_prefetch) const { + try { + *need_prefetch = false; + uint64_t stripe_num = reader_->getNumberOfStripes(); + if (stripe_num == 0) { + return std::vector>(); + } + ReadRangeGenerator::ReaderMeta reader_meta = GetReaderMeta(); + uint64_t suggest_row_count = SuggestRowCount(target_column_ids); + auto ranges = DoGenReadRanges(begin_row_num, end_row_num, suggest_row_count, reader_meta); + + uint64_t target_read_length = 0; + for (const auto& column_id : target_column_ids) { + target_read_length += CompressedLength(column_id, std::nullopt); + } + target_read_length *= reader_->getNumberOfStripes(); + PAIMON_ASSIGN_OR_RAISE( + uint64_t enable_prefetch_read_size_threshold, + OptionsUtils::GetValueFromMap(options_, ENABLE_PREFETCH_READ_SIZE_THRESHOLD, + DEFAULT_ENABLE_PREFETCH_READ_SIZE_THRESHOLD)); + if (target_read_length > enable_prefetch_read_size_threshold) { + *need_prefetch = true; + } + + return ranges; + } catch (const std::exception& e) { + return Status::Invalid(fmt::format("gen read ranges failed, with {} error", e.what())); + } catch (...) 
{ + return Status::UnknownError("gen read ranges failed, with unknown error"); + } +} + +std::vector> ReadRangeGenerator::DoGenReadRanges( + uint64_t begin_row_num, uint64_t end_row_num, uint32_t range_size, + const ReaderMeta& reader_meta) { + std::vector> read_ranges; + auto calculate_range_size = [range_size](uint64_t pos, uint64_t stripe_end) -> uint64_t { + uint64_t min_right_bound = std::min({pos + range_size, stripe_end}); + return min_right_bound - pos; + }; + + uint64_t current_row = begin_row_num; + int32_t next_range_size = range_size; + while (current_row < end_row_num) { + uint64_t curr_stripe_begin = 0; + for (auto stripe_row_count : reader_meta.rows_per_stripes) { + if (current_row >= curr_stripe_begin && + current_row < curr_stripe_begin + stripe_row_count) { + next_range_size = + calculate_range_size(current_row - curr_stripe_begin, stripe_row_count); + break; + } else { + curr_stripe_begin += stripe_row_count; + } + } + read_ranges.emplace_back(current_row, std::min(current_row + next_range_size, end_row_num)); + current_row += next_range_size; + } + return read_ranges; +} + +uint64_t ReadRangeGenerator::SuggestRowCount(const std::vector& target_column_ids) const { + double bytes_per_group = BytesPerGroup(target_column_ids); + uint64_t expect_row_group_count = + std::ceil(static_cast(natural_read_size_) / bytes_per_group); + expect_row_group_count = + std::max(expect_row_group_count, MIN_ROW_GROUP_COUNT_IN_ONE_NATURAL_READ); + return std::min(expect_row_group_count * reader_->getRowIndexStride(), MaxRowCountInStripe()); +} + +double ReadRangeGenerator::BytesPerGroup(const std::vector& column_ids) const { + if (reader_->getNumberOfRows() == 0) { + return 1; + } + uint64_t max_column_id = std::numeric_limits::max(); + uint64_t max_length = 0; + for (const auto& column_id : column_ids) { + uint64_t length = CompressedLength(column_id, std::nullopt); + if (length > max_length) { + max_length = length; + max_column_id = column_id; + } + } + if 
(max_column_id == std::numeric_limits::max()) { + return 1; + } + uint64_t stripe_row_count = stripe_infos_[0]->getNumberOfRows(); + if (stripe_row_count == 0) { + return 1; + } + double avg_len_per_row = static_cast(max_length) / stripe_row_count; + double bytes_per_group = avg_len_per_row * reader_->getRowIndexStride(); + if (bytes_per_group < 1) { + return 1; + } + return bytes_per_group; +} + +ReadRangeGenerator::ReaderMeta ReadRangeGenerator::GetReaderMeta() const { + ReaderMeta reader_meta; + reader_meta.rows_per_group = reader_->getRowIndexStride(); + uint64_t stripe_num = reader_->getNumberOfStripes(); + for (uint64_t i = 0; i < stripe_num; i++) { + auto stripe_row_num = stripe_infos_[i]->getNumberOfRows(); + reader_meta.rows_per_stripes.push_back(stripe_row_num); + } + return reader_meta; +} + +uint64_t ReadRangeGenerator::MaxRowCountInStripe() const { + uint64_t max_row_num_per_stripe = 0; + uint64_t stripe_num = reader_->getNumberOfStripes(); + if (stripe_num == 0) { + return 0; + } + for (uint64_t i = 0; i < stripe_num; i++) { + auto stripe_row_num = stripe_infos_[i]->getNumberOfRows(); + max_row_num_per_stripe = std::max(max_row_num_per_stripe, stripe_row_num); + } + return max_row_num_per_stripe; +} + +uint64_t ReadRangeGenerator::CompressedLength(uint32_t col_id, + std::optional<::orc::StreamKind> stream_kind) const { + uint64_t length = 0; + auto iter = column_length_map_.find(col_id); + if (iter != column_length_map_.end()) { + auto& kind_and_length = iter->second; + if (stream_kind) { + auto it = kind_and_length.find(stream_kind.value()); + if (it != kind_and_length.end()) { + length = it->second; + } else { + assert(false); + } + } else { + for (const auto& [_, len] : kind_and_length) { + length += len; + } + } + } else { + assert(false); + } + return length; +} + +ReadRangeGenerator::ReadRangeGenerator( + const ::orc::Reader* reader, uint64_t natural_read_size, + std::vector>&& stripe_infos, + const std::map>& column_length_map, + const 
std::map& options) + : reader_(reader), + natural_read_size_(natural_read_size), + stripe_infos_(std::move(stripe_infos)), + column_length_map_(column_length_map), + options_(options) {} + +} // namespace paimon::orc diff --git a/src/paimon/format/orc/read_range_generator.h b/src/paimon/format/orc/read_range_generator.h new file mode 100644 index 00000000..f5cb9ec3 --- /dev/null +++ b/src/paimon/format/orc/read_range_generator.h @@ -0,0 +1,79 @@ +/* + * Copyright 2025-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "orc/Common.hh" +#include "orc/Reader.hh" +#include "orc/orc-config.hh" +#include "paimon/common/utils/options_utils.h" +#include "paimon/data/decimal.h" +#include "paimon/format/orc/orc_format_defs.h" +#include "paimon/result.h" + +namespace orc { +class Reader; +} // namespace orc + +namespace paimon::orc { + +class ReadRangeGenerator { + public: + static Result> Create( + const ::orc::Reader* reader, uint64_t natural_read_size, + const std::map& options); + + Result>> GenReadRanges( + std::vector target_column_ids, uint64_t begin_row_num, uint64_t end_row_num, + bool* need_prefetch) const; + + private: + struct ReaderMeta { + uint64_t rows_per_group; + std::vector rows_per_stripes; + }; + + static std::vector> DoGenReadRanges( + uint64_t begin_row_num, uint64_t end_row_num, uint32_t range_size, + const ReaderMeta& reader_meta); + + ReadRangeGenerator( + const ::orc::Reader* reader, uint64_t natural_read_size, + std::vector>&& stripe_infos, + const std::map>& column_length_map, + const std::map& options); + + uint64_t SuggestRowCount(const std::vector& target_column_ids) const; + double BytesPerGroup(const std::vector& column_ids) const; + ReaderMeta GetReaderMeta() const; + uint64_t MaxRowCountInStripe() const; + uint64_t CompressedLength(uint32_t col_id, std::optional<::orc::StreamKind> stream_kind) const; + + const ::orc::Reader* reader_; + const uint64_t natural_read_size_; + const std::vector> stripe_infos_; + const std::map> column_length_map_; + const std::map options_; +}; + +} // namespace paimon::orc diff --git a/src/paimon/format/orc/read_range_generator_test.cpp b/src/paimon/format/orc/read_range_generator_test.cpp new file mode 100644 index 00000000..fd39700b --- /dev/null +++ b/src/paimon/format/orc/read_range_generator_test.cpp @@ -0,0 +1,290 @@ +/* + * Copyright 2025-present Alibaba Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/orc/read_range_generator.h"
+
+// NOTE(review): the next three include lines carry no header name in this copy; presumably
+// standard-library headers whose <...> part was lost. Confirm against the original file.
+#include
+#include
+#include
+
+#include "gtest/gtest.h"
+#include "orc/MemoryPool.hh"
+#include "orc/OrcFile.hh"
+#include "orc/Reader.hh"
+#include "orc/Type.hh"
+#include "orc/Vector.hh"
+#include "orc/Writer.hh"
+#include "orc/orc-config.hh"
+#include "paimon/format/orc/orc_format_defs.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::orc::test {
+
+// Test fixture for ReadRangeGenerator. It only adds helpers for comparing and printing lists of
+// row ranges; each range exposes `.first` and `.second`.
+// NOTE(review): the element type of the vectors below is missing in this copy
+// (`std::vector>`); presumably a pair of row numbers. Confirm against the original file.
+class ReadRangeGeneratorTest : public ::testing::Test {
+ public:
+    // Returns true when `expected` and `actual` have the same size and every element compares
+    // equal. On a size mismatch, or at the first differing element, both lists are printed to
+    // std::cout and false is returned.
+    bool CheckEqual(const std::vector>& expected,
+                    const std::vector>& actual) const {
+        if (expected.size() != actual.size()) {
+            std::cout << "expected: " << ToString(expected) << std::endl;
+            std::cout << "actual: " << ToString(actual) << std::endl;
+            return false;
+        }
+        for (size_t i = 0; i < expected.size(); i++) {
+            if (expected[i] != actual[i]) {
+                std::cout << "expected: " << ToString(expected) << std::endl;
+                std::cout << "actual: " << ToString(actual) << std::endl;
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // Renders `ranges` as a sequence of "[ first,second ] " entries followed by a final line
+    // break. An extra line break is emitted after every entry whose index is a non-zero
+    // multiple of 5, so the first output line holds six entries and later lines hold five.
+    std::string ToString(const std::vector>& ranges) const {
+        std::stringstream ss;
+        for (size_t i = 0; i < ranges.size(); i++) {
+            ss << "[ " << ranges[i].first << "," << ranges[i].second << " ] ";
+            if (i % 5 == 0 && i != 0) {
+                ss << std::endl;
+            }
+        }
+        ss << std::endl;
+        return ss.str();
+    }
+};
+
+// A file written with a four-field schema but no rows is expected to produce no read ranges
+// and to leave `need_prefetch` false.
+TEST_F(ReadRangeGeneratorTest, EmptyFile) {
+    auto dir = paimon::test::UniqueTestDirectory::Create();
+    std::string file_path = dir->Str() + "/empty_file.orc";
+    std::unique_ptr<::orc::Type> schema = ::orc::createStructType();
+    schema->addStructField("id", ::orc::createPrimitiveType(::orc::TypeKind::INT));
+    schema->addStructField("name", ::orc::createPrimitiveType(::orc::TypeKind::STRING));
+    schema->addStructField("is_active", ::orc::createPrimitiveType(::orc::TypeKind::BOOLEAN));
+    schema->addStructField("value", ::orc::createPrimitiveType(::orc::TypeKind::DOUBLE));
+    ::orc::WriterOptions options;
+    ORC_UNIQUE_PTR<::orc::OutputStream> output_stream = ::orc::writeLocalFile(file_path);
+    std::unique_ptr<::orc::Writer> writer =
+        ::orc::createWriter(*schema, output_stream.get(), options);
+    // Closed without ever adding a row batch, so the file contains no rows.
+    writer->close();
+    auto input_stream = ::orc::readLocalFile(file_path);
+    auto reader = createReader(std::move(input_stream), ::orc::ReaderOptions());
+    ASSERT_OK_AND_ASSIGN(auto range_generator,
+                         ReadRangeGenerator::Create(reader.get(), 1024 * 1024, {}));
+    bool need_prefetch = false;
+    ASSERT_OK_AND_ASSIGN(auto read_ranges, range_generator->GenReadRanges(
+                                               /*target_column_ids=*/{1, 2}, /*begin_row_num=*/0,
+                                               /*end_row_num=*/100, &need_prefetch));
+    ASSERT_FALSE(need_prefetch);
+    std::vector> empty_ranges;
+    ASSERT_EQ(read_ranges, empty_ranges);
+}
+
+// Writes 300000 rows (a string column cycling through "Row0".."Row99" and an increasing 64-bit
+// id), then checks the generated read ranges, the prefetch flag and the suggested row counts.
+TEST_F(ReadRangeGeneratorTest, Simple) {
+    // NOTE(review): the block below looks like a metadata dump of the file this test writes
+    // (300000 rows, one stripe, row index stride 10000); confirm which tool produced it. The
+    // "type" value appears truncated in this copy.
+    /*
+{ "name": "/tmp/paimon_test_a6daa513-04df-423f-98d1-7eebc52510ae/simple.orc",
+  "type": "struct",
+  "attributes": {},
+  "rows": 300000,
+  "stripe count": 1,
+  "format": "0.12", "writer version": "ORC-135", "software version": "ORC C++ 2.1.1",
+  "compression": "zstd", "compression block": 65536,
+  "file length": 3763,
+  "content": 3528, "stripe stats": 64, "footer": 143, "postscript": 24,
+  "row index stride": 10000,
+  "user metadata": {
+  },
+  "stripes": [
+    { "stripe": 0, "rows": 300000,
+      "offset": 3, "length": 3528,
+      "index": 929, "data": 2512, "footer": 87,
+      "encodings": [
+         { "column": 0, "encoding": "direct" },
+         { "column": 1, "encoding": "dictionary rle2", "count": 100 },
+         { "column": 2, "encoding": "direct rle2" }
+      ],
+      "streams": [
+        { "id": 0, "column": 0, "kind": "index", "offset": 3, "length": 29 },
+        { "id": 1, "column": 1, "kind": "index", "offset": 32, "length": 291 },
+        { "id": 2, "column": 2, "kind": "index", "offset": 323, "length": 609 },
+        { "id": 3, "column": 1, "kind": "data", "offset": 932, "length": 1682 },
+        { "id": 4, "column": 1, "kind": "dictionary", "offset": 2614, "length": 251 },
+        { "id": 5, "column": 1, "kind": "length", "offset": 2865, "length": 26 },
+        { "id": 6, "column": 2, "kind": "data", "offset": 2891, "length": 553 }
+      ],
+      "timezone": "GMT"
+    }
+  ]
+}
+ */
+    auto dir = paimon::test::UniqueTestDirectory::Create();
+    std::string file_path = dir->Str() + "/simple.orc";
+    std::unique_ptr<::orc::Type> schema = ::orc::createStructType();
+    schema->addStructField("name", ::orc::createPrimitiveType(::orc::TypeKind::STRING));
+    schema->addStructField("id", ::orc::createPrimitiveType(::orc::TypeKind::LONG));
+    ::orc::WriterOptions options;
+    // NOTE(review): presumably forces dictionary encoding for the string column (the dump above
+    // lists "dictionary rle2" for column 1); confirm against the ORC WriterOptions docs.
+    options.setDictionaryKeySizeThreshold(1);
+    ORC_UNIQUE_PTR<::orc::OutputStream> output_stream = ::orc::writeLocalFile(file_path);
+    std::unique_ptr<::orc::Writer> writer =
+        ::orc::createWriter(*schema, output_stream.get(), options);
+    const int64_t num_rows = 300000;
+    // One row per batch: a fresh single-row batch is created, filled and added each iteration.
+    for (int64_t i = 0; i < num_rows; ++i) {
+        auto batch = writer->createRowBatch(1);
+        auto struct_batch = dynamic_cast<::orc::StructVectorBatch*>(batch.get());
+        auto* string_batch = dynamic_cast<::orc::StringVectorBatch*>(struct_batch->fields[0]);
+        auto* int64_batch = dynamic_cast<::orc::LongVectorBatch*>(struct_batch->fields[1]);
+        // `str` must stay alive until writer->add() below, since the batch stores a pointer to it.
+        std::string str = "Row" + std::to_string(i % 100);
+        string_batch->data[0] = str.data();
+        string_batch->length[0] = str.size();
+        int64_batch->data[0] = i;
+        batch->numElements = 1;
+        writer->add(*struct_batch);
+    }
+    writer->close();
+    auto input_stream = ::orc::readLocalFile(file_path);
+    auto reader = createReader(std::move(input_stream), ::orc::ReaderOptions());
+    // NOTE(review): the threshold option is set to "0", presumably so that prefetch is reported
+    // as useful regardless of the read size; confirm against ReadRangeGenerator.
+    ASSERT_OK_AND_ASSIGN(auto range_generator,
+                         ReadRangeGenerator::Create(reader.get(), /*natural_read_size=*/1024,
+                                                    {{ENABLE_PREFETCH_READ_SIZE_THRESHOLD, "0"}}));
+    bool need_prefetch = false;
+    ASSERT_OK_AND_ASSIGN(auto read_ranges,
+                         range_generator->GenReadRanges(
+                             /*target_column_ids=*/{1, 2}, /*begin_row_num=*/0,
+                             /*end_row_num=*/reader->getNumberOfRows(), &need_prefetch));
+    ASSERT_TRUE(need_prefetch);
+    // Two ranges of 140000 rows followed by a final range covering the remaining 20000 rows.
+    std::vector> expect_ranges = {
+        {0, 140000}, {140000, 280000}, {280000, 300000}};
+    EXPECT_EQ(read_ranges, expect_ranges);
+    EXPECT_EQ(140000, range_generator->SuggestRowCount({1}));
+    EXPECT_EQ(140000, range_generator->SuggestRowCount({1, 2}));
+    EXPECT_EQ(270000, range_generator->SuggestRowCount({2}));
+}
+
+// Reads an existing ORC file from the test data directory and expects a suggested row count of
+// 8 for column id 1 and for column id 4.
+TEST_F(ReadRangeGeneratorTest, SuggestRowCountWithFewRows) {
+    // NOTE(review): the comment on the next line looks truncated in this copy (schema details
+    // missing).
+    // struct
+    std::string file_name = paimon::test::GetDataDir() +
+                            "/orc/append_09.db/append_09/f1=10/bucket-1/"
+                            "data-b9e7c41f-66e8-4dad-b25a-e6e1963becc4-0.orc";
+    auto input_stream = ::orc::readLocalFile(file_name);
+    auto reader = createReader(std::move(input_stream), ::orc::ReaderOptions());
+    ASSERT_OK_AND_ASSIGN(auto range_generator,
+                         ReadRangeGenerator::Create(reader.get(), 1024 * 1024, {}));
+    EXPECT_EQ(8, range_generator->SuggestRowCount({1}));
+    EXPECT_EQ(8, range_generator->SuggestRowCount({4}));
+}
+
+// Checks BytesPerGroup for several column-id sets on the same data file as the previous test.
+TEST_F(ReadRangeGeneratorTest, TestBytesPerGroup) {
+    std::string file_name = paimon::test::GetDataDir() +
+                            "/orc/append_09.db/append_09/f1=10/bucket-1/"
+                            "data-b9e7c41f-66e8-4dad-b25a-e6e1963becc4-0.orc";
+
+    std::unique_ptr<::orc::Reader> reader =
+        ::orc::createReader(::orc::readLocalFile(file_name), ::orc::ReaderOptions());
+    ASSERT_OK_AND_ASSIGN(auto range_generator,
+                         ReadRangeGenerator::Create(reader.get(), 1024 * 1024, {}));
+    EXPECT_EQ(37500, range_generator->BytesPerGroup({2}));
+    EXPECT_EQ(96250, range_generator->BytesPerGroup({1, 2}));
+    EXPECT_EQ(96250, range_generator->BytesPerGroup({1}));
+}
+
+// Five stripes (4 x 20480 rows + 2080 rows) with range_size 10000: the expected ranges are at
+// most 10000 rows long and none of them crosses a stripe boundary (20480, 40960, 61440, 81920).
+TEST_F(ReadRangeGeneratorTest, TestGenReadRanges) {
+    ReadRangeGenerator::ReaderMeta meta;
+    meta.rows_per_group = 10000;
+    meta.rows_per_stripes = {20480, 20480, 20480, 20480, 2080};
+    auto read_ranges_vec = ReadRangeGenerator::DoGenReadRanges(
+        /*begin_row_num=*/0, /*end_row_num=*/84000, /*range_size=*/10000, meta);
+    ASSERT_TRUE(CheckEqual({{0, 10000},
+                            {10000, 20000},
+                            {20000, 20480},
+                            {20480, 30480},
+                            {30480, 40480},
+                            {40480, 40960},
+                            {40960, 50960},
+                            {50960, 60960},
+                            {60960, 61440},
+                            {61440, 71440},
+                            {71440, 81440},
+                            {81440, 81920},
+                            {81920, 84000}},
+                           read_ranges_vec));
+}
+
+// Stripes of 4 and 6 rows with range_size 3: the second range is cut short at the stripe
+// boundary (row 4).
+TEST_F(ReadRangeGeneratorTest, TestGenReadRangesBasic) {
+    ReadRangeGenerator::ReaderMeta meta;
+    meta.rows_per_group = 3;
+    meta.rows_per_stripes = {4, 6};
+    auto read_ranges_vec = ReadRangeGenerator::DoGenReadRanges(
+        /*begin_row_num=*/0, /*end_row_num=*/10, /*range_size=*/3, meta);
+    ASSERT_TRUE(CheckEqual({{0, 3}, {3, 4}, {4, 7}, {7, 10}}, read_ranges_vec));
+}
+
+// Starts mid-stripe (row 5) with range_size 5; the first range ends exactly at the stripe
+// boundary (row 10) and the remaining ranges cover rows 10 to 25 in steps of 5.
+TEST_F(ReadRangeGeneratorTest, TestGenReadRangesMultiStripes) {
+    ReadRangeGenerator::ReaderMeta meta;
+    meta.rows_per_group = 4;
+    meta.rows_per_stripes = {10, 15};
+    auto read_ranges_vec = ReadRangeGenerator::DoGenReadRanges(
+        /*begin_row_num=*/5, /*end_row_num=*/25, /*range_size=*/5, meta);
+    ASSERT_TRUE(CheckEqual({{5, 10}, {10, 15}, {15, 20}, {20, 25}}, read_ranges_vec));
+}
+
+// begin_row_num (3) is greater than end_row_num (2): no ranges are expected.
+TEST_F(ReadRangeGeneratorTest, TestGenReadRangesInvalid) {
+    ReadRangeGenerator::ReaderMeta meta;
+    meta.rows_per_group = 4;
+    meta.rows_per_stripes = {10, 15};
+    auto read_ranges_vec = ReadRangeGenerator::DoGenReadRanges(
+        /*begin_row_num=*/3, /*end_row_num=*/2, /*range_size=*/5, meta);
+    ASSERT_TRUE(CheckEqual({}, read_ranges_vec));
+}
+
+// Starts at row 3: the range beginning at row 8 is truncated at the stripe boundary (row 10),
+// and the last range stops at end_row_num (18).
+TEST_F(ReadRangeGeneratorTest, TestGenReadRangesStripeBound) {
+    ReadRangeGenerator::ReaderMeta meta;
+    meta.rows_per_group = 5;
+    meta.rows_per_stripes = {10, 10};
+    auto read_ranges_vec = ReadRangeGenerator::DoGenReadRanges(
+        /*begin_row_num=*/3, /*end_row_num=*/18, /*range_size=*/5, meta);
+    ASSERT_TRUE(CheckEqual({{3, 8}, {8, 10}, {10, 15}, {15, 18}}, read_ranges_vec));
+}
+
+// begin_row_num equals end_row_num (10, which is also the total row count of the two stripes):
+// no ranges are expected.
+TEST_F(ReadRangeGeneratorTest, TestGenReadRangesNotMatch) {
+    ReadRangeGenerator::ReaderMeta meta;
+    meta.rows_per_group = 2;
+    meta.rows_per_stripes = {4, 6};
+    auto read_ranges_vec = ReadRangeGenerator::DoGenReadRanges(
+        /*begin_row_num=*/10, /*end_row_num=*/10, /*range_size=*/2, meta);
+    ASSERT_TRUE(CheckEqual({}, read_ranges_vec));
+}
+
+// Create is expected to return a non-OK result when given a null reader.
+TEST_F(ReadRangeGeneratorTest, CreateWithNullReader) {
+    ASSERT_NOK(ReadRangeGenerator::Create(nullptr, 1024 * 1024, {}));
+}
+
+// On a single-column file written without any rows, BytesPerGroup with an empty column list is
+// expected to be 1.
+TEST_F(ReadRangeGeneratorTest, BytesPerGroupWithEmptyColumns) {
+    auto dir = paimon::test::UniqueTestDirectory::Create();
+    std::string file_path = dir->Str() + "/empty_columns.orc";
+    std::unique_ptr<::orc::Type> schema = ::orc::createStructType();
+    schema->addStructField("id", ::orc::createPrimitiveType(::orc::TypeKind::INT));
+    ::orc::WriterOptions options;
+    ORC_UNIQUE_PTR<::orc::OutputStream> output_stream = ::orc::writeLocalFile(file_path);
+    std::unique_ptr<::orc::Writer> writer =
+        ::orc::createWriter(*schema, output_stream.get(), options);
+    // Closed without ever adding a row batch.
+    writer->close();
+    auto input_stream = ::orc::readLocalFile(file_path);
+    auto reader = createReader(std::move(input_stream), ::orc::ReaderOptions());
+    ASSERT_OK_AND_ASSIGN(auto range_generator,
+                         ReadRangeGenerator::Create(reader.get(), 1024 * 1024, {}));
+    EXPECT_EQ(1, range_generator->BytesPerGroup({}));
+}
+
+}  // namespace paimon::orc::test
diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h b/src/paimon/format/parquet/parquet_file_batch_reader.h
index 0011fb1a..f28ccb9c 100644
--- a/src/paimon/format/parquet/parquet_file_batch_reader.h
+++ b/src/paimon/format/parquet/parquet_file_batch_reader.h
@@ -36,7 +36,7 @@
 #include "paimon/common/metrics/metrics_impl.h"
 #include "paimon/common/utils/arrow/status_utils.h"
 #include "paimon/format/parquet/file_reader_wrapper.h"
-#include "paimon/reader/file_batch_reader.h"
+#include
"paimon/reader/prefetch_file_batch_reader.h" #include "paimon/result.h" #include "paimon/status.h" #include "parquet/arrow/reader.h" @@ -57,7 +57,7 @@ class RoaringBitmap32; namespace paimon::parquet { -class ParquetFileBatchReader : public FileBatchReader { +class ParquetFileBatchReader : public PrefetchFileBatchReader { public: static Result> Create( std::shared_ptr&& input_stream, diff --git a/src/paimon/testing/mock/mock_file_batch_reader.h b/src/paimon/testing/mock/mock_file_batch_reader.h index 2f80196b..398c0520 100644 --- a/src/paimon/testing/mock/mock_file_batch_reader.h +++ b/src/paimon/testing/mock/mock_file_batch_reader.h @@ -27,9 +27,10 @@ #include "paimon/common/reader/reader_utils.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/date_time_utils.h" -#include "paimon/reader/file_batch_reader.h" +#include "paimon/reader/prefetch_file_batch_reader.h" namespace paimon::test { -class MockFileBatchReader : public FileBatchReader { + +class MockFileBatchReader : public PrefetchFileBatchReader { public: MockFileBatchReader(const std::shared_ptr& data, const std::shared_ptr& file_schema, @@ -181,4 +182,5 @@ class MockFileBatchReader : public FileBatchReader { bool enable_randomize_batch_size_ = true; std::vector> read_ranges_; }; + } // namespace paimon::test diff --git a/test/inte/read_inte_test.cpp b/test/inte/read_inte_test.cpp index 28137bbc..46897d96 100644 --- a/test/inte/read_inte_test.cpp +++ b/test/inte/read_inte_test.cpp @@ -199,6 +199,8 @@ std::vector PrepareTestParam() { #ifdef PAIMON_ENABLE_ORC values.push_back(TestParam{false, "false", "orc"}); + values.push_back(TestParam{true, "true", "orc"}); + values.push_back(TestParam{true, "false", "orc"}); #endif return values; } diff --git a/test/inte/read_inte_with_index_test.cpp b/test/inte/read_inte_with_index_test.cpp index bbc5a275..bd26f187 100644 --- a/test/inte/read_inte_with_index_test.cpp +++ b/test/inte/read_inte_with_index_test.cpp @@ -614,6 +614,7 @@ 
std::vector> GetTestValuesForReadInteWithIndexTest( std::vector> values = {{"parquet", false}, {"parquet", true}}; #ifdef PAIMON_ENABLE_ORC values.emplace_back("orc", false); + values.emplace_back("orc", true); #endif return values; }