-
Notifications
You must be signed in to change notification settings - Fork 66
feat(wstransport): support concurrent accept in WsTransport #1919
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -37,6 +37,7 @@ const | |||||||||||||||||
| DefaultHeadersTimeout = 3.seconds | ||||||||||||||||||
| DefaultAutotlsWaitTimeout = 3.seconds | ||||||||||||||||||
| DefaultAutotlsRetries = 3 | ||||||||||||||||||
| DefaultConcurrentAcceptsPerHttpServer = 100 | ||||||||||||||||||
|
|
||||||||||||||||||
| type | ||||||||||||||||||
| WsStream = ref object of Connection | ||||||||||||||||||
|
|
@@ -111,11 +112,18 @@ method closeImpl*(s: WsStream): Future[void] {.async: (raises: []).} = | |||||||||||||||||
| method getWrapped*(s: WsStream): Connection = | ||||||||||||||||||
| nil | ||||||||||||||||||
|
|
||||||||||||||||||
| type | ||||||||||||||||||
| AcceptResult = Result[Connection, ref CatchableError] | ||||||||||||||||||
| AcceptDispatcherFinishedError = object of CatchableError | ||||||||||||||||||
|
|
||||||||||||||||||
| type WsTransport* = ref object of Transport | ||||||||||||||||||
| httpservers: seq[HttpServer] | ||||||||||||||||||
| wsserver: WSServer | ||||||||||||||||||
| connections: array[Direction, seq[WsStream]] | ||||||||||||||||||
| acceptFuts: seq[Future[HttpRequest]] | ||||||||||||||||||
| acceptResults: AsyncQueue[AcceptResult] | ||||||||||||||||||
| acceptLoop: Future[void] | ||||||||||||||||||
| concurrentAcceptsPerHttpServer: int | ||||||||||||||||||
|
|
||||||||||||||||||
| tlsPrivateKey*: TLSPrivateKey | ||||||||||||||||||
| tlsCertificate*: TLSCertificate | ||||||||||||||||||
|
|
@@ -129,6 +137,123 @@ type WsTransport* = ref object of Transport | |||||||||||||||||
| proc secure*(self: WsTransport): bool = | ||||||||||||||||||
| not (isNil(self.tlsPrivateKey) or isNil(self.tlsCertificate)) | ||||||||||||||||||
|
|
||||||||||||||||||
| proc connHandler( | ||||||||||||||||||
| self: WsTransport, stream: WSSession, secure: bool, dir: Direction | ||||||||||||||||||
| ): Future[Connection] {.async: (raises: [CatchableError]).} = | ||||||||||||||||||
| ## Returning CatchableError is fine because we later handle different exceptions. | ||||||||||||||||||
|
|
||||||||||||||||||
| let (observedAddr, localAddr) = | ||||||||||||||||||
| try: | ||||||||||||||||||
| let | ||||||||||||||||||
| codec = | ||||||||||||||||||
| if secure: | ||||||||||||||||||
| MultiAddress.init("/wss") | ||||||||||||||||||
| else: | ||||||||||||||||||
| MultiAddress.init("/ws") | ||||||||||||||||||
| remoteAddr = stream.stream.reader.tsource.remoteAddress | ||||||||||||||||||
| localAddr = stream.stream.reader.tsource.localAddress | ||||||||||||||||||
|
|
||||||||||||||||||
| ( | ||||||||||||||||||
| MultiAddress.init(remoteAddr).tryGet() & codec.tryGet(), | ||||||||||||||||||
| MultiAddress.init(localAddr).tryGet() & codec.tryGet(), | ||||||||||||||||||
| ) | ||||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please use specific error instead of |
||||||||||||||||||
| trace "Failed to create observedAddr or listenAddr", description = exc.msg | ||||||||||||||||||
| if not (isNil(stream) and stream.stream.reader.closed): | ||||||||||||||||||
| safeClose(stream) | ||||||||||||||||||
| raise exc | ||||||||||||||||||
|
|
||||||||||||||||||
| let conn = WsStream.new(stream, dir, Opt.some(observedAddr), Opt.some(localAddr)) | ||||||||||||||||||
|
|
||||||||||||||||||
| self.connections[dir].add(conn) | ||||||||||||||||||
| proc onClose() {.async: (raises: []).} = | ||||||||||||||||||
| await noCancel conn.session.stream.reader.join() | ||||||||||||||||||
| self.connections[dir].keepItIf(it != conn) | ||||||||||||||||||
| trace "Cleaned up client" | ||||||||||||||||||
|
|
||||||||||||||||||
| asyncSpawn onClose() | ||||||||||||||||||
| return conn | ||||||||||||||||||
|
|
||||||||||||||||||
| proc handshakeWorker( | ||||||||||||||||||
| self: WsTransport, finished: Future[HttpRequest], secure: bool | ||||||||||||||||||
| ) {.async: (raises: []).} = | ||||||||||||||||||
| try: | ||||||||||||||||||
| let req = await finished | ||||||||||||||||||
| try: | ||||||||||||||||||
| let wstransp = await self.wsserver.handleRequest(req).wait(self.handshakeTimeout) | ||||||||||||||||||
| let conn = await self.connHandler(wstransp, secure, Direction.In) | ||||||||||||||||||
| try: | ||||||||||||||||||
| self.acceptResults.addLastNoWait(AcceptResult.ok(conn)) | ||||||||||||||||||
| except AsyncQueueFullError: | ||||||||||||||||||
| await noCancel req.stream.closeWait() | ||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this never happens? |
||||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
| await noCancel req.stream.closeWait() | ||||||||||||||||||
| try: | ||||||||||||||||||
| self.acceptResults.addLastNoWait(AcceptResult.err(exc)) | ||||||||||||||||||
| except AsyncQueueFullError: | ||||||||||||||||||
| discard | ||||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
| try: | ||||||||||||||||||
| self.acceptResults.addLastNoWait(AcceptResult.err(exc)) | ||||||||||||||||||
| except AsyncQueueFullError: | ||||||||||||||||||
| discard | ||||||||||||||||||
|
Comment on lines
+182
to
+199
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please improve this logic:
for example: |
||||||||||||||||||
|
|
||||||||||||||||||
| proc acceptDispatcher(self: WsTransport) {.async: (raises: []).} = | ||||||||||||||||||
| trace "Entering acceptDispatcher" | ||||||||||||||||||
|
|
||||||||||||||||||
| # Sequentially enqueue N accepts per HttpServer into acceptFuts so we can recover the | ||||||||||||||||||
| # index into httpservers by simply dividing the index of the completed future by N. | ||||||||||||||||||
| for server in self.httpservers: | ||||||||||||||||||
| for _ in 0 ..< self.concurrentAcceptsPerHttpServer: | ||||||||||||||||||
| self.acceptFuts.add(server.accept()) | ||||||||||||||||||
|
|
||||||||||||||||||
| # Either httpservers is empty, or concurrentAcceptsPerHttpServer is zero (which is a defect) | ||||||||||||||||||
| if self.acceptFuts.len == 0: | ||||||||||||||||||
| warn "WsTransport.acceptDispatcher has no work; exiting" | ||||||||||||||||||
| return | ||||||||||||||||||
|
|
||||||||||||||||||
| while self.running: | ||||||||||||||||||
| try: | ||||||||||||||||||
| var finished: Future[HttpRequest] | ||||||||||||||||||
| try: | ||||||||||||||||||
|
Comment on lines
+216
to
+218
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please avoid |
||||||||||||||||||
| finished = await one(self.acceptFuts) | ||||||||||||||||||
| except ValueError as exc: | ||||||||||||||||||
| raise newException(AssertionDefect, "wstransport accept error: " & exc.msg, exc) | ||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be |
||||||||||||||||||
|
|
||||||||||||||||||
| let futIndex = self.acceptFuts.find(finished) | ||||||||||||||||||
| if futIndex < 0: | ||||||||||||||||||
| continue | ||||||||||||||||||
| let serverIndex = futIndex div self.concurrentAcceptsPerHttpServer | ||||||||||||||||||
| if serverIndex >= self.httpservers.len: | ||||||||||||||||||
| raise newException(AssertionDefect, "wstransport server index out of bounds") | ||||||||||||||||||
| let httpServer = self.httpservers[serverIndex] | ||||||||||||||||||
|
|
||||||||||||||||||
| # Replenish the completed accept() future for the same server | ||||||||||||||||||
| self.acceptFuts[futIndex] = httpServer.accept() | ||||||||||||||||||
|
|
||||||||||||||||||
| asyncSpawn self.handshakeWorker(finished, httpServer.secure) | ||||||||||||||||||
|
|
||||||||||||||||||
| await sleepAsync(0) # be nice | ||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this really necessary? would be interesting to here reasoning here |
||||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
| # Dispatcher should never exit before self.running is false | ||||||||||||||||||
| if not self.running: | ||||||||||||||||||
| break | ||||||||||||||||||
| # Unexpected error, so yield for a while before resuming dispatch | ||||||||||||||||||
| trace "Error in accept dispatcher", msg = exc.msg | ||||||||||||||||||
| try: | ||||||||||||||||||
| await sleepAsync(100.milliseconds) | ||||||||||||||||||
| except CancelledError: | ||||||||||||||||||
| discard | ||||||||||||||||||
| try: | ||||||||||||||||||
| # This error never leaves the results queue, guaranteeing popFirst never deadlocks | ||||||||||||||||||
| self.acceptResults.addLastNoWait( | ||||||||||||||||||
| AcceptResult.err(newException(AcceptDispatcherFinishedError, "Server is closed")) | ||||||||||||||||||
| ) | ||||||||||||||||||
| except AsyncQueueFullError: | ||||||||||||||||||
| raise newException(AssertionDefect, "wstransport accept results queue full") | ||||||||||||||||||
|
|
||||||||||||||||||
| trace "Exiting acceptDispatcher" | ||||||||||||||||||
|
|
||||||||||||||||||
| method start*( | ||||||||||||||||||
| self: WsTransport, addrs: seq[MultiAddress] | ||||||||||||||||||
| ) {.async: (raises: [LPError, transport.TransportError, CancelledError]).} = | ||||||||||||||||||
|
|
@@ -209,6 +334,9 @@ method start*( | |||||||||||||||||
|
|
||||||||||||||||||
| trace "Listening on", addresses = self.addrs | ||||||||||||||||||
|
|
||||||||||||||||||
| self.acceptResults = newAsyncQueue[AcceptResult]() | ||||||||||||||||||
| self.acceptLoop = self.acceptDispatcher() | ||||||||||||||||||
|
|
||||||||||||||||||
| method stop*(self: WsTransport) {.async: (raises: []).} = | ||||||||||||||||||
| ## stop the transport | ||||||||||||||||||
| ## | ||||||||||||||||||
|
|
@@ -224,6 +352,9 @@ method stop*(self: WsTransport) {.async: (raises: []).} = | |||||||||||||||||
| self.connections[Direction.Out].mapIt(it.close()) | ||||||||||||||||||
| ) | ||||||||||||||||||
|
|
||||||||||||||||||
| if not isNil(self.acceptLoop): | ||||||||||||||||||
| await self.acceptLoop.cancelAndWait() | ||||||||||||||||||
|
|
||||||||||||||||||
| var toWait: seq[Future[void]] | ||||||||||||||||||
| for fut in self.acceptFuts: | ||||||||||||||||||
| if not fut.finished: | ||||||||||||||||||
|
|
@@ -242,43 +373,6 @@ method stop*(self: WsTransport) {.async: (raises: []).} = | |||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
| trace "Error shutting down ws transport", description = exc.msg | ||||||||||||||||||
|
|
||||||||||||||||||
| proc connHandler( | ||||||||||||||||||
| self: WsTransport, stream: WSSession, secure: bool, dir: Direction | ||||||||||||||||||
| ): Future[Connection] {.async: (raises: [CatchableError]).} = | ||||||||||||||||||
| ## Returning CatchableError is fine because we later handle different exceptions. | ||||||||||||||||||
|
|
||||||||||||||||||
| let (observedAddr, localAddr) = | ||||||||||||||||||
| try: | ||||||||||||||||||
| let | ||||||||||||||||||
| codec = | ||||||||||||||||||
| if secure: | ||||||||||||||||||
| MultiAddress.init("/wss") | ||||||||||||||||||
| else: | ||||||||||||||||||
| MultiAddress.init("/ws") | ||||||||||||||||||
| remoteAddr = stream.stream.reader.tsource.remoteAddress | ||||||||||||||||||
| localAddr = stream.stream.reader.tsource.localAddress | ||||||||||||||||||
|
|
||||||||||||||||||
| ( | ||||||||||||||||||
| MultiAddress.init(remoteAddr).tryGet() & codec.tryGet(), | ||||||||||||||||||
| MultiAddress.init(localAddr).tryGet() & codec.tryGet(), | ||||||||||||||||||
| ) | ||||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
| trace "Failed to create observedAddr or listenAddr", description = exc.msg | ||||||||||||||||||
| if not (isNil(stream) and stream.stream.reader.closed): | ||||||||||||||||||
| safeClose(stream) | ||||||||||||||||||
| raise exc | ||||||||||||||||||
|
|
||||||||||||||||||
| let conn = WsStream.new(stream, dir, Opt.some(observedAddr), Opt.some(localAddr)) | ||||||||||||||||||
|
|
||||||||||||||||||
| self.connections[dir].add(conn) | ||||||||||||||||||
| proc onClose() {.async: (raises: []).} = | ||||||||||||||||||
| await noCancel conn.session.stream.reader.join() | ||||||||||||||||||
| self.connections[dir].keepItIf(it != conn) | ||||||||||||||||||
| trace "Cleaned up client" | ||||||||||||||||||
|
|
||||||||||||||||||
| asyncSpawn onClose() | ||||||||||||||||||
| return conn | ||||||||||||||||||
|
|
||||||||||||||||||
| method accept*( | ||||||||||||||||||
| self: WsTransport | ||||||||||||||||||
| ): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} = | ||||||||||||||||||
|
|
@@ -294,58 +388,44 @@ method accept*( | |||||||||||||||||
| if not self.running: | ||||||||||||||||||
| raise newTransportClosedError() | ||||||||||||||||||
|
|
||||||||||||||||||
| if self.acceptFuts.len <= 0: | ||||||||||||||||||
| self.acceptFuts = self.httpservers.mapIt(it.accept()) | ||||||||||||||||||
|
|
||||||||||||||||||
| if self.acceptFuts.len <= 0: | ||||||||||||||||||
| let res = await self.acceptResults.popFirst() | ||||||||||||||||||
|
|
||||||||||||||||||
| if res.isErr: | ||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmmm... i would still prefer to use same construct for handling error. that is using pseudocode: # early return if there is value
if not res.isErr:
return res.value
# handle errors
try:
raise res.error
except WebSocketError:
except:
except:
except:would this be possible? |
||||||||||||||||||
| let exc = res.error | ||||||||||||||||||
| if exc of WebSocketError: | ||||||||||||||||||
| debug "Websocket Error", description = exc.msg | ||||||||||||||||||
| elif exc of HttpError: | ||||||||||||||||||
| debug "Http Error", description = exc.msg | ||||||||||||||||||
| elif exc of AsyncStreamError: | ||||||||||||||||||
| debug "AsyncStream Error", description = exc.msg | ||||||||||||||||||
| elif exc of TransportTooManyError: | ||||||||||||||||||
| debug "Too many files opened", description = exc.msg | ||||||||||||||||||
| elif exc of TransportAbortedError: | ||||||||||||||||||
| debug "Connection aborted", description = exc.msg | ||||||||||||||||||
| elif exc of AsyncTimeoutError: | ||||||||||||||||||
| debug "Timed out", description = exc.msg | ||||||||||||||||||
| elif exc of TransportOsError: | ||||||||||||||||||
| debug "OS Error", description = exc.msg | ||||||||||||||||||
| elif exc of TransportUseClosedError: | ||||||||||||||||||
| debug "Server was closed", description = exc.msg | ||||||||||||||||||
| raise newTransportClosedError(exc) | ||||||||||||||||||
| elif exc of AcceptDispatcherFinishedError: | ||||||||||||||||||
| try: | ||||||||||||||||||
| self.acceptResults.addLastNoWait(res) | ||||||||||||||||||
| except AsyncQueueFullError: | ||||||||||||||||||
| raise newException(AssertionDefect, "wstransport handshakeResults queue full") | ||||||||||||||||||
|
Comment on lines
+413
to
+416
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
if accepted please search and updated other places... reasoning: |
||||||||||||||||||
| debug "Server was closed", description = exc.msg | ||||||||||||||||||
| raise newTransportClosedError(exc) | ||||||||||||||||||
| elif exc of CancelledError: | ||||||||||||||||||
| raise (ref CancelledError)(exc) | ||||||||||||||||||
| else: | ||||||||||||||||||
| info "Unexpected error accepting connection", description = exc.msg | ||||||||||||||||||
| raise newException( | ||||||||||||||||||
| transport.TransportError, "Error in WsTransport accept: " & exc.msg, exc | ||||||||||||||||||
| ) | ||||||||||||||||||
| return | ||||||||||||||||||
|
|
||||||||||||||||||
| let finished = | ||||||||||||||||||
| try: | ||||||||||||||||||
| await one(self.acceptFuts) | ||||||||||||||||||
| except ValueError: | ||||||||||||||||||
| raiseAssert("already checked with if") | ||||||||||||||||||
| except CancelledError as e: | ||||||||||||||||||
| raise e | ||||||||||||||||||
|
|
||||||||||||||||||
| let index = self.acceptFuts.find(finished) | ||||||||||||||||||
| self.acceptFuts[index] = self.httpservers[index].accept() | ||||||||||||||||||
|
|
||||||||||||||||||
| try: | ||||||||||||||||||
| let req = await finished | ||||||||||||||||||
|
|
||||||||||||||||||
| try: | ||||||||||||||||||
| let wstransp = await self.wsserver.handleRequest(req).wait(self.handshakeTimeout) | ||||||||||||||||||
| let isSecure = self.httpservers[index].secure | ||||||||||||||||||
|
|
||||||||||||||||||
| return await self.connHandler(wstransp, isSecure, Direction.In) | ||||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
| await noCancel req.stream.closeWait() | ||||||||||||||||||
| raise exc | ||||||||||||||||||
| except WebSocketError as exc: | ||||||||||||||||||
| debug "Websocket Error", description = exc.msg | ||||||||||||||||||
| except HttpError as exc: | ||||||||||||||||||
| debug "Http Error", description = exc.msg | ||||||||||||||||||
| except AsyncStreamError as exc: | ||||||||||||||||||
| debug "AsyncStream Error", description = exc.msg | ||||||||||||||||||
| except TransportTooManyError as exc: | ||||||||||||||||||
| debug "Too many files opened", description = exc.msg | ||||||||||||||||||
| except TransportAbortedError as exc: | ||||||||||||||||||
| debug "Connection aborted", description = exc.msg | ||||||||||||||||||
| except AsyncTimeoutError as exc: | ||||||||||||||||||
| debug "Timed out", description = exc.msg | ||||||||||||||||||
| except TransportUseClosedError as exc: | ||||||||||||||||||
| debug "Server was closed", description = exc.msg | ||||||||||||||||||
| raise newTransportClosedError(exc) | ||||||||||||||||||
| except CancelledError as exc: | ||||||||||||||||||
| raise exc | ||||||||||||||||||
| except TransportOsError as exc: | ||||||||||||||||||
| debug "OS Error", description = exc.msg | ||||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
| info "Unexpected error accepting connection", description = exc.msg | ||||||||||||||||||
| raise newException( | ||||||||||||||||||
| transport.TransportError, "Error in WsTransport accept: " & exc.msg, exc | ||||||||||||||||||
| ) | ||||||||||||||||||
| return res.value | ||||||||||||||||||
|
|
||||||||||||||||||
| method dial*( | ||||||||||||||||||
| self: WsTransport, | ||||||||||||||||||
|
|
@@ -393,6 +473,7 @@ proc new*( | |||||||||||||||||
| factories: openArray[ExtFactory] = [], | ||||||||||||||||||
| rng: ref HmacDrbgContext = nil, | ||||||||||||||||||
| handshakeTimeout = DefaultHeadersTimeout, | ||||||||||||||||||
| concurrentAcceptsPerHttpServer = DefaultConcurrentAcceptsPerHttpServer, | ||||||||||||||||||
| ): T {.raises: [].} = | ||||||||||||||||||
| ## Creates a secure WebSocket transport | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
@@ -406,6 +487,11 @@ proc new*( | |||||||||||||||||
| factories: @factories, | ||||||||||||||||||
| rng: rng, | ||||||||||||||||||
| handshakeTimeout: handshakeTimeout, | ||||||||||||||||||
| concurrentAcceptsPerHttpServer: | ||||||||||||||||||
| if concurrentAcceptsPerHttpServer <= 0: | ||||||||||||||||||
| DefaultConcurrentAcceptsPerHttpServer | ||||||||||||||||||
| else: | ||||||||||||||||||
| concurrentAcceptsPerHttpServer, | ||||||||||||||||||
|
Comment on lines
+490
to
+494
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if this logic is added there why is it not added to other |
||||||||||||||||||
| ) | ||||||||||||||||||
| procCall Transport(self).initialize() | ||||||||||||||||||
| self | ||||||||||||||||||
|
|
@@ -417,6 +503,7 @@ proc new*( | |||||||||||||||||
| factories: openArray[ExtFactory] = [], | ||||||||||||||||||
| rng: ref HmacDrbgContext = nil, | ||||||||||||||||||
| handshakeTimeout = DefaultHeadersTimeout, | ||||||||||||||||||
| concurrentAcceptsPerHttpServer = DefaultConcurrentAcceptsPerHttpServer, | ||||||||||||||||||
| ): T {.raises: [].} = | ||||||||||||||||||
| ## Creates a clear-text WebSocket transport | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
@@ -429,4 +516,5 @@ proc new*( | |||||||||||||||||
| factories = @factories, | ||||||||||||||||||
| rng = rng, | ||||||||||||||||||
| handshakeTimeout = handshakeTimeout, | ||||||||||||||||||
| concurrentAcceptsPerHttpServer = concurrentAcceptsPerHttpServer, | ||||||||||||||||||
| ) | ||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CI on windows: for some reason fails
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will investigate. It's hanging non-deterministically in a
[Suite ] WebSocket transporttest:...that should take 2-3 seconds every time: