2 changes: 1 addition & 1 deletion .pinned
@@ -15,7 +15,7 @@ serialization;https://github.com/status-im/nim-serialization@#548d0adc9797a10b2d
stew;https://github.com/status-im/nim-stew@#b66168735d6f3841c5239c3169d3fe5fe98b1257
testutils;https://github.com/status-im/nim-testutils@#9e842bd58420d23044bc55e16088e8abbe93ce51
unittest2;https://github.com/status-im/nim-unittest2@#8b51e99b4a57fcfb31689230e75595f024543024
websock;https://github.com/status-im/nim-websock@#35ae76f1559e835c80f9c1a3943bf995d3dd9eb5
websock;https://github.com/status-im/nim-websock@#3986b3557714d9f3bbbc3854d08a3c404996a4d7
zlib;https://github.com/status-im/nim-zlib@#daa8723fd32299d4ca621c837430c29a5a11e19a
jwt;https://github.com/vacp2p/nim-jwt@#18f8378de52b241f321c1f9ea905456e89b95c6f
bearssl_pkey_decoder;https://github.com/vacp2p/bearssl_pkey_decoder@#21dd3710df9345ed2ad8bf8f882761e07863b8e0
290 changes: 199 additions & 91 deletions libp2p/transports/wstransport.nim
@@ -37,6 +37,7 @@ const
DefaultHeadersTimeout = 3.seconds
Member

CI on Windows: for some reason it fails.

Collaborator Author

Will investigate. It's hanging non-deterministically in a [Suite ] WebSocket transport test:

  [Test   ]           server with multiple parallel connections
Terminate batch job (Y/N)? 
^C
Error: The operation was canceled.

...that should take 2-3 seconds every time:

  [Test   ]           server with multiple parallel connections
@[0, 1, 2, 3, 4, 4, 2, 0, 3, 1, 4, 3, 0, 1, 2, 4, 3, 0, 1, 2, 4, 1, 0, 3, 2, 1, 4, 0, 3, 2, 1, 0, 4, 3, 4, 1, 2, 0, 2, 1, 3, 4, 1, 2, 3, 0, 4, 1, 2, 0, 1, 3, 4, 2, 0, 4, 2, 0, 3, 1, 4, 1, 2, 3, 0, 2, 4, 0, 2, 1, 0, 3, 4, 2, 1, 4, 0, 2, 3, 1, 3, 2, 0, 4, 1, 2, 3, 0, 4, 2, 1, 3, 0, 1, 4, 2, 1, 4, 0, 3, 4, 0, 1, 2, 4, 3, 3, 0, 1, 2, 4, 2, 3, 0, 1, 2, 4, 3, 4, 0, 2, 3, 1, 2, 3, 4, 1, 0, 2, 3, 4, 1, 3, 4, 2, 0, 3, 1, 4, 0, 2, 2, 3, 1, 4, 0, 3, 0, 2, 4, 1, 3, 0, 1, 4, 3, 1, 3, 0, 0]
  [OK     ] (  2.62s) server with multiple parallel connections

Collaborator Author

If we add a 300ms delay between client connection attempts it doesn't hang; this is specific to Windows with Nim 2.2.6. There is a nonzero chance this is a Chronos + Nim 2.2.6 + Windows bug, or some combination thereof. It doesn't seem to hang on the Nim 2.0.16 + Windows CI. I'll investigate further.
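
Roughly the shape of the experiment (dialClient, runClients and clientCount are illustrative names, not the real test code):

import chronos

proc dialClient(i: int) {.async.} =
  # stand-in for the test's per-client connect / stream / close sequence
  discard

proc runClients(clientCount: int) {.async.} =
  var clients: seq[Future[void]]
  for i in 0 ..< clientCount:
    # removing this sleep brings the hang back on Windows + Nim 2.2.6
    await sleepAsync(300.milliseconds)
    clients.add(dialClient(i))
  await allFutures(clients)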

Member
@vladopajic vladopajic Dec 1, 2025

If we run a client, then wait for it to finish (with a 300ms delay), we serialize all client connections, which defeats the purpose of the test, which is to run many client connections in parallel.


If you need to test via CI, you can run only the ws tests so you don't wait too long for the test run to finish.
To do this, change test/test_all.nim and just import the ws tests here:

import ./libp2p/transports/test_ws

Collaborator Author

> If we run a client, then wait for it to finish (with a 300ms delay), we serialize all client connections, which defeats the purpose of the test, which is to run many client connections in parallel.

Yes, I was just abusing the CI to quickly test some theories on Windows.

> If you need to test via CI, you can run only the ws tests so you don't wait too long for the test run to finish. To do this, change test/test_all.nim and just import the ws tests:
>
> import ./libp2p/transports/test_ws

Thanks!

If I need to do more Windows testing I'll set it up on a Windows machine. I didn't think it was anything serious, but now I think it's not a quick fix.

Sorry for the noise. And there's a good chance that this nim-libp2p PR is dead, in any case.

Member

Consider testing status-im/nim-websock#180 ("add test case that asserts concurrent accepts").

Member

FYI: a similar test failed on macos-15-arm64 (Nim v2.2.6) on "master" (technically not master, but the code only adds logs):

https://github.com/vacp2p/nim-libp2p/actions/runs/19856566195/job/56895878119?pr=1924


Collaborator Author

Thank you very much! Yes, I think this patch/PR is too risky. I'll submit an alternative one that doesn't have this problem.

DefaultAutotlsWaitTimeout = 3.seconds
DefaultAutotlsRetries = 3
DefaultConcurrentAcceptsPerHttpServer = 100

type
WsStream = ref object of Connection
@@ -111,11 +112,18 @@ method closeImpl*(s: WsStream): Future[void] {.async: (raises: []).} =
method getWrapped*(s: WsStream): Connection =
nil

type
AcceptResult = Result[Connection, ref CatchableError]
AcceptDispatcherFinishedError = object of CatchableError

type WsTransport* = ref object of Transport
httpservers: seq[HttpServer]
wsserver: WSServer
connections: array[Direction, seq[WsStream]]
acceptFuts: seq[Future[HttpRequest]]
acceptResults: AsyncQueue[AcceptResult]
acceptLoop: Future[void]
concurrentAcceptsPerHttpServer: int

tlsPrivateKey*: TLSPrivateKey
tlsCertificate*: TLSCertificate
@@ -129,6 +137,123 @@ type WsTransport* = ref object of Transport
proc secure*(self: WsTransport): bool =
not (isNil(self.tlsPrivateKey) or isNil(self.tlsCertificate))

proc connHandler(
self: WsTransport, stream: WSSession, secure: bool, dir: Direction
): Future[Connection] {.async: (raises: [CatchableError]).} =
## Returning CatchableError is fine because we later handle different exceptions.

let (observedAddr, localAddr) =
try:
let
codec =
if secure:
MultiAddress.init("/wss")
else:
MultiAddress.init("/ws")
remoteAddr = stream.stream.reader.tsource.remoteAddress
localAddr = stream.stream.reader.tsource.localAddress

(
MultiAddress.init(remoteAddr).tryGet() & codec.tryGet(),
MultiAddress.init(localAddr).tryGet() & codec.tryGet(),
)
except CatchableError as exc:
Member

Please use a specific error instead of CatchableError.

trace "Failed to create observedAddr or listenAddr", description = exc.msg
if not (isNil(stream) and stream.stream.reader.closed):
safeClose(stream)
raise exc

let conn = WsStream.new(stream, dir, Opt.some(observedAddr), Opt.some(localAddr))

self.connections[dir].add(conn)
proc onClose() {.async: (raises: []).} =
await noCancel conn.session.stream.reader.join()
self.connections[dir].keepItIf(it != conn)
trace "Cleaned up client"

asyncSpawn onClose()
return conn

proc handshakeWorker(
self: WsTransport, finished: Future[HttpRequest], secure: bool
) {.async: (raises: []).} =
try:
let req = await finished
try:
let wstransp = await self.wsserver.handleRequest(req).wait(self.handshakeTimeout)
let conn = await self.connHandler(wstransp, secure, Direction.In)
try:
self.acceptResults.addLastNoWait(AcceptResult.ok(conn))
except AsyncQueueFullError:
await noCancel req.stream.closeWait()
Member

this never happens?

except CatchableError as exc:
await noCancel req.stream.closeWait()
try:
self.acceptResults.addLastNoWait(AcceptResult.err(exc))
except AsyncQueueFullError:
discard
except CatchableError as exc:
try:
self.acceptResults.addLastNoWait(AcceptResult.err(exc))
except AsyncQueueFullError:
discard
Comment on lines +182 to +199
Member

Please improve this logic:

  • use specific errors instead of CatchableError
  • do not use nested error handling (try-except within try-except)

for example:

let wstransp =
  try:
    await self.wsserver.handleRequest(req).wait(self.handshakeTimeout)
  except ...

let conn =
  try:
    await self.connHandler(wstransp, secure, Direction.In)
  except ...
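
A fuller sketch of how the flattened version could look (pushResult is a hypothetical helper, and CatchableError is kept here only to mirror the current code; narrowing to specific errors is a separate step):

proc pushResult(self: WsTransport, res: AcceptResult) =
  # hypothetical helper wrapping the addLastNoWait / AsyncQueueFullError dance;
  # the queue is unbounded, so the full case should never trigger
  try:
    self.acceptResults.addLastNoWait(res)
  except AsyncQueueFullError:
    discard

proc handshakeWorker(
    self: WsTransport, finished: Future[HttpRequest], secure: bool
) {.async: (raises: []).} =
  var req: HttpRequest
  try:
    req = await finished
  except CatchableError as exc:
    self.pushResult(AcceptResult.err(exc))
    return

  var conn: Connection
  try:
    let wstransp =
      await self.wsserver.handleRequest(req).wait(self.handshakeTimeout)
    conn = await self.connHandler(wstransp, secure, Direction.In)
  except CatchableError as exc:
    await noCancel req.stream.closeWait()
    self.pushResult(AcceptResult.err(exc))
    return

  self.pushResult(AcceptResult.ok(conn))

Each step then has exactly one try, and the queue-push bookkeeping lives in one place.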
  


proc acceptDispatcher(self: WsTransport) {.async: (raises: []).} =
trace "Entering acceptDispatcher"

# Sequentially enqueue N accepts per HttpServer into acceptFuts so we can recover the
# index into httpservers by simply dividing the index of the completed future by N.
for server in self.httpservers:
for _ in 0 ..< self.concurrentAcceptsPerHttpServer:
self.acceptFuts.add(server.accept())

# Either httpservers is empty, or concurrentAcceptsPerHttpServer is zero (which is a defect)
if self.acceptFuts.len == 0:
warn "WsTransport.acceptDispatcher has no work; exiting"
return

while self.running:
try:
var finished: Future[HttpRequest]
try:
Comment on lines +216 to +218
Member

Please avoid try within try.

finished = await one(self.acceptFuts)
except ValueError as exc:
raise newException(AssertionDefect, "wstransport accept error: " & exc.msg, exc)
Member

This should be raiseAssert.
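
E.g. something like this (note raiseAssert only takes a message, so the original exception is not chained as a parent):

except ValueError as exc:
  raiseAssert "wstransport accept error: " & exc.msg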


let futIndex = self.acceptFuts.find(finished)
if futIndex < 0:
continue
let serverIndex = futIndex div self.concurrentAcceptsPerHttpServer
if serverIndex >= self.httpservers.len:
raise newException(AssertionDefect, "wstransport server index out of bounds")
let httpServer = self.httpservers[serverIndex]

# Replenish the completed accept() future for the same server
self.acceptFuts[futIndex] = httpServer.accept()

asyncSpawn self.handshakeWorker(finished, httpServer.secure)

await sleepAsync(0) # be nice
Member

Is this really necessary? It would be interesting to hear the reasoning here.

except CatchableError as exc:
# Dispatcher should never exit before self.running is false
if not self.running:
break
# Unexpected error, so yield for a while before resuming dispatch
trace "Error in accept dispatcher", msg = exc.msg
try:
await sleepAsync(100.milliseconds)
except CancelledError:
discard
try:
# This error never leaves the results queue, guaranteeing popFirst never deadlocks
self.acceptResults.addLastNoWait(
AcceptResult.err(newException(AcceptDispatcherFinishedError, "Server is closed"))
)
except AsyncQueueFullError:
raise newException(AssertionDefect, "wstransport accept results queue full")

trace "Exiting acceptDispatcher"

method start*(
self: WsTransport, addrs: seq[MultiAddress]
) {.async: (raises: [LPError, transport.TransportError, CancelledError]).} =
@@ -209,76 +334,65 @@ method start*(

trace "Listening on", addresses = self.addrs

self.acceptResults = newAsyncQueue[AcceptResult]()
self.acceptLoop = self.acceptDispatcher()

method stop*(self: WsTransport) {.async: (raises: []).} =
## stop the transport
##

echo "[DEBUG WSTRANSPORT] WsTransport.stop"
self.running = false # mark stopped as soon as possible

try:
trace "Stopping WS transport"
echo "[DEBUG WSTRANSPORT] stop transport"
await procCall Transport(self).stop() # call base

echo "[DEBUG WSTRANSPORT] allfinished connections"
discard await allFinished(
self.connections[Direction.In].mapIt(it.close()) &
self.connections[Direction.Out].mapIt(it.close())
)

if not isNil(self.acceptLoop):
echo "[DEBUG WSTRANSPORT] stop dispatcher"
await self.acceptLoop.cancelAndWait()
echo "[DEBUG WSTRANSPORT] stopped dispatcher"

for i, server in self.httpservers:
echo "[DEBUG WSTRANSPORT] stop server ", i
server.stop()
echo "[DEBUG WSTRANSPORT] stopped server ", i

var toWait: seq[Future[void]]
for fut in self.acceptFuts:
echo "[DEBUG WSTRANSPORT] self acceptfuts loop count=", self.acceptFuts.len
for i, fut in self.acceptFuts:
if not fut.finished:
if i <= 3:
echo "[DEBUG WSTRANSPORT] toWait add cancelAndWait ", i
toWait.add(fut.cancelAndWait())
if i <= 3:
echo "[DEBUG WSTRANSPORT] toWait add cancelAndWait done ", i
elif fut.completed:
echo "[DEBUG WSTRANSPORT] toWait add stream closeWait ", i
toWait.add(fut.read().stream.closeWait())
echo "[DEBUG WSTRANSPORT] toWait add stream closeWait done ", i

for server in self.httpservers:
server.stop()
for i, server in self.httpservers:
echo "[DEBUG WSTRANSPORT] server closewait ", i
toWait.add(server.closeWait())
echo "[DEBUG WSTRANSPORT] server closewait done ", i

echo "[DEBUG WSTRANSPORT] await allFutures"
await allFutures(toWait)
echo "[DEBUG WSTRANSPORT] await allFutures done"

self.httpservers = @[]
trace "Transport stopped"
echo "[DEBUG WSTRANSPORT] wstransport stopped"
except CatchableError as exc:
trace "Error shutting down ws transport", description = exc.msg

proc connHandler(
self: WsTransport, stream: WSSession, secure: bool, dir: Direction
): Future[Connection] {.async: (raises: [CatchableError]).} =
## Returning CatchableError is fine because we later handle different exceptions.

let (observedAddr, localAddr) =
try:
let
codec =
if secure:
MultiAddress.init("/wss")
else:
MultiAddress.init("/ws")
remoteAddr = stream.stream.reader.tsource.remoteAddress
localAddr = stream.stream.reader.tsource.localAddress

(
MultiAddress.init(remoteAddr).tryGet() & codec.tryGet(),
MultiAddress.init(localAddr).tryGet() & codec.tryGet(),
)
except CatchableError as exc:
trace "Failed to create observedAddr or listenAddr", description = exc.msg
if not (isNil(stream) and stream.stream.reader.closed):
safeClose(stream)
raise exc

let conn = WsStream.new(stream, dir, Opt.some(observedAddr), Opt.some(localAddr))

self.connections[dir].add(conn)
proc onClose() {.async: (raises: []).} =
await noCancel conn.session.stream.reader.join()
self.connections[dir].keepItIf(it != conn)
trace "Cleaned up client"

asyncSpawn onClose()
return conn

method accept*(
self: WsTransport
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
@@ -294,58 +408,44 @@ method accept*(
if not self.running:
raise newTransportClosedError()

if self.acceptFuts.len <= 0:
self.acceptFuts = self.httpservers.mapIt(it.accept())

if self.acceptFuts.len <= 0:
let res = await self.acceptResults.popFirst()

if res.isErr:
Member

Hmm... I would still prefer to use the same construct for handling errors, that is, try-except instead of if-of. I can understand why the code was written like this, but I would prefer all error handling across the codebase to be try-except rather than if .. of ...

pseudocode:

# early return if there is a value
if not res.isErr:
  return res.value

# handle errors
try:
  raise res.error
except WebSocketError:
  ...
except ...
except ...
except ...

Would this be possible?
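
Roughly, the error path inside accept could then look like this (a sketch reusing the existing log lines; ordering matters so that CancelledError and the specific transport errors are handled before the CatchableError fallback):

if not res.isErr:
  return res.value

try:
  raise res.error
except TransportUseClosedError as exc:
  debug "Server was closed", description = exc.msg
  raise newTransportClosedError(exc)
except AcceptDispatcherFinishedError as exc:
  # put the sentinel back so later accept() calls also return promptly
  try:
    self.acceptResults.addLastNoWait(res)
  except AsyncQueueFullError:
    raiseAssert "acceptResults queue is full"
  debug "Server was closed", description = exc.msg
  raise newTransportClosedError(exc)
except CancelledError as exc:
  raise exc
except WebSocketError as exc:
  debug "Websocket Error", description = exc.msg
except HttpError as exc:
  debug "Http Error", description = exc.msg
except AsyncStreamError as exc:
  debug "AsyncStream Error", description = exc.msg
except TransportTooManyError as exc:
  debug "Too many files opened", description = exc.msg
except TransportAbortedError as exc:
  debug "Connection aborted", description = exc.msg
except AsyncTimeoutError as exc:
  debug "Timed out", description = exc.msg
except TransportOsError as exc:
  debug "OS Error", description = exc.msg
except CatchableError as exc:
  info "Unexpected error accepting connection", description = exc.msg
  raise newException(
    transport.TransportError, "Error in WsTransport accept: " & exc.msg, exc
  )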

let exc = res.error
if exc of WebSocketError:
debug "Websocket Error", description = exc.msg
elif exc of HttpError:
debug "Http Error", description = exc.msg
elif exc of AsyncStreamError:
debug "AsyncStream Error", description = exc.msg
elif exc of TransportTooManyError:
debug "Too many files opened", description = exc.msg
elif exc of TransportAbortedError:
debug "Connection aborted", description = exc.msg
elif exc of AsyncTimeoutError:
debug "Timed out", description = exc.msg
elif exc of TransportOsError:
debug "OS Error", description = exc.msg
elif exc of TransportUseClosedError:
debug "Server was closed", description = exc.msg
raise newTransportClosedError(exc)
elif exc of AcceptDispatcherFinishedError:
try:
self.acceptResults.addLastNoWait(res)
except AsyncQueueFullError:
raise newException(AssertionDefect, "wstransport handshakeResults queue full")
Comment on lines +433 to +436
Member

Suggested change
- try:
-   self.acceptResults.addLastNoWait(res)
- except AsyncQueueFullError:
-   raise newException(AssertionDefect, "wstransport handshakeResults queue full")
+ try:
+   self.acceptResults.addLastNoWait(res)
+ except AsyncQueueFullError:
+   raiseAssert "acceptResults queue is full"

If accepted, please search for and update the other places as well...

Reasoning: raiseAssert also raises AssertionDefect, and the libp2p codebase mainly uses raiseAssert.

debug "Server was closed", description = exc.msg
raise newTransportClosedError(exc)
elif exc of CancelledError:
raise (ref CancelledError)(exc)
else:
info "Unexpected error accepting connection", description = exc.msg
raise newException(
transport.TransportError, "Error in WsTransport accept: " & exc.msg, exc
)
return

let finished =
try:
await one(self.acceptFuts)
except ValueError:
raiseAssert("already checked with if")
except CancelledError as e:
raise e

let index = self.acceptFuts.find(finished)
self.acceptFuts[index] = self.httpservers[index].accept()

try:
let req = await finished

try:
let wstransp = await self.wsserver.handleRequest(req).wait(self.handshakeTimeout)
let isSecure = self.httpservers[index].secure

return await self.connHandler(wstransp, isSecure, Direction.In)
except CatchableError as exc:
await noCancel req.stream.closeWait()
raise exc
except WebSocketError as exc:
debug "Websocket Error", description = exc.msg
except HttpError as exc:
debug "Http Error", description = exc.msg
except AsyncStreamError as exc:
debug "AsyncStream Error", description = exc.msg
except TransportTooManyError as exc:
debug "Too many files opened", description = exc.msg
except TransportAbortedError as exc:
debug "Connection aborted", description = exc.msg
except AsyncTimeoutError as exc:
debug "Timed out", description = exc.msg
except TransportUseClosedError as exc:
debug "Server was closed", description = exc.msg
raise newTransportClosedError(exc)
except CancelledError as exc:
raise exc
except TransportOsError as exc:
debug "OS Error", description = exc.msg
except CatchableError as exc:
info "Unexpected error accepting connection", description = exc.msg
raise newException(
transport.TransportError, "Error in WsTransport accept: " & exc.msg, exc
)
return res.value

method dial*(
self: WsTransport,
@@ -393,6 +493,7 @@ proc new*(
factories: openArray[ExtFactory] = [],
rng: ref HmacDrbgContext = nil,
handshakeTimeout = DefaultHeadersTimeout,
concurrentAcceptsPerHttpServer = DefaultConcurrentAcceptsPerHttpServer,
): T {.raises: [].} =
## Creates a secure WebSocket transport

@@ -406,6 +507,11 @@
factories: @factories,
rng: rng,
handshakeTimeout: handshakeTimeout,
concurrentAcceptsPerHttpServer:
if concurrentAcceptsPerHttpServer <= 0:
DefaultConcurrentAcceptsPerHttpServer
else:
concurrentAcceptsPerHttpServer,
Comment on lines +510 to +514
Member

If this logic is added here, why is it not added to the other new proc?
My main concern is that if this fallback-value logic is really needed, then it should be in the other proc as well.
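
One way to keep the fallback rule in a single place (the helper name is just a suggestion) would be a small func that both constructors use:

func orDefaultAccepts(n: int): int =
  # hypothetical helper: fall back to the default when a non-positive value is passed
  if n <= 0: DefaultConcurrentAcceptsPerHttpServer else: n

Both new procs would then pass concurrentAcceptsPerHttpServer.orDefaultAccepts() instead of duplicating the if/else.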

)
procCall Transport(self).initialize()
self
@@ -417,6 +523,7 @@
factories: openArray[ExtFactory] = [],
rng: ref HmacDrbgContext = nil,
handshakeTimeout = DefaultHeadersTimeout,
concurrentAcceptsPerHttpServer = DefaultConcurrentAcceptsPerHttpServer,
): T {.raises: [].} =
## Creates a clear-text WebSocket transport

@@ -429,4 +536,5 @@
factories = @factories,
rng = rng,
handshakeTimeout = handshakeTimeout,
concurrentAcceptsPerHttpServer = concurrentAcceptsPerHttpServer,
)