Skip to content

Commit df2c619

Browse files
committed
fix: incorrect early ticket completion during downgrading
1 parent cd21b29 commit df2c619

4 files changed

Lines changed: 151 additions & 174 deletions

File tree

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,63 @@
11
package com.ishland.flowsched.scheduler;
22

3-
import java.util.concurrent.atomic.AtomicBoolean;
3+
import com.ishland.flowsched.util.Assertions;
4+
5+
import java.lang.invoke.MethodHandles;
6+
import java.lang.invoke.VarHandle;
47

58
public class Cancellable {
69

7-
private final AtomicBoolean cancelled = new AtomicBoolean(false);
10+
@SuppressWarnings("unused")
11+
private Runnable onCancel;
12+
13+
public boolean setup(Runnable onCancel) {
14+
final var result = VH_CANCEL.compareAndExchangeAcquire(this, null, onCancel);
15+
Assertions.assertTrue(result != COMPLETED, "Cancellation is already completed when setup");
16+
return result == null;
17+
}
18+
19+
public boolean complete() {
20+
final var witness = (Runnable) VH_CANCEL.compareAndExchangeRelease(this, null, COMPLETED);
21+
if (witness == CANCELLED || witness == COMPLETED) {
22+
return false;
23+
}
24+
if (witness == null) {
25+
return true;
26+
}
27+
// Set it to completed if we can; if we failed, it can only be cancel() or another complete()
28+
return witness == VH_CANCEL.compareAndExchangeRelease(this, witness, COMPLETED);
29+
}
830

9-
public void cancel() {
10-
this.cancelled.set(true);
31+
public boolean cancel() {
32+
final Runnable handle = (Runnable) VH_CANCEL.compareAndExchangeRelease(this, null, CANCELLED);
33+
if (handle == null) {
34+
return true;
35+
}
36+
if (handle != CANCELLED && handle != COMPLETED) {
37+
handle.run();
38+
// Set it to cancelled if we can; if we failed, it can only be complete() or another cancel()
39+
return handle == VH_CANCEL.compareAndExchangeRelease(this, handle, CANCELLED);
40+
}
41+
return false;
1142
}
1243

1344
public boolean isCancelled() {
14-
return this.cancelled.get();
45+
return VH_CANCEL.getAcquire(this) == CANCELLED;
1546
}
1647

48+
public boolean isCompleted() {
49+
return VH_CANCEL.getAcquire(this) == COMPLETED;
50+
}
51+
52+
private static final VarHandle VH_CANCEL;
53+
private static final Runnable CANCELLED = () -> {};
54+
private static final Runnable COMPLETED = () -> {};
55+
56+
static {
57+
try {
58+
VH_CANCEL = MethodHandles.lookup().findVarHandle(Cancellable.class, "onCancel", Runnable.class);
59+
} catch (NoSuchFieldException | IllegalAccessException e) {
60+
throw new RuntimeException(e);
61+
}
62+
}
1763
}

src/main/java/com/ishland/flowsched/scheduler/CancellationSignaller.java

Lines changed: 0 additions & 70 deletions
This file was deleted.

src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class ItemHolder<K, V, Ctx, UserData> {
5454
private final AtomicReference<V> item = new AtomicReference<>();
5555
private final AtomicReference<UserData> userData = new AtomicReference<>();
5656
private final BusyRefCounter busyRefCounter = new BusyRefCounter();
57-
private final AtomicReference<Pair<CancellationSignaller, ItemStatus<K, V, Ctx>>> runningUpgradeAction = new AtomicReference<>();
57+
private final AtomicReference<Pair<Cancellable, ItemStatus<K, V, Ctx>>> runningAction = new AtomicReference<>();
5858
private final TicketSet<K, V, Ctx> tickets;
5959
private volatile ItemStatus<K, V, Ctx> status = null;
6060
// private final List<Pair<ItemStatus<K, V, Ctx>, Long>> statusHistory = ReferenceLists.synchronize(new ReferenceArrayList<>());
@@ -127,9 +127,9 @@ public synchronized boolean isBusy() {
127127
return busyRefCounter.isBusy();
128128
}
129129

130-
public ItemStatus<K, V, Ctx> upgradingStatusTo() {
130+
public ItemStatus<K, V, Ctx> changingStatusTo() {
131131
assertOpen();
132-
final Pair<CancellationSignaller, ItemStatus<K, V, Ctx>> pair = this.runningUpgradeAction.get();
132+
final Pair<Cancellable, ItemStatus<K, V, Ctx>> pair = this.runningAction.get();
133133
return pair != null ? pair.right() : null;
134134
}
135135

@@ -144,7 +144,9 @@ public void addTicket(ItemStatus<K, V, Ctx> targetStatus, ItemTicket ticket) {
144144
}
145145
this.tickets.addUnchecked(targetStatus);
146146
createFutures();
147-
needConsumption = targetStatus.ordinal() <= this.getStatus().ordinal();
147+
final var current = this.getStatus();
148+
final var projected = this.changingStatusTo();
149+
needConsumption = targetStatus.ordinal() <= (projected == null ? current.ordinal() : Math.min(current.ordinal(), projected.ordinal()));
148150
}
149151

150152
if (needConsumption) {
@@ -208,16 +210,21 @@ BusyRefCounter busyRefCounter() {
208210
return this.busyRefCounter;
209211
}
210212

211-
public void submitUpgradeAction(CancellationSignaller signaller, ItemStatus<K, V, Ctx> status) {
213+
public void finishAction() {
214+
assertOpen();
215+
Pair<Cancellable, ItemStatus<K, V, Ctx>> current = this.runningAction.getAndSet(null);
216+
Assertions.assertTrue(current != null, "No action is present when trying to finish an action");
217+
}
218+
219+
public void submitAction(Cancellable cancellation, ItemStatus<K, V, Ctx> status) {
212220
assertOpen();
213-
final boolean success = this.runningUpgradeAction.compareAndSet(null, Pair.of(signaller, status));
221+
final var success = this.runningAction.compareAndSet(null, Pair.of(cancellation, status));
214222
Assertions.assertTrue(success, "Only one action can happen at a time");
215-
signaller.addListener(unused -> this.runningUpgradeAction.set(null));
216223
}
217224

218-
public void tryCancelUpgradeAction() {
225+
public void tryCancelAction() {
219226
assertOpen();
220-
final Pair<CancellationSignaller, ItemStatus<K, V, Ctx>> signaller = this.runningUpgradeAction.get();
227+
final Pair<Cancellable, ItemStatus<K, V, Ctx>> signaller = this.runningAction.get();
221228
if (signaller != null) {
222229
signaller.left().cancel();
223230
}

0 commit comments

Comments
 (0)