@@ -113,40 +113,48 @@ public void testWrite() {
}

@Test
public void testAbortAfterFailedPartUpload() {
public void testAbortAfterFailedPartUpload() throws Exception {
RuntimeException mockException = new RuntimeException("mock uploadPart failure");
doThrow(mockException).when(s3mock).uploadPart((UploadPartRequest) any(), (RequestBody) any());

S3OutputStream stream = new S3OutputStream(s3mock, randomURI(), properties, nullMetrics());

assertThatThrownBy(
() -> {
try (S3OutputStream stream =
new S3OutputStream(s3mock, randomURI(), properties, nullMetrics())) {
stream.write(randomData(10 * 1024 * 1024));
}
stream.write(randomData(10 * 1024 * 1024));
stream.close();
})
.isInstanceOf(mockException.getClass())
.hasMessageContaining(mockException.getMessage());

// Verify that staging files and multipart map are cleared after abort
assertThat(stream.isStagingFileListCleared()).isTrue();
assertThat(stream.isMultiPartMapCleared()).isTrue();

verify(s3mock, times(1)).abortMultipartUpload((AbortMultipartUploadRequest) any());
}

@Test
public void testAbortMultipart() {
public void testAbortMultipart() throws Exception {
RuntimeException mockException = new RuntimeException("mock completeMultipartUpload failure");
doThrow(mockException)
.when(s3mock)
.completeMultipartUpload((CompleteMultipartUploadRequest) any());

S3OutputStream stream = new S3OutputStream(s3mock, randomURI(), properties, nullMetrics());

assertThatThrownBy(
() -> {
try (S3OutputStream stream =
new S3OutputStream(s3mock, randomURI(), properties, nullMetrics())) {
stream.write(randomData(10 * 1024 * 1024));
}
stream.write(randomData(10 * 1024 * 1024));
stream.close();
})
.isInstanceOf(mockException.getClass())
.hasMessageContaining(mockException.getMessage());

// Verify that staging files and multipart map are cleared after abort
assertThat(stream.isStagingFileListCleared()).isTrue();
assertThat(stream.isMultiPartMapCleared()).isTrue();

verify(s3mock, times(1)).abortMultipartUpload((AbortMultipartUploadRequest) any());
}

@@ -334,4 +342,40 @@ private void createBucket(String bucketName) {
// do nothing
}
}

@Test
public void testStagingFilesAndMultipartMapClearedAfterSuccessfulWrite() throws Exception {
// Test single-part upload (small file)
byte[] smallData = randomData(1024);
S3URI uri = randomURI();

S3OutputStream stream = new S3OutputStream(s3, uri, properties, nullMetrics());
stream.write(smallData);
stream.close();

assertThat(stream.isStagingFileListCleared()).isTrue();
assertThat(stream.isMultiPartMapCleared()).isTrue();

// Verify the data was written successfully
byte[] actual = readS3Data(uri);
assertThat(actual).isEqualTo(smallData);
}

@Test
public void testStagingFilesAndMultipartMapClearedAfterMultipartUpload() throws Exception {
// Test multipart upload (large file)
byte[] largeData = randomData(10 * 1024 * 1024);
S3URI uri = randomURI();

S3OutputStream stream = new S3OutputStream(s3, uri, properties, nullMetrics());
stream.write(largeData);
stream.close();

assertThat(stream.isStagingFileListCleared()).isTrue();
assertThat(stream.isMultiPartMapCleared()).isTrue();

// Verify the data was written successfully
byte[] actual = readS3Data(uri);
assertThat(actual).isEqualTo(largeData);
}
}
13 changes: 13 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
@@ -407,6 +407,9 @@ private void cleanUpStagingFiles() {
.suppressFailureWhenFinished()
.onFailure((file, thrown) -> LOG.warn("Failed to delete staging file: {}", file, thrown))
.run(File::delete);
// clear staging files and multipart map
stagingFiles.clear();
multiPartMap.clear();
Comment on lines +410 to +412

@singhpk234 (Contributor) commented on Feb 1, 2026:

It's fine to do eager cleanup, though wouldn't the stream be closed after the abort and hence GCed? Is this staying in memory for long?

The PR author (Contributor) replied:

Thanks @singhpk234 for the review.

Regarding memory management:
While the staging files list will eventually allow objects to be garbage-collected once they go out of scope, I’m concerned that retaining strong references to many FileAndDigest objects (especially in upload-heavy / long-running workloads) can still cause practical issues:

  • Increased heap pressure during periods of high concurrent or sequential uploads
  • Longer object lifetime → more frequent / longer GC pauses
  • Higher risk of OutOfMemoryError during peak load (I’ve sometimes observed OOMs in similar scenarios when large numbers of parts accumulate without cleanup while running Iceberg-Kafka-Connect)

Even though the theoretical lifetime is finite, the practical memory pressure and GC overhead seem non-negligible in our use case.

Also, although it does not affect the AWS multipart upload itself (AWS only requires part numbers to be unique), starting the part number from 1 and keeping it in low bounds makes managing CompleteMultipartUpload requests easier. Currently the part number comes from the index of the part file in the staging files list, which can start from a higher number if the previous files are not cleared.

Please let me know your thoughts on these.
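
For illustration only (not code from this PR): a minimal sketch of the part-numbering behavior described above, assuming the part number is simply the 1-based position of a part file in the staging-file list; the class and method names here are hypothetical.

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the S3OutputStream implementation.
class PartNumberingSketch {
  private final List<String> stagingFiles = new ArrayList<>();

  // Assumed rule: the part number is the 1-based index of the newly staged file.
  int stagePart(String partFile) {
    stagingFiles.add(partFile);
    return stagingFiles.size();
  }

  void clearStagedParts() {
    stagingFiles.clear(); // eager cleanup: numbering restarts at 1 for the next upload
  }

  public static void main(String[] args) {
    PartNumberingSketch sketch = new PartNumberingSketch();
    System.out.println(sketch.stagePart("part-a")); // 1
    System.out.println(sketch.stagePart("part-b")); // 2
    // Without clearing, the next staged part would be numbered 3;
    // after clearStagedParts() it starts from 1 again.
    sketch.clearStagedParts();
    System.out.println(sketch.stagePart("part-c")); // 1
  }
}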

Another contributor replied:

I don't think there's any harm in clearing these eagerly, but I'd agree with @singhpk234 that this isn't practically going to help. The output stream is bound to a single file write and these references will be kept as long as the stream is open regardless. So the only narrow scenario where you get some benefit is if something is holding onto the output stream references even after they're closed (which is odd in practice).

This isn't a leak and I question the validity of the examples. The memory will be held when writing, but released when closed and dereferenced.

}

private void completeUploads() {
@@ -512,4 +515,14 @@ public boolean hasDigest() {
return digest != null;
}
}

// package-private methods to test clearing of the staging file list and multipart map

boolean isStagingFileListCleared() {
return stagingFiles.isEmpty();
}

boolean isMultiPartMapCleared() {
return multiPartMap.isEmpty();
}
}