Skip to content

Commit 11dc5f1

Browse files
JkSelfglutenperfbot
authored andcommitted
Add RowsStreamingWindowBuild to avoid OOM in Window operator
1 parent 5bcc708 commit 11dc5f1

20 files changed

+554
-26
lines changed

velox/exec/AggregateWindow.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,8 @@ void registerAggregateWindowFunction(const std::string& name) {
426426
pool,
427427
stringAllocator,
428428
config);
429-
});
429+
},
430+
{exec::ProcessingUnit::kRows, false});
430431
}
431432
}
432433
} // namespace facebook::velox::exec

velox/exec/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ add_library(
6262
PlanNodeStats.cpp
6363
PrefixSort.cpp
6464
ProbeOperatorState.cpp
65+
RowsStreamingWindowBuild.cpp
66+
RowsStreamingWindowPartition.cpp
6567
RowContainer.cpp
6668
RowNumber.cpp
6769
SortBuffer.cpp
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/exec/RowsStreamingWindowBuild.h"
18+
#include "velox/exec/RowsStreamingWindowPartition.h"
19+
20+
namespace facebook::velox::exec {
21+
22+
RowsStreamingWindowBuild::RowsStreamingWindowBuild(
23+
const std::shared_ptr<const core::WindowNode>& windowNode,
24+
velox::memory::MemoryPool* pool,
25+
const common::SpillConfig* spillConfig,
26+
tsan_atomic<bool>* nonReclaimableSection)
27+
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {}
28+
29+
void RowsStreamingWindowBuild::buildNextInputOrPartition(bool isFinished) {
30+
if (windowPartitions_.size() <= inputCurrentPartition_) {
31+
windowPartitions_.push_back(std::make_shared<RowsStreamingWindowPartition>(
32+
data_.get(),
33+
folly::Range<char**>(nullptr, nullptr),
34+
inversedInputChannels_,
35+
sortKeyInfo_));
36+
}
37+
38+
windowPartitions_[inputCurrentPartition_]->addNewRows(inputRows_);
39+
40+
if (isFinished) {
41+
windowPartitions_[inputCurrentPartition_]->setInputRowsFinished();
42+
inputCurrentPartition_++;
43+
}
44+
45+
inputRows_.clear();
46+
}
47+
48+
void RowsStreamingWindowBuild::addInput(RowVectorPtr input) {
49+
for (auto i = 0; i < inputChannels_.size(); ++i) {
50+
decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i]));
51+
}
52+
53+
for (auto row = 0; row < input->size(); ++row) {
54+
char* newRow = data_->newRow();
55+
56+
for (auto col = 0; col < input->childrenSize(); ++col) {
57+
data_->store(decodedInputVectors_[col], row, newRow, col);
58+
}
59+
60+
if (previousRow_ != nullptr &&
61+
compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) {
62+
buildNextInputOrPartition(true);
63+
}
64+
65+
// Wait for the peers to be ready in single partition; these peers are the
66+
// rows that have identical values in the ORDER BY clause.
67+
if (previousRow_ != nullptr && inputRows_.size() >= numRowsPerOutput_ &&
68+
compareRowsWithKeys(previousRow_, newRow, sortKeyInfo_)) {
69+
buildNextInputOrPartition(false);
70+
}
71+
72+
inputRows_.push_back(newRow);
73+
previousRow_ = newRow;
74+
}
75+
}
76+
77+
void RowsStreamingWindowBuild::noMoreInput() {
78+
buildNextInputOrPartition(true);
79+
}
80+
81+
std::shared_ptr<WindowPartition> RowsStreamingWindowBuild::nextPartition() {
82+
if (outputCurrentPartition_ > 0) {
83+
windowPartitions_[outputCurrentPartition_].reset();
84+
}
85+
86+
return windowPartitions_[++outputCurrentPartition_];
87+
}
88+
89+
bool RowsStreamingWindowBuild::hasNextPartition() {
90+
return windowPartitions_.size() > 0 &&
91+
outputCurrentPartition_ <= int(windowPartitions_.size() - 2);
92+
}
93+
94+
} // namespace facebook::velox::exec
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include "velox/exec/WindowBuild.h"
20+
21+
namespace facebook::velox::exec {
22+
23+
/// Unlike StreamingWindowBuild, RowsStreamingWindowBuild is capable of
24+
/// processing window functions as rows arrive within a single partition,
25+
/// without the need to wait for the entire partition to be ready. This approach
26+
/// can significantly reduce memory usage, especially when a single partition
27+
/// contains a large amount of data. It is particularly suited for optimizing
28+
/// rank and row_number functions, as well as aggregate window functions with a
29+
/// default frame.
30+
class RowsStreamingWindowBuild : public WindowBuild {
31+
public:
32+
RowsStreamingWindowBuild(
33+
const std::shared_ptr<const core::WindowNode>& windowNode,
34+
velox::memory::MemoryPool* pool,
35+
const common::SpillConfig* spillConfig,
36+
tsan_atomic<bool>* nonReclaimableSection);
37+
38+
void addInput(RowVectorPtr input) override;
39+
40+
void spill() override {
41+
VELOX_UNREACHABLE();
42+
}
43+
44+
std::optional<common::SpillStats> spilledStats() const override {
45+
return std::nullopt;
46+
}
47+
48+
void noMoreInput() override;
49+
50+
bool hasNextPartition() override;
51+
52+
std::shared_ptr<WindowPartition> nextPartition() override;
53+
54+
bool needsInput() override {
55+
// No partitions are available or the currentPartition is the last available
56+
// one, so can consume input rows.
57+
return windowPartitions_.size() == 0 ||
58+
outputCurrentPartition_ == windowPartitions_.size() - 1;
59+
}
60+
61+
private:
62+
void buildNextInputOrPartition(bool isFinished);
63+
64+
// Holds input rows within the current partition.
65+
std::vector<char*> inputRows_;
66+
67+
// Used to compare rows based on partitionKeys.
68+
char* previousRow_ = nullptr;
69+
70+
// Current partition being output. Used to return the WidnowPartitions.
71+
vector_size_t outputCurrentPartition_ = -1;
72+
73+
// Current partition when adding input. Used to construct WindowPartitions.
74+
vector_size_t inputCurrentPartition_ = 0;
75+
76+
// Holds all the WindowPartitions.
77+
std::vector<std::shared_ptr<WindowPartition>> windowPartitions_;
78+
};
79+
80+
} // namespace facebook::velox::exec
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#include "velox/exec/RowsStreamingWindowPartition.h"
17+
18+
namespace facebook::velox::exec {
19+
20+
RowsStreamingWindowPartition::RowsStreamingWindowPartition(
21+
RowContainer* data,
22+
const folly::Range<char**>& rows,
23+
const std::vector<column_index_t>& inputMapping,
24+
const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo)
25+
: WindowPartition(data, rows, inputMapping, sortKeyInfo) {
26+
partitionStartRows_.push_back(0);
27+
}
28+
29+
void RowsStreamingWindowPartition::addNewRows(std::vector<char*> rows) {
30+
partitionStartRows_.push_back(partitionStartRows_.back() + rows.size());
31+
32+
sortedRows_.insert(sortedRows_.end(), rows.begin(), rows.end());
33+
}
34+
35+
bool RowsStreamingWindowPartition::buildNextRows() {
36+
if (currentPartition_ >= int(partitionStartRows_.size() - 2))
37+
return false;
38+
39+
currentPartition_++;
40+
41+
// Erase previous rows in current partition.
42+
if (currentPartition_ > 0) {
43+
auto numPreviousPartitionRows = partitionStartRows_[currentPartition_] -
44+
partitionStartRows_[currentPartition_ - 1];
45+
data_->eraseRows(
46+
folly::Range<char**>(sortedRows_.data(), numPreviousPartitionRows));
47+
sortedRows_.erase(
48+
sortedRows_.begin(), sortedRows_.begin() + numPreviousPartitionRows);
49+
}
50+
51+
auto partitionSize = partitionStartRows_[currentPartition_ + 1] -
52+
partitionStartRows_[currentPartition_];
53+
54+
partition_ = folly::Range(sortedRows_.data(), partitionSize);
55+
return true;
56+
}
57+
58+
} // namespace facebook::velox::exec
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include "velox/exec/RowContainer.h"
19+
#include "velox/exec/WindowPartition.h"
20+
21+
namespace facebook::velox::exec {
22+
23+
/// RowsStreamingWindowPartition is to facilitate RowsStreamingWindowBuild by
24+
/// processing rows within WindowPartition in a streaming manner.
25+
class RowsStreamingWindowPartition : public WindowPartition {
26+
public:
27+
RowsStreamingWindowPartition(
28+
RowContainer* data,
29+
const folly::Range<char**>& rows,
30+
const std::vector<column_index_t>& inputMapping,
31+
const std::vector<std::pair<column_index_t, core::SortOrder>>&
32+
sortKeyInfo);
33+
34+
// Returns the number of rows in the current partial window partition,
35+
// including the offset within the full partition.
36+
vector_size_t numRows() const override {
37+
if (currentPartition_ == -1) {
38+
return 0;
39+
} else {
40+
return partition_.size() + partitionStartRows_[currentPartition_];
41+
}
42+
}
43+
44+
// Returns the starting offset of the current partial window partition within
45+
// the full partition.
46+
vector_size_t offsetInPartition() const override {
47+
return partitionStartRows_[currentPartition_];
48+
}
49+
50+
// Indicates support for row-level streaming processing.
51+
bool supportRowLevelStreaming() const override {
52+
return true;
53+
}
54+
55+
// Sets the flag indicating that all input rows have been processed on the
56+
// producer side.
57+
void setInputRowsFinished() override {
58+
inputRowsFinished_ = true;
59+
}
60+
61+
// Adds new rows to the partition using a streaming approach on the producer
62+
// side.
63+
void addNewRows(std::vector<char*> rows) override;
64+
65+
// Builds the next set of available rows on the consumer side.
66+
bool buildNextRows() override;
67+
68+
// Determines if the current partition is complete and then proceed to the
69+
// next partition.
70+
bool processFinished() const override {
71+
return (
72+
inputRowsFinished_ &&
73+
currentPartition_ == partitionStartRows_.size() - 2);
74+
}
75+
76+
private:
77+
// Indicates whether all input rows have been added to sortedRows_
78+
bool inputRowsFinished_ = false;
79+
80+
// Stores new rows added to the WindowPartition.
81+
std::vector<char*> sortedRows_;
82+
83+
// Indices of the start row (in sortedRows_) of each partitial partition.
84+
std::vector<vector_size_t> partitionStartRows_;
85+
86+
// Current partial partition being output.
87+
vector_size_t currentPartition_ = -1;
88+
};
89+
} // namespace facebook::velox::exec

velox/exec/SortWindowBuild.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -291,11 +291,11 @@ void SortWindowBuild::loadNextPartitionFromSpill() {
291291
}
292292
}
293293

294-
std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
294+
std::shared_ptr<WindowPartition> SortWindowBuild::nextPartition() {
295295
if (merge_ != nullptr) {
296296
VELOX_CHECK(!sortedRows_.empty(), "No window partitions available")
297297
auto partition = folly::Range(sortedRows_.data(), sortedRows_.size());
298-
return std::make_unique<WindowPartition>(
298+
return std::make_shared<WindowPartition>(
299299
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
300300
}
301301

@@ -313,7 +313,7 @@ std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
313313
auto partition = folly::Range(
314314
sortedRows_.data() + partitionStartRows_[currentPartition_],
315315
partitionSize);
316-
return std::make_unique<WindowPartition>(
316+
return std::make_shared<WindowPartition>(
317317
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
318318
}
319319

velox/exec/SortWindowBuild.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class SortWindowBuild : public WindowBuild {
5353

5454
bool hasNextPartition() override;
5555

56-
std::unique_ptr<WindowPartition> nextPartition() override;
56+
std::shared_ptr<WindowPartition> nextPartition() override;
5757

5858
private:
5959
void ensureInputFits(const RowVectorPtr& input);

velox/exec/StreamingWindowBuild.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ void StreamingWindowBuild::noMoreInput() {
6060
partitionStartRows_.push_back(sortedRows_.size());
6161
}
6262

63-
std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
63+
std::shared_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
6464
VELOX_CHECK_GT(
6565
partitionStartRows_.size(), 0, "No window partitions available")
6666

@@ -89,7 +89,7 @@ std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
8989
sortedRows_.data() + partitionStartRows_[currentPartition_],
9090
partitionSize);
9191

92-
return std::make_unique<WindowPartition>(
92+
return std::make_shared<WindowPartition>(
9393
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
9494
}
9595

0 commit comments

Comments
 (0)