Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 175 additions & 87 deletions libp2p/transports/wstransport.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const
DefaultHeadersTimeout = 3.seconds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CI on windows: for some reason fails

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will investigate. It's hanging non-deterministically in a [Suite ] WebSocket transport test:

  [Test   ]           server with multiple parallel connections
Terminate batch job (Y/N)? 
^C
Error: The operation was canceled.

...that should take 2-3 seconds every time:

  [Test   ]           server with multiple parallel connections
@[0, 1, 2, 3, 4, 4, 2, 0, 3, 1, 4, 3, 0, 1, 2, 4, 3, 0, 1, 2, 4, 1, 0, 3, 2, 1, 4, 0, 3, 2, 1, 0, 4, 3, 4, 1, 2, 0, 2, 1, 3, 4, 1, 2, 3, 0, 4, 1, 2, 0, 1, 3, 4, 2, 0, 4, 2, 0, 3, 1, 4, 1, 2, 3, 0, 2, 4, 0, 2, 1, 0, 3, 4, 2, 1, 4, 0, 2, 3, 1, 3, 2, 0, 4, 1, 2, 3, 0, 4, 2, 1, 3, 0, 1, 4, 2, 1, 4, 0, 3, 4, 0, 1, 2, 4, 3, 3, 0, 1, 2, 4, 2, 3, 0, 1, 2, 4, 3, 4, 0, 2, 3, 1, 2, 3, 4, 1, 0, 2, 3, 4, 1, 3, 4, 2, 0, 3, 1, 4, 0, 2, 2, 3, 1, 4, 0, 3, 0, 2, 4, 1, 3, 0, 1, 4, 3, 1, 3, 0, 0]
  [OK     ] (  2.62s) server with multiple parallel connections

DefaultAutotlsWaitTimeout = 3.seconds
DefaultAutotlsRetries = 3
DefaultConcurrentAcceptsPerHttpServer = 100

type
WsStream = ref object of Connection
Expand Down Expand Up @@ -111,11 +112,18 @@ method closeImpl*(s: WsStream): Future[void] {.async: (raises: []).} =
method getWrapped*(s: WsStream): Connection =
nil

type
AcceptResult = Result[Connection, ref CatchableError]
AcceptDispatcherFinishedError = object of CatchableError

type WsTransport* = ref object of Transport
httpservers: seq[HttpServer]
wsserver: WSServer
connections: array[Direction, seq[WsStream]]
acceptFuts: seq[Future[HttpRequest]]
acceptResults: AsyncQueue[AcceptResult]
acceptLoop: Future[void]
concurrentAcceptsPerHttpServer: int

tlsPrivateKey*: TLSPrivateKey
tlsCertificate*: TLSCertificate
Expand All @@ -129,6 +137,123 @@ type WsTransport* = ref object of Transport
proc secure*(self: WsTransport): bool =
not (isNil(self.tlsPrivateKey) or isNil(self.tlsCertificate))

proc connHandler(
self: WsTransport, stream: WSSession, secure: bool, dir: Direction
): Future[Connection] {.async: (raises: [CatchableError]).} =
## Returning CatchableError is fine because we later handle different exceptions.

let (observedAddr, localAddr) =
try:
let
codec =
if secure:
MultiAddress.init("/wss")
else:
MultiAddress.init("/ws")
remoteAddr = stream.stream.reader.tsource.remoteAddress
localAddr = stream.stream.reader.tsource.localAddress

(
MultiAddress.init(remoteAddr).tryGet() & codec.tryGet(),
MultiAddress.init(localAddr).tryGet() & codec.tryGet(),
)
except CatchableError as exc:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use specific error instead of CatchableError.

trace "Failed to create observedAddr or listenAddr", description = exc.msg
if not (isNil(stream) and stream.stream.reader.closed):
safeClose(stream)
raise exc

let conn = WsStream.new(stream, dir, Opt.some(observedAddr), Opt.some(localAddr))

self.connections[dir].add(conn)
proc onClose() {.async: (raises: []).} =
await noCancel conn.session.stream.reader.join()
self.connections[dir].keepItIf(it != conn)
trace "Cleaned up client"

asyncSpawn onClose()
return conn

proc handshakeWorker(
self: WsTransport, finished: Future[HttpRequest], secure: bool
) {.async: (raises: []).} =
try:
let req = await finished
try:
let wstransp = await self.wsserver.handleRequest(req).wait(self.handshakeTimeout)
let conn = await self.connHandler(wstransp, secure, Direction.In)
try:
self.acceptResults.addLastNoWait(AcceptResult.ok(conn))
except AsyncQueueFullError:
await noCancel req.stream.closeWait()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this never happens?

except CatchableError as exc:
await noCancel req.stream.closeWait()
try:
self.acceptResults.addLastNoWait(AcceptResult.err(exc))
except AsyncQueueFullError:
discard
except CatchableError as exc:
try:
self.acceptResults.addLastNoWait(AcceptResult.err(exc))
except AsyncQueueFullError:
discard
Comment on lines +182 to +199
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please improve this logic:

  • use specific errors instead of CatchableError
  • do not use nasted error handling (try-except within try-except)

for example:

let wstransp = 
  try  
    await self.wsserver.handleRequest(req).wait(self.handshakeTimeout)
  except ....

 let conn = 
  try
     await self.connHandler(wstransp, secure, Direction.In)
  except ....
  


proc acceptDispatcher(self: WsTransport) {.async: (raises: []).} =
trace "Entering acceptDispatcher"

# Sequentially enqueue N accepts per HttpServer into acceptFuts so we can recover the
# index into httpservers by simply dividing the index of the completed future by N.
for server in self.httpservers:
for _ in 0 ..< self.concurrentAcceptsPerHttpServer:
self.acceptFuts.add(server.accept())

# Either httpservers is empty, or concurrentAcceptsPerHttpServer is zero (which is a defect)
if self.acceptFuts.len == 0:
warn "WsTransport.acceptDispatcher has no work; exiting"
return

while self.running:
try:
var finished: Future[HttpRequest]
try:
Comment on lines +216 to +218
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please avoid try within try

finished = await one(self.acceptFuts)
except ValueError as exc:
raise newException(AssertionDefect, "wstransport accept error: " & exc.msg, exc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be raiseAssert


let futIndex = self.acceptFuts.find(finished)
if futIndex < 0:
continue
let serverIndex = futIndex div self.concurrentAcceptsPerHttpServer
if serverIndex >= self.httpservers.len:
raise newException(AssertionDefect, "wstransport server index out of bounds")
let httpServer = self.httpservers[serverIndex]

# Replenish the completed accept() future for the same server
self.acceptFuts[futIndex] = httpServer.accept()

asyncSpawn self.handshakeWorker(finished, httpServer.secure)

await sleepAsync(0) # be nice
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this really necessary? would be interesting to here reasoning here

except CatchableError as exc:
# Dispatcher should never exit before self.running is false
if not self.running:
break
# Unexpected error, so yield for a while before resuming dispatch
trace "Error in accept dispatcher", msg = exc.msg
try:
await sleepAsync(100.milliseconds)
except CancelledError:
discard
try:
# This error never leaves the results queue, guaranteeing popFirst never deadlocks
self.acceptResults.addLastNoWait(
AcceptResult.err(newException(AcceptDispatcherFinishedError, "Server is closed"))
)
except AsyncQueueFullError:
raise newException(AssertionDefect, "wstransport accept results queue full")

trace "Exiting acceptDispatcher"

method start*(
self: WsTransport, addrs: seq[MultiAddress]
) {.async: (raises: [LPError, transport.TransportError, CancelledError]).} =
Expand Down Expand Up @@ -209,6 +334,9 @@ method start*(

trace "Listening on", addresses = self.addrs

self.acceptResults = newAsyncQueue[AcceptResult]()
self.acceptLoop = self.acceptDispatcher()

method stop*(self: WsTransport) {.async: (raises: []).} =
## stop the transport
##
Expand All @@ -224,6 +352,9 @@ method stop*(self: WsTransport) {.async: (raises: []).} =
self.connections[Direction.Out].mapIt(it.close())
)

if not isNil(self.acceptLoop):
await self.acceptLoop.cancelAndWait()

var toWait: seq[Future[void]]
for fut in self.acceptFuts:
if not fut.finished:
Expand All @@ -242,43 +373,6 @@ method stop*(self: WsTransport) {.async: (raises: []).} =
except CatchableError as exc:
trace "Error shutting down ws transport", description = exc.msg

proc connHandler(
self: WsTransport, stream: WSSession, secure: bool, dir: Direction
): Future[Connection] {.async: (raises: [CatchableError]).} =
## Returning CatchableError is fine because we later handle different exceptions.

let (observedAddr, localAddr) =
try:
let
codec =
if secure:
MultiAddress.init("/wss")
else:
MultiAddress.init("/ws")
remoteAddr = stream.stream.reader.tsource.remoteAddress
localAddr = stream.stream.reader.tsource.localAddress

(
MultiAddress.init(remoteAddr).tryGet() & codec.tryGet(),
MultiAddress.init(localAddr).tryGet() & codec.tryGet(),
)
except CatchableError as exc:
trace "Failed to create observedAddr or listenAddr", description = exc.msg
if not (isNil(stream) and stream.stream.reader.closed):
safeClose(stream)
raise exc

let conn = WsStream.new(stream, dir, Opt.some(observedAddr), Opt.some(localAddr))

self.connections[dir].add(conn)
proc onClose() {.async: (raises: []).} =
await noCancel conn.session.stream.reader.join()
self.connections[dir].keepItIf(it != conn)
trace "Cleaned up client"

asyncSpawn onClose()
return conn

method accept*(
self: WsTransport
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
Expand All @@ -294,58 +388,44 @@ method accept*(
if not self.running:
raise newTransportClosedError()

if self.acceptFuts.len <= 0:
self.acceptFuts = self.httpservers.mapIt(it.accept())

if self.acceptFuts.len <= 0:
let res = await self.acceptResults.popFirst()

if res.isErr:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm... i would still prefer to use same construct for handling error. that is using try-expect instead of if-of. i can understand why code was made like this, but i would prefer for all error handling across codebase to be try-expect instead of if .. of ...

pseudocode:

# early return if there is value 
if not res.isErr: 
 return res.value 
 
 # handle errors
 
 try:
   raise res.error
 except WebSocketError:
 except:
 except:
 except:

would this be possible?

let exc = res.error
if exc of WebSocketError:
debug "Websocket Error", description = exc.msg
elif exc of HttpError:
debug "Http Error", description = exc.msg
elif exc of AsyncStreamError:
debug "AsyncStream Error", description = exc.msg
elif exc of TransportTooManyError:
debug "Too many files opened", description = exc.msg
elif exc of TransportAbortedError:
debug "Connection aborted", description = exc.msg
elif exc of AsyncTimeoutError:
debug "Timed out", description = exc.msg
elif exc of TransportOsError:
debug "OS Error", description = exc.msg
elif exc of TransportUseClosedError:
debug "Server was closed", description = exc.msg
raise newTransportClosedError(exc)
elif exc of AcceptDispatcherFinishedError:
try:
self.acceptResults.addLastNoWait(res)
except AsyncQueueFullError:
raise newException(AssertionDefect, "wstransport handshakeResults queue full")
Comment on lines +413 to +416
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try:
self.acceptResults.addLastNoWait(res)
except AsyncQueueFullError:
raise newException(AssertionDefect, "wstransport handshakeResults queue full")
try:
self.acceptResults.addLastNoWait(res)
except AsyncQueueFullError:
raiseAssert "acceptResults queue is full"

if accepted please search and updated other places...

reasoning: raiseAssert also raises AssertionDefect. libp2p across codebase mainly uses raiseAsserts

debug "Server was closed", description = exc.msg
raise newTransportClosedError(exc)
elif exc of CancelledError:
raise (ref CancelledError)(exc)
else:
info "Unexpected error accepting connection", description = exc.msg
raise newException(
transport.TransportError, "Error in WsTransport accept: " & exc.msg, exc
)
return

let finished =
try:
await one(self.acceptFuts)
except ValueError:
raiseAssert("already checked with if")
except CancelledError as e:
raise e

let index = self.acceptFuts.find(finished)
self.acceptFuts[index] = self.httpservers[index].accept()

try:
let req = await finished

try:
let wstransp = await self.wsserver.handleRequest(req).wait(self.handshakeTimeout)
let isSecure = self.httpservers[index].secure

return await self.connHandler(wstransp, isSecure, Direction.In)
except CatchableError as exc:
await noCancel req.stream.closeWait()
raise exc
except WebSocketError as exc:
debug "Websocket Error", description = exc.msg
except HttpError as exc:
debug "Http Error", description = exc.msg
except AsyncStreamError as exc:
debug "AsyncStream Error", description = exc.msg
except TransportTooManyError as exc:
debug "Too many files opened", description = exc.msg
except TransportAbortedError as exc:
debug "Connection aborted", description = exc.msg
except AsyncTimeoutError as exc:
debug "Timed out", description = exc.msg
except TransportUseClosedError as exc:
debug "Server was closed", description = exc.msg
raise newTransportClosedError(exc)
except CancelledError as exc:
raise exc
except TransportOsError as exc:
debug "OS Error", description = exc.msg
except CatchableError as exc:
info "Unexpected error accepting connection", description = exc.msg
raise newException(
transport.TransportError, "Error in WsTransport accept: " & exc.msg, exc
)
return res.value

method dial*(
self: WsTransport,
Expand Down Expand Up @@ -393,6 +473,7 @@ proc new*(
factories: openArray[ExtFactory] = [],
rng: ref HmacDrbgContext = nil,
handshakeTimeout = DefaultHeadersTimeout,
concurrentAcceptsPerHttpServer = DefaultConcurrentAcceptsPerHttpServer,
): T {.raises: [].} =
## Creates a secure WebSocket transport

Expand All @@ -406,6 +487,11 @@ proc new*(
factories: @factories,
rng: rng,
handshakeTimeout: handshakeTimeout,
concurrentAcceptsPerHttpServer:
if concurrentAcceptsPerHttpServer <= 0:
DefaultConcurrentAcceptsPerHttpServer
else:
concurrentAcceptsPerHttpServer,
Comment on lines +490 to +494
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this logic is added there why is it not added to other new proc?
my main concern is if this logic, for fallback value, is really needed then it should be in other proc as well.

)
procCall Transport(self).initialize()
self
Expand All @@ -417,6 +503,7 @@ proc new*(
factories: openArray[ExtFactory] = [],
rng: ref HmacDrbgContext = nil,
handshakeTimeout = DefaultHeadersTimeout,
concurrentAcceptsPerHttpServer = DefaultConcurrentAcceptsPerHttpServer,
): T {.raises: [].} =
## Creates a clear-text WebSocket transport

Expand All @@ -429,4 +516,5 @@ proc new*(
factories = @factories,
rng = rng,
handshakeTimeout = handshakeTimeout,
concurrentAcceptsPerHttpServer = concurrentAcceptsPerHttpServer,
)
Loading