Skip to content

Commit 79a8f0c

Browse files
authored
feat: Rework of storage based broadcast join for more efficient buffering (prestodb#26041)
Make the presto-on-spark native broadcast join more memory- and performance-efficient. This is done by: * Allowing the broadcast join writer to separate serialized pages, so that it can buffer them instead of writing through directly for every input vector batch. * Adding file footer information that stores page sizes, which the reader can use to give size estimates to the exchange client, allowing better control of client memory. * Making the broadcast exchange source async instead of issuing blocking IO calls, to leverage parallelism. This makes the storage-based broadcast join shuffle: * memory-bounded by the exchange's memory cap. * 5x more performant. == NO RELEASE NOTE ==
1 parent db4232a commit 79a8f0c

File tree

15 files changed

+892
-230
lines changed

15 files changed

+892
-230
lines changed

presto-native-execution/presto_cpp/main/common/Configs.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -768,13 +768,12 @@ class SystemConfig : public ConfigBase {
768768
/// Enable the type char(n) with the same behavior as unbounded varchar.
769769
/// char(n) type is not supported by parser when set to false.
770770
static constexpr std::string_view kCharNToVarcharImplicitCast{
771-
"char-n-to-varchar-implicit-cast"};
771+
"char-n-to-varchar-implicit-cast"};
772772

773-
/// Enable BigintEnum and VarcharEnum types to be parsed and used in Velox.
774-
/// When set to false, BigintEnum or VarcharEnum types will throw an
775-
// unsupported error during type parsing.
776-
static constexpr std::string_view kEnumTypesEnabled{
777-
"enum-types-enabled"};
773+
/// Enable BigintEnum and VarcharEnum types to be parsed and used in Velox.
774+
/// When set to false, BigintEnum or VarcharEnum types will throw an
775+
/// unsupported error during type parsing.
776+
static constexpr std::string_view kEnumTypesEnabled{"enum-types-enabled"};
778777

779778
SystemConfig();
780779

presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -34,36 +34,48 @@ std::optional<std::string> getBroadcastInfo(folly::Uri& uri) {
3434

3535
folly::SemiFuture<BroadcastExchangeSource::Response>
3636
BroadcastExchangeSource::request(
37-
uint32_t /*maxBytes*/,
37+
uint32_t maxBytes,
3838
std::chrono::microseconds /*maxWait*/) {
39+
VELOX_CHECK_GT(maxBytes, 0);
3940
if (atEnd_) {
4041
return folly::makeFuture(Response{0, true});
4142
}
4243

43-
atEnd_ = !reader_->hasNext();
44-
int64_t totalBytes = 0;
45-
std::unique_ptr<velox::exec::SerializedPage> page;
46-
if (!atEnd_) {
47-
// Read outside the lock to avoid a potential deadlock
48-
// ExchangeClient guarantees not to call ExchangeSource#request concurrently
49-
auto buffer = reader_->next();
50-
totalBytes = buffer->size();
51-
auto ioBuf = folly::IOBuf::wrapBuffer(buffer->as<char>(), buffer->size());
52-
page = std::make_unique<velox::exec::SerializedPage>(
53-
std::move(ioBuf), [buffer](auto& /*unused*/) {});
54-
}
44+
return folly::makeTryWith([&]() -> Response {
45+
int64_t totalBytes = 0;
46+
std::vector<std::unique_ptr<velox::exec::SerializedPage>> pages;
5547

56-
std::vector<velox::ContinuePromise> promises;
57-
{
58-
// Limit locking scope to queue manipulation
59-
std::lock_guard<std::mutex> l(queue_->mutex());
60-
queue_->enqueueLocked(std::move(page), promises);
61-
}
62-
for (auto& promise : promises) {
63-
promise.setValue();
64-
}
48+
while (totalBytes < maxBytes && reader_->hasNext()) {
49+
auto buffer = reader_->next();
50+
VELOX_CHECK_NOT_NULL(buffer);
51+
52+
auto ioBuf = folly::IOBuf::wrapBuffer(buffer->as<char>(), buffer->size());
53+
auto page = std::make_unique<velox::exec::SerializedPage>(
54+
std::move(ioBuf), [buffer](auto& /*unused*/) {});
55+
pages.push_back(std::move(page));
6556

66-
return folly::makeFuture(Response{totalBytes, atEnd_});
57+
totalBytes += buffer->size();
58+
}
59+
60+
atEnd_ = !reader_->hasNext();
61+
std::vector<velox::ContinuePromise> promises;
62+
{
63+
// Limit locking scope to queue manipulation
64+
std::lock_guard<std::mutex> l(queue_->mutex());
65+
for (auto& page : pages) {
66+
queue_->enqueueLocked(std::move(page), promises);
67+
}
68+
if (atEnd_) {
69+
// Notify exchange queue 'this' source has finished.
70+
queue_->enqueueLocked(nullptr, promises);
71+
}
72+
}
73+
for (auto& promise : promises) {
74+
promise.setValue();
75+
}
76+
77+
return Response{totalBytes, atEnd_, reader_->remainingPageSizes()};
78+
});
6779
}
6880

6981
folly::F14FastMap<std::string, int64_t> BroadcastExchangeSource::stats() const {
@@ -73,14 +85,25 @@ folly::F14FastMap<std::string, int64_t> BroadcastExchangeSource::stats() const {
7385
folly::SemiFuture<BroadcastExchangeSource::Response>
7486
BroadcastExchangeSource::requestDataSizes(
7587
std::chrono::microseconds /*maxWait*/) {
76-
std::vector<int64_t> remainingBytes;
77-
if (!atEnd_) {
78-
// Use default value of ExchangeClient::getAveragePageSize() for now.
79-
//
80-
// TODO: Change BroadcastFileReader to return the next batch size.
81-
remainingBytes.push_back(1 << 20);
82-
}
83-
return folly::makeSemiFuture(Response{0, atEnd_, std::move(remainingBytes)});
88+
return folly::makeTryWith([&]() -> Response {
89+
auto remainingPageSizes = reader_->remainingPageSizes();
90+
91+
// If the source is empty from the start, signal completion to ExchangeQueue
92+
if (remainingPageSizes.empty()) {
93+
atEnd_ = true;
94+
std::vector<velox::ContinuePromise> promises;
95+
{
96+
std::lock_guard<std::mutex> l(queue_->mutex());
97+
// Notify exchange queue 'this' source has finished.
98+
queue_->enqueueLocked(nullptr, promises);
99+
}
100+
for (auto& promise : promises) {
101+
promise.setValue();
102+
}
103+
}
104+
105+
return Response{0, atEnd_, std::move(remainingPageSizes)};
106+
});
84107
}
85108

86109
// static

0 commit comments

Comments
 (0)