Skip to content

Commit 1cc3d25

Browse files
authored
[fix](s3filewriter) Fix s3_write_buffer_size boundary issue (apache#47333)
introduced by apache#43254
1 parent 0b9e3be commit 1cc3d25

File tree

2 files changed

+417
-12
lines changed

2 files changed

+417
-12
lines changed

be/src/io/fs/s3_file_writer.cpp

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ S3FileWriter::~S3FileWriter() {
9090
}
9191

9292
Status S3FileWriter::_create_multi_upload_request() {
93+
LOG(INFO) << "create_multi_upload_request " << _obj_storage_path_opts.path.native();
9394
const auto& client = _obj_client->get();
9495
if (nullptr == client) {
9596
return Status::InternalError<false>("invalid obj storage client");
@@ -224,11 +225,6 @@ Status S3FileWriter::_close_impl() {
224225
_countdown_event.add_count();
225226
RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
226227
_pending_buf = nullptr;
227-
} else if (_bytes_appended != 0) { // Non-empty file and has nothing to be uploaded
228-
// NOTE: When the data size is a multiple of config::s3_write_buffer_size,
229-
// _cur_part_num may exceed the actual number of parts that need to be uploaded.
230-
// This is because it is incremented by 1 in advance within the S3FileWriter::appendv method.
231-
_cur_part_num--;
232228
}
233229

234230
RETURN_IF_ERROR(_complete());
@@ -265,12 +261,13 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
265261
Slice {data[i].get_data() + pos, data_size_to_append}));
266262
TEST_SYNC_POINT_CALLBACK("s3_file_writer::appenv_1", &_pending_buf, _cur_part_num);
267263

268-
// if it's the last part, it could be less than 5MB, or it must
269-
// satisfy that the size is larger than or equal to 5MB
270-
// _complete() would handle the first situation
264+
// If this is the last part and the data size is less than s3_write_buffer_size,
265+
// the pending_buf will be handled by _close_impl() and _complete()
266+
// If this is the last part and the data size is equal to s3_write_buffer_size,
267+
// the pending_buf is submitted here, and _complete() will wait for it
271268
if (_pending_buf->get_size() == buffer_size) {
272-
// only create multiple upload request when the data is more
273-
// than one memory buffer
269+
// only create multiple upload request when the data size is
270+
// larger than or equal to s3_write_buffer_size (at least one full memory buffer)
274271
if (_cur_part_num == 1) {
275272
RETURN_IF_ERROR(_create_multi_upload_request());
276273
}
@@ -286,6 +283,8 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
286283
}
287284

288285
void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) {
286+
VLOG_DEBUG << "upload_one_part " << _obj_storage_path_opts.path.native()
287+
<< " part=" << part_num;
289288
if (buf.is_cancelled()) {
290289
LOG_INFO("file {} skip part {} because previous failure {}",
291290
_obj_storage_path_opts.path.native(), part_num, _st);
@@ -337,11 +336,20 @@ Status S3FileWriter::_complete() {
337336
return Status::OK();
338337
}
339338

340-
if (_failed || _completed_parts.size() != _cur_part_num) {
339+
// check number of parts
340+
int expected_num_parts1 = (_bytes_appended / config::s3_write_buffer_size) +
341+
!!(_bytes_appended % config::s3_write_buffer_size);
342+
int expected_num_parts2 =
343+
(_bytes_appended % config::s3_write_buffer_size) ? _cur_part_num : _cur_part_num - 1;
344+
DCHECK_EQ(expected_num_parts1, expected_num_parts2)
345+
<< " bytes_appended=" << _bytes_appended << " cur_part_num=" << _cur_part_num
346+
<< " s3_write_buffer_size=" << config::s3_write_buffer_size;
347+
if (_failed || _completed_parts.size() != expected_num_parts1 ||
348+
expected_num_parts1 != expected_num_parts2) {
341349
_st = Status::InternalError(
342350
"error status={} failed={} #complete_parts={} #expected_parts={} "
343351
"completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}",
344-
_st, _failed, _completed_parts.size(), _cur_part_num, _dump_completed_part(),
352+
_st, _failed, _completed_parts.size(), expected_num_parts1, _dump_completed_part(),
345353
_obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr);
346354
LOG(WARNING) << _st;
347355
return _st;
@@ -350,6 +358,9 @@ Status S3FileWriter::_complete() {
350358
std::sort(_completed_parts.begin(), _completed_parts.end(),
351359
[](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
352360
TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
361+
LOG(INFO) << "complete_multipart_upload " << _obj_storage_path_opts.path.native()
362+
<< " size=" << _bytes_appended << " number_parts=" << _completed_parts.size()
363+
<< " s3_write_buffer_size=" << config::s3_write_buffer_size;
353364
auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
354365
if (resp.status.code != ErrorCode::OK) {
355366
LOG_WARNING("Compltet multi part upload failed because {}, file path {}", resp.status.msg,
@@ -379,6 +390,8 @@ Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
379390
}
380391

381392
void S3FileWriter::_put_object(UploadFileBuffer& buf) {
393+
LOG(INFO) << "put_object " << _obj_storage_path_opts.path.native()
394+
<< " size=" << _bytes_appended;
382395
if (state() == State::CLOSED) {
383396
DCHECK(state() != State::CLOSED)
384397
<< "state=" << (int)state() << " path=" << _obj_storage_path_opts.path.native();

0 commit comments

Comments
 (0)