From 5ffd667af0ebdb14c6c017be5fb634e7c5b8e1ac Mon Sep 17 00:00:00 2001 From: Fabiana Cecin Date: Wed, 26 Nov 2025 19:39:21 -0300 Subject: [PATCH 1/6] feat(wstransport): support concurrent accept in WsTransport * depends on: https://github.com/status-im/nim-websock/pull/180 * addresses: https://github.com/waku-org/nwaku/issues/3634 --- libp2p/transports/wstransport.nim | 262 ++++++++++++++++++++---------- 1 file changed, 175 insertions(+), 87 deletions(-) diff --git a/libp2p/transports/wstransport.nim b/libp2p/transports/wstransport.nim index de52fcb35a..516028d0df 100644 --- a/libp2p/transports/wstransport.nim +++ b/libp2p/transports/wstransport.nim @@ -37,6 +37,7 @@ const DefaultHeadersTimeout = 3.seconds DefaultAutotlsWaitTimeout = 3.seconds DefaultAutotlsRetries = 3 + DefaultConcurrentAcceptsPerHttpServer = 100 type WsStream = ref object of Connection @@ -111,11 +112,18 @@ method closeImpl*(s: WsStream): Future[void] {.async: (raises: []).} = method getWrapped*(s: WsStream): Connection = nil +type + AcceptResult = Result[Connection, ref CatchableError] + AcceptDispatcherFinishedError = object of CatchableError + type WsTransport* = ref object of Transport httpservers: seq[HttpServer] wsserver: WSServer connections: array[Direction, seq[WsStream]] acceptFuts: seq[Future[HttpRequest]] + acceptResults: AsyncQueue[AcceptResult] + acceptLoop: Future[void] + concurrentAcceptsPerHttpServer: int tlsPrivateKey*: TLSPrivateKey tlsCertificate*: TLSCertificate @@ -129,6 +137,123 @@ type WsTransport* = ref object of Transport proc secure*(self: WsTransport): bool = not (isNil(self.tlsPrivateKey) or isNil(self.tlsCertificate)) +proc connHandler( + self: WsTransport, stream: WSSession, secure: bool, dir: Direction +): Future[Connection] {.async: (raises: [CatchableError]).} = + ## Returning CatchableError is fine because we later handle different exceptions. 
+ + let (observedAddr, localAddr) = + try: + let + codec = + if secure: + MultiAddress.init("/wss") + else: + MultiAddress.init("/ws") + remoteAddr = stream.stream.reader.tsource.remoteAddress + localAddr = stream.stream.reader.tsource.localAddress + + ( + MultiAddress.init(remoteAddr).tryGet() & codec.tryGet(), + MultiAddress.init(localAddr).tryGet() & codec.tryGet(), + ) + except CatchableError as exc: + trace "Failed to create observedAddr or listenAddr", description = exc.msg + if not (isNil(stream) and stream.stream.reader.closed): + safeClose(stream) + raise exc + + let conn = WsStream.new(stream, dir, Opt.some(observedAddr), Opt.some(localAddr)) + + self.connections[dir].add(conn) + proc onClose() {.async: (raises: []).} = + await noCancel conn.session.stream.reader.join() + self.connections[dir].keepItIf(it != conn) + trace "Cleaned up client" + + asyncSpawn onClose() + return conn + +proc handshakeWorker( + self: WsTransport, finished: Future[HttpRequest], secure: bool +) {.async: (raises: []).} = + try: + let req = await finished + try: + let wstransp = await self.wsserver.handleRequest(req).wait(self.handshakeTimeout) + let conn = await self.connHandler(wstransp, secure, Direction.In) + try: + self.acceptResults.addLastNoWait(AcceptResult.ok(conn)) + except AsyncQueueFullError: + await noCancel req.stream.closeWait() + except CatchableError as exc: + await noCancel req.stream.closeWait() + try: + self.acceptResults.addLastNoWait(AcceptResult.err(exc)) + except AsyncQueueFullError: + discard + except CatchableError as exc: + try: + self.acceptResults.addLastNoWait(AcceptResult.err(exc)) + except AsyncQueueFullError: + discard + +proc acceptDispatcher(self: WsTransport) {.async: (raises: []).} = + trace "Entering acceptDispatcher" + + # Sequentially enqueue N accepts per HttpServer into acceptFuts so we can recover the + # index into httpservers by simply dividing the index of the completed future by N. 
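+  # (e.g. with two HTTP servers and N = 100, acceptFuts[0 .. 99] belong to
+  # httpservers[0] and acceptFuts[100 .. 199] to httpservers[1]; a future that
+  # completes at index 137 thus dispatches to httpservers[137 div 100] = httpservers[1])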
+  for server in self.httpservers:
+    for _ in 0 ..< self.concurrentAcceptsPerHttpServer:
+      self.acceptFuts.add(server.accept())
+
+  # Either httpservers is empty, or concurrentAcceptsPerHttpServer is zero (which is a defect)
+  if self.acceptFuts.len == 0:
+    warn "WsTransport.acceptDispatcher has no work; exiting"
+    return
+
+  while self.running:
+    try:
+      var finished: Future[HttpRequest]
+      try:
+        finished = await one(self.acceptFuts)
+      except ValueError as exc:
+        raise newException(AssertionDefect, "wstransport accept error: " & exc.msg, exc)
+
+      let futIndex = self.acceptFuts.find(finished)
+      if futIndex < 0:
+        continue
+      let serverIndex = futIndex div self.concurrentAcceptsPerHttpServer
+      if serverIndex >= self.httpservers.len:
+        raise newException(AssertionDefect, "wstransport server index out of bounds")
+      let httpServer = self.httpservers[serverIndex]
+
+      # Replenish the completed accept() future for the same server
+      self.acceptFuts[futIndex] = httpServer.accept()
+
+      asyncSpawn self.handshakeWorker(finished, httpServer.secure)
+
+      await sleepAsync(0) # be nice
+    except CatchableError as exc:
+      # Dispatcher should never exit before self.running is false
+      if not self.running:
+        break
+      # Unexpected error, so yield for a while before resuming dispatch
+      trace "Error in accept dispatcher", msg = exc.msg
+      try:
+        await sleepAsync(100.milliseconds)
+      except CancelledError:
+        discard
+
+  try:
+    # This error never leaves the results queue, guaranteeing popFirst never deadlocks
+    self.acceptResults.addLastNoWait(
+      AcceptResult.err(newException(AcceptDispatcherFinishedError, "Server is closed"))
+    )
+  except AsyncQueueFullError:
+    raise newException(AssertionDefect, "wstransport accept results queue full")
+
+  trace "Exiting acceptDispatcher"
+
 method start*(
     self: WsTransport, addrs: seq[MultiAddress]
 ) {.async: (raises: [LPError, transport.TransportError, CancelledError]).} =
@@ -209,6 +334,9 @@ method start*(
 
   trace "Listening on", addresses = self.addrs
 
+  self.acceptResults = newAsyncQueue[AcceptResult]()
+  self.acceptLoop = self.acceptDispatcher()
+
 method stop*(self: WsTransport) {.async: (raises: []).} =
   ## stop the transport
   ##
@@ -224,6 +352,9 @@ method stop*(self: WsTransport) {.async: (raises: []).} =
       self.connections[Direction.Out].mapIt(it.close())
     )
 
+  if not isNil(self.acceptLoop):
+    await self.acceptLoop.cancelAndWait()
+
   var toWait: seq[Future[void]]
   for fut in self.acceptFuts:
     if not fut.finished:
@@ -242,43 +373,6 @@ method stop*(self: WsTransport) {.async: (raises: []).} =
     except CatchableError as exc:
       trace "Error shutting down ws transport", description = exc.msg
 
-proc connHandler(
-    self: WsTransport, stream: WSSession, secure: bool, dir: Direction
-): Future[Connection] {.async: (raises: [CatchableError]).} =
-  ## Returning CatchableError is fine because we later handle different exceptions.
-
-  let (observedAddr, localAddr) =
-    try:
-      let
-        codec =
-          if secure:
-            MultiAddress.init("/wss")
-          else:
-            MultiAddress.init("/ws")
-        remoteAddr = stream.stream.reader.tsource.remoteAddress
-        localAddr = stream.stream.reader.tsource.localAddress
-
-      (
-        MultiAddress.init(remoteAddr).tryGet() & codec.tryGet(),
-        MultiAddress.init(localAddr).tryGet() & codec.tryGet(),
-      )
-    except CatchableError as exc:
-      trace "Failed to create observedAddr or listenAddr", description = exc.msg
-      if not (isNil(stream) and stream.stream.reader.closed):
-        safeClose(stream)
-      raise exc
-
-  let conn = WsStream.new(stream, dir, Opt.some(observedAddr), Opt.some(localAddr))
-
-  self.connections[dir].add(conn)
-  proc onClose() {.async: (raises: []).} =
-    await noCancel conn.session.stream.reader.join()
-    self.connections[dir].keepItIf(it != conn)
-    trace "Cleaned up client"
-
-  asyncSpawn onClose()
-  return conn
-
 method accept*(
     self: WsTransport
 ): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
@@ -294,58 +388,44 @@ method accept*(
   if not self.running:
     raise newTransportClosedError()
 
-  if self.acceptFuts.len <= 0:
-    self.acceptFuts = self.httpservers.mapIt(it.accept())
-
-  if self.acceptFuts.len <= 0:
+  let res = await self.acceptResults.popFirst()
+
+  if res.isErr:
+    let exc = res.error
+    if exc of WebSocketError:
+      debug "Websocket Error", description = exc.msg
+    elif exc of HttpError:
+      debug "Http Error", description = exc.msg
+    elif exc of AsyncStreamError:
+      debug "AsyncStream Error", description = exc.msg
+    elif exc of TransportTooManyError:
+      debug "Too many files opened", description = exc.msg
+    elif exc of TransportAbortedError:
+      debug "Connection aborted", description = exc.msg
+    elif exc of AsyncTimeoutError:
+      debug "Timed out", description = exc.msg
+    elif exc of TransportOsError:
+      debug "OS Error", description = exc.msg
+    elif exc of TransportUseClosedError:
+      debug "Server was closed", description = exc.msg
+      raise newTransportClosedError(exc)
+    elif exc of AcceptDispatcherFinishedError:
+      try:
+        self.acceptResults.addLastNoWait(res)
+      except AsyncQueueFullError:
+        raise newException(AssertionDefect, "wstransport acceptResults queue full")
+      debug "Server was closed", description = exc.msg
+      raise newTransportClosedError(exc)
+    elif exc of CancelledError:
+      raise (ref CancelledError)(exc)
+    else:
+      info "Unexpected error accepting connection", description = exc.msg
+      raise newException(
+        transport.TransportError, "Error in WsTransport accept: " & exc.msg, exc
+      )
     return
 
-  let finished =
-    try:
-      await one(self.acceptFuts)
-    except ValueError:
-      raiseAssert("already checked with if")
-    except CancelledError as e:
-      raise e
-
-  let index = self.acceptFuts.find(finished)
-  self.acceptFuts[index] = self.httpservers[index].accept()
-
-  try:
-    let req = await finished
-
-    try:
-      let wstransp = await self.wsserver.handleRequest(req).wait(self.handshakeTimeout)
-      let isSecure = self.httpservers[index].secure
-
-      return await self.connHandler(wstransp, isSecure, Direction.In)
-    except CatchableError as exc:
-      await noCancel req.stream.closeWait()
-      raise exc
-  except WebSocketError as exc:
-    debug "Websocket Error", description = exc.msg
-  except HttpError as exc:
-    debug "Http Error", description = exc.msg
-  except AsyncStreamError as exc:
-    debug "AsyncStream Error", description = exc.msg
-  except TransportTooManyError as exc:
-    debug "Too many files opened", description = exc.msg
-  except TransportAbortedError as exc:
-    debug "Connection aborted", description = exc.msg
-  except AsyncTimeoutError as exc:
-    debug "Timed out", description = exc.msg
-  except TransportUseClosedError as exc:
-    debug "Server was closed", description = exc.msg
-    raise newTransportClosedError(exc)
-  except CancelledError as exc:
-    raise exc
-  except TransportOsError as exc:
-    debug "OS Error", description = exc.msg
-  except CatchableError as exc:
-    info "Unexpected error accepting connection", description = exc.msg
-    raise newException(
-      transport.TransportError, "Error in WsTransport accept: " & exc.msg, exc
-    )
+  return res.value
 
 method dial*(
     self: WsTransport,
@@ -393,6 +473,7 @@ proc new*(
     factories: openArray[ExtFactory] = [],
     rng: ref HmacDrbgContext = nil,
     handshakeTimeout = DefaultHeadersTimeout,
+    concurrentAcceptsPerHttpServer = DefaultConcurrentAcceptsPerHttpServer,
 ): T {.raises: [].} =
   ## Creates a secure WebSocket transport
 
@@ -406,6 +487,11 @@ proc new*(
     factories: @factories,
     rng: rng,
     handshakeTimeout: handshakeTimeout,
+    concurrentAcceptsPerHttpServer:
+      if concurrentAcceptsPerHttpServer <= 0:
+        DefaultConcurrentAcceptsPerHttpServer
+      else:
+        concurrentAcceptsPerHttpServer,
   )
   procCall Transport(self).initialize()
   self
@@ -417,6 +503,7 @@ proc new*(
     factories: openArray[ExtFactory] = [],
     rng: ref HmacDrbgContext = nil,
     handshakeTimeout = DefaultHeadersTimeout,
+    concurrentAcceptsPerHttpServer = DefaultConcurrentAcceptsPerHttpServer,
 ): T {.raises: [].} =
   ## Creates a clear-text WebSocket transport
 
@@ -429,4 +516,5 @@ proc new*(
     factories = @factories,
     rng = rng,
     handshakeTimeout = handshakeTimeout,
+    concurrentAcceptsPerHttpServer = concurrentAcceptsPerHttpServer,
   )

From 1e4564f402440c0150aef328e5bcb876a5318e86 Mon Sep 17 00:00:00 2001
From: Fabiana Cecin
Date: Fri, 28 Nov 2025 13:54:49 -0300
Subject: [PATCH 2/6] update .pinned websock dep to pick up AsyncLock around
 StreamServer update

---
 .pinned | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.pinned b/.pinned
index a3f67b6fc4..364c52d21c 100644
--- a/.pinned
+++ b/.pinned
@@ -15,7 +15,7 @@ serialization;https://github.com/status-im/nim-serialization@#548d0adc9797a10b2d
 stew;https://github.com/status-im/nim-stew@#b66168735d6f3841c5239c3169d3fe5fe98b1257
 testutils;https://github.com/status-im/nim-testutils@#9e842bd58420d23044bc55e16088e8abbe93ce51
 unittest2;https://github.com/status-im/nim-unittest2@#8b51e99b4a57fcfb31689230e75595f024543024
-websock;https://github.com/status-im/nim-websock@#35ae76f1559e835c80f9c1a3943bf995d3dd9eb5
+websock;https://github.com/status-im/nim-websock@#3986b3557714d9f3bbbc3854d08a3c404996a4d7
 zlib;https://github.com/status-im/nim-zlib@#daa8723fd32299d4ca621c837430c29a5a11e19a
 jwt;https://github.com/vacp2p/nim-jwt@#18f8378de52b241f321c1f9ea905456e89b95c6f
 bearssl_pkey_decoder;https://github.com/vacp2p/bearssl_pkey_decoder@#21dd3710df9345ed2ad8bf8f882761e07863b8e0

From a57f62bf38e83a71939d1ca93df377b64332ccb3 Mon Sep 17 00:00:00 2001
From: Fabiana Cecin
Date: Mon, 1 Dec 2025 12:52:52 -0300
Subject: [PATCH 3/6] try stopping httpservers first

---
 libp2p/transports/wstransport.nim | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/libp2p/transports/wstransport.nim b/libp2p/transports/wstransport.nim
index 516028d0df..94a1bac3a5 100644
--- a/libp2p/transports/wstransport.nim
+++ b/libp2p/transports/wstransport.nim
@@ -355,6 +355,9 @@ method stop*(self: WsTransport) {.async: (raises: []).} =
   if not isNil(self.acceptLoop):
     await self.acceptLoop.cancelAndWait()
 
+  for server in self.httpservers:
+    server.stop()
+
   var toWait: seq[Future[void]]
   for fut in self.acceptFuts:
     if not fut.finished:
@@ -363,7 +366,6 @@ method stop*(self: WsTransport) {.async: (raises: []).} =
       toWait.add(fut.read().stream.closeWait())
 
   for server in self.httpservers:
-    server.stop()
     toWait.add(server.closeWait())
 
   await allFutures(toWait)

From 5ee04876048e658a6a450e29cdd196165c437a07 Mon Sep 17 00:00:00 2001
From: Fabiana Cecin
Date: Mon, 1 Dec 2025 14:27:35 -0300
Subject: [PATCH 4/6] add temporary shotgun debugging echoes for CI

---
 libp2p/transports/wstransport.nim        | 26 +++++++++++++++++----
 tests/libp2p/transports/stream_tests.nim | 29 ++++++++++++++++++++++++
 2 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/libp2p/transports/wstransport.nim b/libp2p/transports/wstransport.nim
index 94a1bac3a5..7c6feaf869 100644
--- a/libp2p/transports/wstransport.nim
+++ b/libp2p/transports/wstransport.nim
@@ -340,38 +340,56 @@ method start*(
 method stop*(self: WsTransport) {.async: (raises: []).} =
   ## stop the transport
   ##
-
+  echo "[DEBUG WSTRANSPORT] WsTransport.stop"
   self.running = false # mark stopped as soon as possible
 
   try:
     trace "Stopping WS transport"
+    echo "[DEBUG WSTRANSPORT] stop transport"
     await procCall Transport(self).stop() # call base
+    echo "[DEBUG WSTRANSPORT] allfinished connections"
     discard await allFinished(
       self.connections[Direction.In].mapIt(it.close()) &
         self.connections[Direction.Out].mapIt(it.close())
     )
 
     if not isNil(self.acceptLoop):
+      echo "[DEBUG WSTRANSPORT] stop dispatcher"
       await self.acceptLoop.cancelAndWait()
+      echo "[DEBUG WSTRANSPORT] stopped dispatcher"
 
-    for server in self.httpservers:
+    for i, server in self.httpservers:
+      echo "[DEBUG WSTRANSPORT] stop server ", i
       server.stop()
+      echo "[DEBUG WSTRANSPORT] stopped server ", i
 
     var toWait: seq[Future[void]]
-    for fut in self.acceptFuts:
+    echo "[DEBUG WSTRANSPORT] self acceptfuts loop count=", self.acceptFuts.len
+    for i, fut in self.acceptFuts:
       if not fut.finished:
+        if i <= 3:
+          echo "[DEBUG WSTRANSPORT] toWait add cancelAndWait ", i
        toWait.add(fut.cancelAndWait())
+        if i <= 3:
+          echo "[DEBUG WSTRANSPORT] toWait add cancelAndWait done ", i
       elif fut.completed:
+        echo "[DEBUG WSTRANSPORT] toWait add stream closeWait ", i
         toWait.add(fut.read().stream.closeWait())
+        echo "[DEBUG WSTRANSPORT] toWait add stream closeWait done ", i
 
-    for server in self.httpservers:
+    for i, server in self.httpservers:
+      echo "[DEBUG WSTRANSPORT] server closewait ", i
       toWait.add(server.closeWait())
+      echo "[DEBUG WSTRANSPORT] server closewait done ", i
 
+    echo "[DEBUG WSTRANSPORT] await allFutures"
     await allFutures(toWait)
+    echo "[DEBUG WSTRANSPORT] await allFutures done"
 
     self.httpservers = @[]
 
     trace "Transport stopped"
+    echo "[DEBUG WSTRANSPORT] wstransport stopped"
   except CatchableError as exc:
     trace "Error shutting down ws transport", description = exc.msg
diff --git a/tests/libp2p/transports/stream_tests.nim b/tests/libp2p/transports/stream_tests.nim
index 86294f0973..2b30e82ec3 100644
--- a/tests/libp2p/transports/stream_tests.nim
+++ b/tests/libp2p/transports/stream_tests.nim
@@ -457,14 +457,17 @@ template streamTransportTest*(
       serverStreamHandlerFuts[i] = newFuture[void]()
 
     proc serverHandler(server: Transport) {.async.} =
+      echo "[DEBUG] serverHandler start"
       # Accept multiple connections and handle them
      var futs: seq[Future[void]]
       for i in 0 ..< numConnections:
         # Use a proc to properly capture loop index
         proc setupConnection(conn: Connection, handlerIndex: int) =
+          echo "[DEBUG] serverHandler setupConnection ", handlerIndex
           let muxer = streamProvider(conn, false)
           muxer.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
             noExceptionWithStreamClose(stream):
+              echo "[DEBUG] serverHandler handler start ", handlerIndex
               # Read data in chunks with random delay
               var receivedData: seq[byte] = @[]
               while receivedData.len < messageSize:
@@ -482,62 +485,88 @@
               check receivedData == newData(messageSize, byte(handlerIndex))
 
               # Send back ID
+              echo "[DEBUG] serverHandler handler write ", handlerIndex
               await stream.write(@[byte(receivedData[0])])
 
               # Signal that this stream handler is done
+              echo "[DEBUG] serverHandler handler complete ", handlerIndex
               serverStreamHandlerFuts[handlerIndex].complete()
 
           let startStreamHandlerAndCleanup = proc() {.async.} =
+            echo "[DEBUG] serverHandler before muxer handle ", handlerIndex
             await muxer.handle()
+            echo "[DEBUG] serverHandler after muxer handle ", handlerIndex
             # Wait for the stream handler to complete before closing
             await serverStreamHandlerFuts[handlerIndex]
+            echo "[DEBUG] serverHandler after await serverStreamHandlerFuts ", handlerIndex
             await muxer.close()
+            echo "[DEBUG] serverHandler after muxer close ", handlerIndex
             await conn.close()
+            echo "[DEBUG] serverHandler after conn close ", handlerIndex
 
           futs.add(startStreamHandlerAndCleanup())
 
         let conn = await server.accept()
         setupConnection(conn, i)
 
+      echo "[DEBUG] server before await allFutures(futs)"
       await allFutures(futs)
+      echo "[DEBUG] server after await allFutures(futs)"
 
     proc runClient(server: Transport, connectionId: int) {.async.} =
+      echo "[DEBUG] client ", connectionId, " create client"
       let client = transportProvider()
       let conn = await client.dial(server.addrs[0])
+      echo "[DEBUG] client ", connectionId, " create muxer"
       let muxer = streamProvider(conn)
 
+      echo "[DEBUG] client ", connectionId, " create stream"
       let stream = await muxer.newStream()
 
       # Write data in chunks with random delay
+      echo "[DEBUG] client ", connectionId, " write"
       let message = newData(chunkSize, byte(connectionId))
       for i in 0 ..< chunkCount:
         await stream.write(message)
         # Random delay between writes (20-100ms)
         await sleepAsync(rand(20 .. 100).milliseconds)
 
+      echo "[DEBUG] client ", connectionId, " read"
       var buffer: array[1, byte]
       await stream.readExactly(addr buffer, 1)
 
       # Verify we got back our own connection ID
       check buffer[0] == byte(connectionId)
 
+      echo "[DEBUG] Client ", connectionId, " close stream"
       await stream.close()
+      echo "[DEBUG] Client ", connectionId, " close muxer"
       await muxer.close()
+      echo "[DEBUG] Client ", connectionId, " close conn"
       await conn.close()
+      echo "[DEBUG] Client ", connectionId, " done"
 
+    echo "[DEBUG] server create"
     let server = transportProvider()
+    echo "[DEBUG] server start"
     await server.start(ma)
+    echo "[DEBUG] serverTask"
     let serverTask = serverHandler(server)
 
     # Start multiple concurrent clients
+    echo "[DEBUG] creating ", numConnections, " clients"
     var clientFuts: seq[Future[void]]
     for i in 0 ..< numConnections:
       clientFuts.add(runClient(server, i))
 
+    echo "[DEBUG] await allFutures(clientFuts)"
     await allFutures(clientFuts)
+    echo "[DEBUG] clientFuts finished"
 
     await serverTask
+    echo "[DEBUG] serverTask finished"
 
     await server.stop()
+    echo "[DEBUG] server stopped"
 
     # Assert parallelism by counting transitions between different connection IDs
     # Total reads: 5 connections × 32 chunks = 160

From 62d56b6edcd5116696dc4b21f19c6eb5efd99abe Mon Sep 17 00:00:00 2001
From: Fabiana Cecin
Date: Mon, 1 Dec 2025 15:10:59 -0300
Subject: [PATCH 5/6] add temporary debugging to ws test client

---
 tests/libp2p/transports/stream_tests.nim | 61 +++++++++++++-----------
 1 file changed, 32 insertions(+), 29 deletions(-)

diff --git a/tests/libp2p/transports/stream_tests.nim b/tests/libp2p/transports/stream_tests.nim
index 2b30e82ec3..4947594f9e 100644
--- a/tests/libp2p/transports/stream_tests.nim
+++ b/tests/libp2p/transports/stream_tests.nim
@@ -515,37 +515,40 @@ template streamTransportTest*(
       echo "[DEBUG] server after await allFutures(futs)"
 
     proc runClient(server: Transport, connectionId: int) {.async.} =
-      echo "[DEBUG] client ", connectionId, " create client"
-      let client = transportProvider()
-      let conn = await client.dial(server.addrs[0])
-      echo "[DEBUG] client ", connectionId, " create muxer"
-      let muxer = streamProvider(conn)
-
-      echo "[DEBUG] client ", connectionId, " create stream"
-      let stream = await muxer.newStream()
+      try:
+        echo "[DEBUG] client ", connectionId, " create client"
+        let client = transportProvider()
+        let conn = await client.dial(server.addrs[0])
+        echo "[DEBUG] client ", connectionId, " create muxer"
+        let muxer = streamProvider(conn)
+
+        echo "[DEBUG] client ", connectionId, " create stream"
+        let stream = await muxer.newStream()
 
-      # Write data in chunks with random delay
-      echo "[DEBUG] client ", connectionId, " write"
-      let message = newData(chunkSize, byte(connectionId))
-      for i in 0 ..< chunkCount:
-        await stream.write(message)
-        # Random delay between writes (20-100ms)
-        await sleepAsync(rand(20 .. 100).milliseconds)
+        # Write data in chunks with random delay
+        echo "[DEBUG] client ", connectionId, " write"
+        let message = newData(chunkSize, byte(connectionId))
+        for i in 0 ..< chunkCount:
+          await stream.write(message)
+          # Random delay between writes (20-100ms)
+          await sleepAsync(rand(20 .. 100).milliseconds)
 
-      echo "[DEBUG] client ", connectionId, " read"
-      var buffer: array[1, byte]
-      await stream.readExactly(addr buffer, 1)
+        echo "[DEBUG] client ", connectionId, " read"
+        var buffer: array[1, byte]
+        await stream.readExactly(addr buffer, 1)
+
+        # Verify we got back our own connection ID
+        check buffer[0] == byte(connectionId)
 
-      # Verify we got back our own connection ID
-      check buffer[0] == byte(connectionId)
-
-      echo "[DEBUG] Client ", connectionId, " close stream"
-      await stream.close()
-      echo "[DEBUG] Client ", connectionId, " close muxer"
-      await muxer.close()
-      echo "[DEBUG] Client ", connectionId, " close conn"
-      await conn.close()
-      echo "[DEBUG] Client ", connectionId, " done"
+        echo "[DEBUG] Client ", connectionId, " close stream"
+        await stream.close()
+        echo "[DEBUG] Client ", connectionId, " close muxer"
+        await muxer.close()
+        echo "[DEBUG] Client ", connectionId, " close conn"
+        await conn.close()
+        echo "[DEBUG] Client ", connectionId, " done"
+      except CatchableError as exc:
+        echo "[DEBUG] Client ", connectionId, " ERROR: ", exc.msg

From ae77eb83e84950b7a87923cd0984229b10deef15 Mon Sep 17 00:00:00 2001
From: Fabiana Cecin
Date: Mon, 1 Dec 2025 16:23:40 -0300
Subject: [PATCH 6/6] emulate original head-of-line blocking behavior

---
 tests/libp2p/transports/stream_tests.nim | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/libp2p/transports/stream_tests.nim b/tests/libp2p/transports/stream_tests.nim
index 4947594f9e..bac6a18cc0 100644
--- a/tests/libp2p/transports/stream_tests.nim
+++ b/tests/libp2p/transports/stream_tests.nim
@@ -562,6 +562,7 @@ template streamTransportTest*(
     var clientFuts: seq[Future[void]]
     for i in 0 ..< numConnections:
       clientFuts.add(runClient(server, i))
+      await sleepAsync(300.millis)
 
     echo "[DEBUG] await allFutures(clientFuts)"
     await allFutures(clientFuts)
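
Usage sketch (illustrative only; the `upgrade` argument, the import paths, and the
value 50 are assumptions about the surrounding nim-libp2p setup, not something the
patches define): with the series applied, a caller can bound how many accept()
futures the dispatcher keeps in flight per HTTP listener by passing
concurrentAcceptsPerHttpServer through WsTransport.new.

  import chronos, results
  import libp2p/[multiaddress, upgrademngrs/upgrade]
  import libp2p/transports/wstransport

  proc startWs(upgrade: Upgrade): Future[WsTransport] {.async.} =
    # Keep up to 50 accept() futures in flight per HTTP server; values <= 0
    # fall back to DefaultConcurrentAcceptsPerHttpServer (100).
    let ws = WsTransport.new(upgrade, concurrentAcceptsPerHttpServer = 50)
    await ws.start(@[MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet()])
    # accept() now pops handshaken connections from the dispatcher's results
    # queue, so many handshakes can progress while a caller awaits here.
    return ws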