Skip to content

Commit 1a53136

Browse files
committed
Ignore empty SSE messages, accept \r\n and \n for line separators
1 parent 5fefa8a commit 1a53136

File tree

2 files changed

+34
-10
lines changed

2 files changed

+34
-10
lines changed

mcp/runtime/src/main/java/io/quarkiverse/langchain4j/mcp/runtime/http/QuarkusStreamableHttpMcpTransport.java

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@
3131
import io.quarkiverse.langchain4j.mcp.auth.McpClientAuthProvider;
3232
import io.smallrye.mutiny.Uni;
3333
import io.smallrye.mutiny.infrastructure.Infrastructure;
34+
import io.vertx.core.Handler;
3435
import io.vertx.core.MultiMap;
36+
import io.vertx.core.buffer.Buffer;
3537
import io.vertx.core.http.HttpClient;
3638
import io.vertx.core.http.HttpMethod;
3739
import io.vertx.core.http.RequestOptions;
38-
import io.vertx.core.parsetools.RecordParser;
3940

4041
public class QuarkusStreamableHttpMcpTransport implements McpTransport {
4142

@@ -151,16 +152,34 @@ private Uni<JsonNode> execute(McpClientMessage request, Long id) {
151152
log.debug("Assigned MCP session ID: " + mcpSessionId);
152153
this.mcpSessionId.set(mcpSessionId);
153154
}
154-
155-
RecordParser sseEventparser = RecordParser.newDelimited("\n\n", bodyBuffer -> {
156-
String responseString = bodyBuffer.toString();
157-
SseEvent<String> sseEvent = parseSseEvent(responseString);
158-
sseSubscriber.accept(sseEvent);
159-
});
160-
155+
// Handler that splits SSE events whether they are separated by \r\n\r\n or \n\n
156+
Handler<Buffer> sseEventparser = new Handler<Buffer>() {
157+
private StringBuffer sb = new StringBuffer();
158+
159+
@Override
160+
public void handle(Buffer event) {
161+
sb.append(event.toString());
162+
String str = sb.toString();
163+
if (str.contains("\r\n\r\n")) {
164+
String[] parts = str.split("\r\n\r\n", 2);
165+
String eventStr = parts[0];
166+
sb = new StringBuffer();
167+
sb.append(parts[1]);
168+
SseEvent<String> sseEvent = parseSseEvent(eventStr);
169+
sseSubscriber.accept(sseEvent);
170+
} else if (str.contains("\n\n")) {
171+
String[] parts = str.split("\n\n", 2);
172+
String eventStr = parts[0];
173+
sb = new StringBuffer();
174+
sb.append(parts[1]);
175+
SseEvent<String> sseEvent = parseSseEvent(eventStr);
176+
sseSubscriber.accept(sseEvent);
177+
}
178+
}
179+
};
161180
String contentType = response.result().getHeader("Content-Type");
162181
if (id != null && contentType != null && contentType.contains("text/event-stream")) {
163-
// the server has started a SSE channel
182+
// the server has started an SSE channel
164183
response.result().handler(sseEventparser);
165184
} else {
166185
// the server has sent a single regular response
@@ -202,7 +221,8 @@ private MultivaluedMap<String, Object> toMultivaluedMap(MultiMap multiMap) {
202221

203222
// FIXME: this may be brittle, is there a more standard way to parse SSE events?
204223
private SseEvent<String> parseSseEvent(String responseString) {
205-
Map<String, String> entries = Arrays.stream(responseString.split("\\n"))
224+
// use \\R to match any line ending because some servers use \r\n and some use \n
225+
Map<String, String> entries = Arrays.stream(responseString.split("\\R"))
206226
.collect(Collectors.toMap(s -> s.substring(0, s.indexOf(":")),
207227
s -> s.substring(s.indexOf(":") + 2)));
208228
return new SseEvent<String>() {

mcp/runtime/src/main/java/io/quarkiverse/langchain4j/mcp/runtime/http/SseSubscriber.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ public SseSubscriber(
3232

3333
@Override
3434
public void accept(SseEvent<String> s) {
35+
// some servers send empty messages as pings
36+
if (s.data().isEmpty() || s.data().isBlank()) {
37+
return;
38+
}
3539
if (logEvents) {
3640
log.debug("< " + s.data());
3741
}

0 commit comments

Comments
 (0)