Skip to content

Commit f1a94ee

Browse files
authored
Merge pull request #2014 from jponge/fix/1993
fix: race condition in Uni.combine().all()
2 parents 70bf5e5 + 0ef8ca6 commit f1a94ee

File tree

3 files changed

+69
-46
lines changed

3 files changed

+69
-46
lines changed

implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniAndCombination.java

Lines changed: 53 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import java.util.concurrent.Flow.Subscription;
66
import java.util.concurrent.atomic.AtomicBoolean;
77
import java.util.concurrent.atomic.AtomicInteger;
8-
import java.util.concurrent.atomic.AtomicReference;
8+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
99
import java.util.function.Function;
1010
import java.util.stream.Collectors;
1111

@@ -58,14 +58,15 @@ private class AndSupervisor implements UniSubscription {
5858
private final List<UniHandler> handlers = new ArrayList<>();
5959
private final UniSubscriber<? super O> subscriber;
6060

61-
AtomicBoolean cancelled = new AtomicBoolean();
62-
AtomicInteger nextIndex = new AtomicInteger();
61+
final AtomicBoolean cancelled = new AtomicBoolean();
62+
final AtomicInteger nextIndex = new AtomicInteger();
63+
final AtomicInteger wip = new AtomicInteger();
6364

6465
AndSupervisor(UniSubscriber<? super O> sub) {
6566
subscriber = sub;
6667

6768
Context context = subscriber.context();
68-
for (Uni uni : unis) {
69+
for (Uni<?> uni : unis) {
6970
UniHandler result = new UniHandler(this, uni, context);
7071
handlers.add(result);
7172
}
@@ -78,7 +79,7 @@ private void run() {
7879
upperBound = handlers.size();
7980
} else {
8081
upperBound = Math.min(handlers.size(), concurrency);
81-
nextIndex = new AtomicInteger(upperBound);
82+
nextIndex.set(upperBound);
8283
}
8384
for (int i = 0; i < upperBound; i++) {
8485
if (cancelled.get()) {
@@ -103,40 +104,47 @@ public void cancel() {
103104
* @param failed whether the {@code res} just fired a failure
104105
*/
105106
void check(UniHandler res, boolean failed) {
106-
int incomplete = unis.size();
107-
108-
// One of the uni failed, and we can fire a failure immediately.
109-
if (failed && !collectAllFailureBeforeFiring) {
110-
if (cancelled.compareAndSet(false, true)) {
111-
// Cancel all subscriptions
112-
handlers.forEach(UniHandler::cancel);
113-
// Invoke observer
114-
subscriber.onFailure(res.failure);
115-
}
107+
if (wip.getAndIncrement() > 0) {
116108
return;
117109
}
118110

119-
for (UniHandler result : handlers) {
120-
if (result.failure != null || result.item != SENTINEL) {
121-
incomplete = incomplete - 1;
111+
int incomplete;
112+
do {
113+
incomplete = unis.size();
114+
115+
// One of the uni failed, and we can fire a failure immediately.
116+
if (failed && !collectAllFailureBeforeFiring) {
117+
if (cancelled.compareAndSet(false, true)) {
118+
// Cancel all subscriptions
119+
handlers.forEach(UniHandler::cancel);
120+
// Invoke observer
121+
subscriber.onFailure(res.failure);
122+
}
123+
return;
122124
}
123-
}
124125

125-
if (incomplete == 0) {
126-
// All unis has fired an event, check the outcome
127-
if (cancelled.compareAndSet(false, true)) {
128-
List<Throwable> failures = getFailures();
129-
List<Object> items = getItems();
130-
computeAndFireTheOutcome(failures, items);
126+
for (UniHandler result : handlers) {
127+
if (result.failure != null || result.item != SENTINEL) {
128+
incomplete = incomplete - 1;
129+
}
131130
}
132-
}
133131

134-
if (concurrency != -1 && !cancelled.get()) {
135-
int nextIndex = this.nextIndex.getAndIncrement();
136-
if (nextIndex < unis.size()) {
137-
handlers.get(nextIndex).subscribe();
132+
if (incomplete == 0) {
133+
// All unis has fired an event, check the outcome
134+
if (cancelled.compareAndSet(false, true)) {
135+
List<Throwable> failures = getFailures();
136+
List<Object> items = getItems();
137+
computeAndFireTheOutcome(failures, items);
138+
}
138139
}
139-
}
140+
141+
if (concurrency != -1 && !cancelled.get()) {
142+
int nextIndex = this.nextIndex.getAndIncrement();
143+
if (nextIndex < unis.size()) {
144+
handlers.get(nextIndex).subscribe();
145+
}
146+
}
147+
} while (wip.decrementAndGet() > 0 && incomplete > 0);
140148
}
141149

142150
private void computeAndFireTheOutcome(List<Throwable> failures, List<Object> items) {
@@ -172,14 +180,18 @@ private List<Throwable> getFailures() {
172180

173181
private class UniHandler implements UniSubscription, UniSubscriber {
174182

175-
final AtomicReference<UniSubscription> subscription = new AtomicReference<>();
176-
private final AndSupervisor supervisor;
177-
private final Uni uni;
178-
private final Context context;
183+
final AndSupervisor supervisor;
184+
final Uni<?> uni;
185+
final Context context;
186+
187+
volatile UniSubscription subscription;
179188
Object item = SENTINEL;
180189
Throwable failure;
181190

182-
UniHandler(AndSupervisor supervisor, Uni observed, Context context) {
191+
private static final AtomicReferenceFieldUpdater<UniAndCombination.UniHandler, UniSubscription> SUBSCRIPTION_UPDATER = AtomicReferenceFieldUpdater
192+
.newUpdater(UniAndCombination.UniHandler.class, UniSubscription.class, "subscription");
193+
194+
UniHandler(AndSupervisor supervisor, Uni<?> observed, Context context) {
183195
this.supervisor = supervisor;
184196
this.uni = observed;
185197
this.context = context;
@@ -192,7 +204,7 @@ public Context context() {
192204

193205
@Override
194206
public final void onSubscribe(UniSubscription sub) {
195-
if (!subscription.compareAndSet(null, sub)) {
207+
if (!SUBSCRIPTION_UPDATER.compareAndSet(this, null, sub)) {
196208
// cancelling this second subscription
197209
// because we already add a subscription (most probably CANCELLED)
198210
sub.cancel();
@@ -201,7 +213,7 @@ public final void onSubscribe(UniSubscription sub) {
201213

202214
@Override
203215
public final void onFailure(Throwable t) {
204-
if (subscription.getAndSet(EmptyUniSubscription.CANCELLED) == EmptyUniSubscription.CANCELLED) {
216+
if (SUBSCRIPTION_UPDATER.getAndSet(this, EmptyUniSubscription.CANCELLED) == EmptyUniSubscription.CANCELLED) {
205217
// Already cancelled, do nothing
206218
Infrastructure.handleDroppedException(t);
207219
return;
@@ -212,7 +224,7 @@ public final void onFailure(Throwable t) {
212224

213225
@Override
214226
public final void onItem(Object x) {
215-
if (subscription.getAndSet(EmptyUniSubscription.CANCELLED) == EmptyUniSubscription.CANCELLED) {
227+
if (SUBSCRIPTION_UPDATER.getAndSet(this, EmptyUniSubscription.CANCELLED) == EmptyUniSubscription.CANCELLED) {
216228
// Already cancelled, do nothing
217229
return;
218230
}
@@ -222,14 +234,14 @@ public final void onItem(Object x) {
222234

223235
@Override
224236
public void cancel() {
225-
Subscription sub = subscription.getAndSet(EmptyUniSubscription.CANCELLED);
237+
Subscription sub = SUBSCRIPTION_UPDATER.getAndSet(this, EmptyUniSubscription.CANCELLED);
226238
if (sub != null) {
227239
sub.cancel();
228240
}
229241
}
230242

243+
@SuppressWarnings("unchecked")
231244
public void subscribe() {
232-
//noinspection unchecked
233245
AbstractUni.subscribe(uni, this);
234246
}
235247
}

implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,7 @@
88
import java.util.ArrayList;
99
import java.util.Comparator;
1010
import java.util.List;
11-
import java.util.concurrent.ExecutorService;
12-
import java.util.concurrent.Executors;
13-
import java.util.concurrent.ThreadFactory;
14-
import java.util.concurrent.TimeUnit;
11+
import java.util.concurrent.*;
1512
import java.util.concurrent.atomic.AtomicBoolean;
1613
import java.util.concurrent.atomic.AtomicInteger;
1714
import java.util.concurrent.atomic.AtomicLong;
@@ -23,8 +20,10 @@
2320
import org.junit.jupiter.api.parallel.ResourceLock;
2421

2522
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
23+
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;
2624
import io.smallrye.mutiny.infrastructure.Infrastructure;
2725
import io.smallrye.mutiny.tuples.Tuple2;
26+
import io.smallrye.mutiny.tuples.Tuple3;
2827
import junit5.support.InfrastructureResource;
2928

3029
@ResourceLock(value = InfrastructureResource.NAME, mode = ResourceAccessMode.READ)
@@ -133,4 +132,17 @@ void reproducer_1891() {
133132
assertThat(failure.get()).describedAs("No failure must have been emitted").isNull();
134133
assertThat(completed.get()).isTrue();
135134
}
135+
136+
@RepeatedTest(1_000)
137+
void reproducer_1993() {
138+
// Race condition in UniAndCombination, spotted in https://github.com/smallrye/smallrye-mutiny/issues/1993
139+
Uni<String> a = Uni.createFrom().completionStage(() -> CompletableFuture.supplyAsync(() -> "A"));
140+
Uni<String> b = Uni.createFrom().completionStage(() -> CompletableFuture.supplyAsync(() -> "B"));
141+
Uni<String> c = Uni.createFrom().completionStage(() -> CompletableFuture.supplyAsync(() -> "C"));
142+
Uni<Tuple3<String, String, String>> uni = Uni.combine().all().unis(a, b, c).usingConcurrencyOf(3).asTuple();
143+
144+
UniAssertSubscriber<Tuple3<String, String, String>> sub = uni.subscribe().withSubscriber(UniAssertSubscriber.create());
145+
sub.awaitItem(Duration.ofSeconds(5))
146+
.assertItem(Tuple3.of("A", "B", "C"));
147+
}
136148
}

implementation/src/test/java/io/smallrye/mutiny/operators/UniAndTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,5 +297,4 @@ public void testWithSetOfUnis() {
297297
.assertCompleted()
298298
.assertItem(1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9);
299299
}
300-
301300
}

0 commit comments

Comments
 (0)