Skip to content

Commit cb747b2

Browse files
committed
optimization: Uni.await()
- optimize the case where the item/failure is known and subscription/synchronization is not needed
1 parent 8b8d066 commit cb747b2

File tree

5 files changed

+56
-1
lines changed

5 files changed

+56
-1
lines changed

implementation/src/main/java/io/smallrye/mutiny/groups/UniAwait.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@
44

55
import java.time.Duration;
66
import java.util.Optional;
7+
import java.util.concurrent.CompletionException;
78

89
import io.smallrye.common.annotation.CheckReturnValue;
910
import io.smallrye.mutiny.Context;
1011
import io.smallrye.mutiny.TimeoutException;
1112
import io.smallrye.mutiny.Uni;
13+
import io.smallrye.mutiny.infrastructure.Infrastructure;
1214
import io.smallrye.mutiny.operators.uni.UniBlockingAwait;
15+
import io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownFailure;
16+
import io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem;
1317

1418
/**
1519
* Waits and returns the item emitted by the {@link Uni}. If the {@link Uni} receives a failure, the failure is thrown.
@@ -62,6 +66,13 @@ public T indefinitely() {
6266
* @return the item from the {@link Uni}, potentially {@code null}
6367
*/
6468
public T atMost(Duration duration) {
69+
// Optimize the case where the item/failure is known
70+
// and subscription/synchronization is not needed
71+
if (upstream instanceof UniCreateFromKnownItem<T> known) {
72+
return awaitKnownItem(known, duration);
73+
} else if (upstream instanceof UniCreateFromKnownFailure<T> known) {
74+
awaitKnownFailure(known, duration);
75+
}
6576
return UniBlockingAwait.await(upstream, duration, context);
6677
}
6778

@@ -76,4 +87,32 @@ public UniAwaitOptional<T> asOptional() {
7687
return new UniAwaitOptional<>(upstream, context);
7788
}
7889

90+
private T awaitKnownItem(UniCreateFromKnownItem<T> known, Duration duration) {
91+
validateDuration(duration);
92+
// Blocking should not matter in this case but we retain the check for backward compatibility
93+
if (!Infrastructure.canCallerThreadBeBlocked()) {
94+
throw UniBlockingAwait.currentThreadCannotBeBlocked();
95+
}
96+
return known.getItem();
97+
}
98+
99+
private void awaitKnownFailure(UniCreateFromKnownFailure<T> known, Duration duration) {
100+
validateDuration(duration);
101+
// Blocking should not matter in this case but we retain the check for backward compatibility
102+
if (!Infrastructure.canCallerThreadBeBlocked()) {
103+
throw UniBlockingAwait.currentThreadCannotBeBlocked();
104+
}
105+
Throwable throwable = known.getFailure();
106+
if (throwable instanceof RuntimeException) {
107+
throw (RuntimeException) throwable;
108+
}
109+
throw new CompletionException(throwable);
110+
}
111+
112+
private void validateDuration(Duration duration) {
113+
if (duration != null && (duration.isZero() || duration.isNegative())) {
114+
throw new IllegalArgumentException("`duration` must be greater than zero");
115+
}
116+
}
117+
79118
}

implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniBlockingAwait.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public static <T> T await(Uni<T> upstream, Duration duration, Context context) {
2727
validate(duration);
2828

2929
if (!Infrastructure.canCallerThreadBeBlocked()) {
30-
throw new IllegalStateException("The current thread cannot be blocked: " + Thread.currentThread().getName());
30+
throw currentThreadCannotBeBlocked();
3131
}
3232

3333
CountDownLatch latch = new CountDownLatch(1);
@@ -87,6 +87,10 @@ public void onFailure(Throwable failure) {
8787
}
8888
}
8989

90+
public static IllegalStateException currentThreadCannotBeBlocked() {
91+
return new IllegalStateException("The current thread cannot be blocked: " + Thread.currentThread().getName());
92+
}
93+
9094
private static void validate(Duration duration) {
9195
if (duration == null) {
9296
return;

implementation/src/main/java/io/smallrye/mutiny/operators/uni/builders/UniCreateFromKnownFailure.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ public void subscribe(UniSubscriber<? super T> subscriber) {
2323
new KnownFailureSubscription(subscriber).forward();
2424
}
2525

26+
public Throwable getFailure() {
27+
return failure;
28+
}
29+
2630
private class KnownFailureSubscription implements UniSubscription {
2731

2832
private final UniSubscriber<? super T> subscriber;

implementation/src/main/java/io/smallrye/mutiny/operators/uni/builders/UniCreateFromKnownItem.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ public void subscribe(UniSubscriber<? super T> subscriber) {
2323
new KnownItemSubscription(subscriber).forward();
2424
}
2525

26+
public T getItem() {
27+
return item;
28+
}
29+
2630
private class KnownItemSubscription implements UniSubscription {
2731

2832
private final UniSubscriber<? super T> subscriber;

implementation/src/test/java/io/smallrye/mutiny/operators/UniAwaitTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ public void testInvalidDurations() {
154154

155155
assertThatThrownBy(() -> one.await().atMost(Duration.ofSeconds(0)))
156156
.isInstanceOf(IllegalArgumentException.class).hasMessageContaining("duration");
157+
158+
assertThatThrownBy(
159+
() -> Uni.createFrom().failure(new IllegalArgumentException()).await().atMost(Duration.ofMillis(-2000)))
160+
.isInstanceOf(IllegalArgumentException.class).hasMessageContaining("duration");
157161
}
158162

159163
@Test

0 commit comments

Comments
 (0)