11package io .smallrye .mutiny .operators .uni ;
22
3+ import java .util .concurrent .CancellationException ;
34import java .util .concurrent .CompletableFuture ;
45import java .util .concurrent .atomic .AtomicReference ;
56
@@ -13,21 +14,22 @@ public class UniSubscribeToCompletionStage {
1314 public static <T > CompletableFuture <T > subscribe (Uni <T > uni , Context context ) {
1415 final AtomicReference <Cancellable > cancellable = new AtomicReference <>();
1516
16- CompletableFuture <T > future = new CompletableFuture <T >() {
17- @ Override
18- public boolean cancel ( boolean mayInterruptIfRunning ) {
19- boolean cancelled = super . cancel ( mayInterruptIfRunning );
20- if (cancelled ) {
17+ CompletableFuture <T > future = Infrastructure . wrapCompletableFuture ( new CompletableFuture <T >());
18+ future . whenComplete (( val , x ) -> {
19+ if ( x instanceof CancellationException ) {
20+ // forward the cancellation to the uni
21+ if (future . isCancelled () ) {
2122 Cancellable c = cancellable .get ();
2223 if (c != null ) {
2324 c .cancel ();
2425 }
2526 }
26- return cancelled ;
2727 }
28- };
29-
28+ });
3029 cancellable .set (uni .subscribe ().with (context , future ::complete , future ::completeExceptionally ));
31- return Infrastructure .wrapCompletableFuture (future );
30+ // We return future here and not whatever is returned from future.whenComplete, because that
31+ // new stage will wrap any exceptions into a CompletionException which we do not want, and
32+ // is exposed by UniOrTest (at least)
33+ return future ;
3234 }
3335}
0 commit comments