diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java index e59bb182cb18..c9d9557d2062 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java @@ -113,40 +113,48 @@ public void testWrite() { } @Test - public void testAbortAfterFailedPartUpload() { + public void testAbortAfterFailedPartUpload() throws Exception { RuntimeException mockException = new RuntimeException("mock uploadPart failure"); doThrow(mockException).when(s3mock).uploadPart((UploadPartRequest) any(), (RequestBody) any()); + S3OutputStream stream = new S3OutputStream(s3mock, randomURI(), properties, nullMetrics()); + assertThatThrownBy( () -> { - try (S3OutputStream stream = - new S3OutputStream(s3mock, randomURI(), properties, nullMetrics())) { - stream.write(randomData(10 * 1024 * 1024)); - } + stream.write(randomData(10 * 1024 * 1024)); + stream.close(); }) .isInstanceOf(mockException.getClass()) .hasMessageContaining(mockException.getMessage()); + // Verify that staging files and multipart map are cleared after abort + assertThat(stream.isStagingFileListCleared()).isTrue(); + assertThat(stream.isMultiPartMapCleared()).isTrue(); + verify(s3mock, times(1)).abortMultipartUpload((AbortMultipartUploadRequest) any()); } @Test - public void testAbortMultipart() { + public void testAbortMultipart() throws Exception { RuntimeException mockException = new RuntimeException("mock completeMultipartUpload failure"); doThrow(mockException) .when(s3mock) .completeMultipartUpload((CompleteMultipartUploadRequest) any()); + S3OutputStream stream = new S3OutputStream(s3mock, randomURI(), properties, nullMetrics()); + assertThatThrownBy( () -> { - try (S3OutputStream stream = - new S3OutputStream(s3mock, randomURI(), properties, nullMetrics())) { - stream.write(randomData(10 * 1024 * 1024)); - } + stream.write(randomData(10 * 1024 * 1024)); + stream.close(); }) .isInstanceOf(mockException.getClass()) .hasMessageContaining(mockException.getMessage()); + // Verify that staging files and multipart map are cleared after abort + assertThat(stream.isStagingFileListCleared()).isTrue(); + assertThat(stream.isMultiPartMapCleared()).isTrue(); + verify(s3mock, times(1)).abortMultipartUpload((AbortMultipartUploadRequest) any()); } @@ -334,4 +342,40 @@ private void createBucket(String bucketName) { // do nothing } } + + @Test + public void testStagingFilesAndMultipartMapClearedAfterSuccessfulWrite() throws Exception { + // Test single-part upload (small file) + byte[] smallData = randomData(1024); + S3URI uri = randomURI(); + + S3OutputStream stream = new S3OutputStream(s3, uri, properties, nullMetrics()); + stream.write(smallData); + stream.close(); + + assertThat(stream.isStagingFileListCleared()).isTrue(); + assertThat(stream.isMultiPartMapCleared()).isTrue(); + + // Verify the data was written successfully + byte[] actual = readS3Data(uri); + assertThat(actual).isEqualTo(smallData); + } + + @Test + public void testStagingFilesAndMultipartMapClearedAfterMultipartUpload() throws Exception { + // Test multipart upload (large file) + byte[] largeData = randomData(10 * 1024 * 1024); + S3URI uri = randomURI(); + + S3OutputStream stream = new S3OutputStream(s3, uri, properties, nullMetrics()); + stream.write(largeData); + stream.close(); + + assertThat(stream.isStagingFileListCleared()).isTrue(); + assertThat(stream.isMultiPartMapCleared()).isTrue(); + + // Verify the data was written successfully + byte[] actual = readS3Data(uri); + assertThat(actual).isEqualTo(largeData); + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java index 50963127f52a..548d1a577c5c 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java @@ -407,6 +407,9 @@ private void cleanUpStagingFiles() { .suppressFailureWhenFinished() .onFailure((file, thrown) -> LOG.warn("Failed to delete staging file: {}", file, thrown)) .run(File::delete); + // clear staging files and multipart map + stagingFiles.clear(); + multiPartMap.clear(); } private void completeUploads() { @@ -512,4 +515,14 @@ public boolean hasDigest() { return digest != null; } } + + // package private methods to test clearing of stating file list and multipart map + + boolean isStagingFileListCleared() { + return stagingFiles.isEmpty(); + } + + boolean isMultiPartMapCleared() { + return multiPartMap.isEmpty(); + } }