Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/main/java/org/openrewrite/polyglot/ProgressBar.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,20 @@ public interface ProgressBar extends AutoCloseable {
ProgressBar setExtraMessage(String extraMessage);

ProgressBar setMax(int max);

/**
* Set the canceled state of the progress bar.
* @param canceled true if the operation has been canceled
*/
default void setCanceled(boolean canceled) {
// Default no-op implementation for backward compatibility
}

/**
* Check if the progress bar has been marked as canceled.
* @return true if the operation has been canceled
*/
default boolean isCanceled() {
return false;
}
}
116 changes: 91 additions & 25 deletions src/main/java/org/openrewrite/polyglot/RemoteProgressBarReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.net.*;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
Expand All @@ -36,7 +35,11 @@ public class RemoteProgressBarReceiver implements ProgressBar {
private final ProgressBar delegate;
private final DatagramSocket socket;
private volatile boolean closed;
private final AtomicReference<String> thrown = new AtomicReference<>();
private final AtomicReference<@Nullable String> thrown = new AtomicReference<>();
private volatile boolean canceled = false;
private @Nullable InetAddress lastSenderAddress;
private int lastSenderPort;
private volatile boolean cancelNotificationSent = false;

public RemoteProgressBarReceiver(ProgressBar delegate) {
try {
Expand All @@ -61,28 +64,47 @@ protected boolean removeEldestEntry(Map.Entry<UUID, RemoteProgressMessage> eldes
};
try {
while (!closed) {
RemoteProgressMessage message = RemoteProgressMessage.receive(socket, incompleteMessages);
if (message == null) {
continue;
}
switch (message.getType()) {
case Exception:
if (message.getMessage() != null) {
thrown.set(message.getMessage());
}
break;
case IntermediateResult:
delegate.intermediateResult(message.getMessage());
break;
case Step:
delegate.step();
break;
case SetExtraMessage:
delegate.setExtraMessage(requireNonNull(message.getMessage()));
break;
case SetMax:
delegate.setMax(Integer.parseInt(requireNonNull(message.getMessage())));
break;
// Receive with packet info to get sender details
byte[] buf = new byte[128];
DatagramPacket packet = new DatagramPacket(buf, 128);
try {
socket.receive(packet);

// Store sender info for sending cancel status back
lastSenderAddress = packet.getAddress();
lastSenderPort = packet.getPort();

RemoteProgressMessage message = RemoteProgressMessage.read(buf, packet.getLength(), incompleteMessages);
if (message == null) {
continue;
}
switch (message.getType()) {
case Exception:
if (message.getMessage() != null) {
thrown.set(message.getMessage());
}
break;
case IntermediateResult:
delegate.intermediateResult(message.getMessage());
break;
case Step:
delegate.step();
break;
case SetExtraMessage:
delegate.setExtraMessage(requireNonNull(message.getMessage()));
break;
case SetMax:
delegate.setMax(Integer.parseInt(requireNonNull(message.getMessage())));
break;
}

// Only send cancel status if we haven't already notified about cancellation
if ((canceled || delegate.isCanceled()) && !cancelNotificationSent) {
sendCancelStatus();
cancelNotificationSent = true;
}
} catch (SocketTimeoutException ignored) {
// No message received, continue
}
}
} catch (IOException e) {
Expand Down Expand Up @@ -136,4 +158,48 @@ private void maybeThrow() {
throw RemoteException.decode(t);
}
}

private void sendCancelStatus() {
if (lastSenderAddress != null && lastSenderPort > 0) {
try {
// Send a cancel notification message
String cancelMessage = "CANCEL:true";
byte[] cancelBytes = cancelMessage.getBytes();
DatagramPacket cancelPacket = new DatagramPacket(
cancelBytes,
cancelBytes.length,
lastSenderAddress,
lastSenderPort
);

// Try a few times to ensure delivery (since we only send once per cancellation)
for (int i = 0; i < 3; i++) {
socket.send(cancelPacket);
if (i < 2) {
Thread.sleep(10); // Small delay between retries
}
}
} catch (IOException | InterruptedException ignored) {
// Ignore failures when sending cancel status
}
}
}

@Override
public void setCanceled(boolean canceled) {
boolean wasNotCanceled = !this.canceled;
this.canceled = canceled;
delegate.setCanceled(canceled);

// If we just became canceled and haven't sent notification yet, send it
if (wasNotCanceled && canceled && !cancelNotificationSent) {
sendCancelStatus();
cancelNotificationSent = true;
}
}

@Override
public boolean isCanceled() {
return canceled || delegate.isCanceled();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class RemoteProgressBarSender implements ProgressBar {
private DatagramSocket socket;
private InetAddress address;
private int port;
private volatile boolean canceled = false;

public RemoteProgressBarSender(int port) {
this(null, port);
Expand All @@ -41,12 +42,16 @@ public RemoteProgressBarSender(@Nullable InetAddress address, int port) {
this.socket = new DatagramSocket();
this.port = port;
this.address = address == null ? InetAddress.getByName(localhost) : address;

// Set socket to non-blocking mode for checking cancel messages
this.socket.setSoTimeout(1); // 1ms timeout for non-blocking receive
} catch (UnknownHostException | SocketException e) {
if ("host.docker.internal".equals(localhost)) {
try {
this.address = InetAddress.getByName("localhost");
this.port = port;
this.socket = new DatagramSocket();
this.socket.setSoTimeout(1); // 1ms timeout for non-blocking
} catch (UnknownHostException | SocketException ex) {
throw new UncheckedIOException(ex);
}
Expand Down Expand Up @@ -94,6 +99,10 @@ public void throwRemote(RemoteException ex) {

private void send(Type type, @Nullable String message) {
try {
// Check for any pending cancel messages before sending
drainCancelMessages();

// Send the message
for (byte[] packet : RemoteProgressMessage.toPackets(type, message)) {
socket.send(new DatagramPacket(packet, packet.length, address, port));
}
Expand All @@ -103,4 +112,50 @@ private void send(Type type, @Nullable String message) {
throw new UncheckedIOException(e);
}
}

/**
* Non-blocking check for any pending cancel messages.
* Drains all available cancel messages from the socket buffer.
*/
private void drainCancelMessages() {
if (canceled) {
return; // Already canceled, no need to check
}

try {
byte[] buf = new byte[128];
DatagramPacket packet = new DatagramPacket(buf, buf.length);

// Keep reading while there are messages available (non-blocking due to timeout=0)
while (true) {
try {
socket.receive(packet);

// Parse the received packet to check if it's a cancel message
String received = new String(packet.getData(), 0, packet.getLength());
if (received.contains("CANCEL:true")) {
canceled = true;
// Continue draining to clear the buffer
}
} catch (SocketTimeoutException e) {
// No more messages available, done draining
break;
}
}
} catch (IOException ignored) {
// Ignore other IO exceptions during cancel check
}
}

@Override
public void setCanceled(boolean canceled) {
this.canceled = canceled;
}

@Override
public boolean isCanceled() {
// Also check for pending cancel messages when queried
drainCancelMessages();
return canceled;
}
}
Loading
Loading