-
Notifications
You must be signed in to change notification settings - Fork 66
feat(wstransport): support concurrent accept #1919
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
5ffd667
1e4564f
a57f62b
5ee0487
62d56b6
ae77eb8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -37,6 +37,7 @@ const | |||||||||||||||||
| DefaultHeadersTimeout = 3.seconds | ||||||||||||||||||
| DefaultAutotlsWaitTimeout = 3.seconds | ||||||||||||||||||
| DefaultAutotlsRetries = 3 | ||||||||||||||||||
| DefaultConcurrentAcceptsPerHttpServer = 100 | ||||||||||||||||||
|
|
||||||||||||||||||
| type | ||||||||||||||||||
| WsStream = ref object of Connection | ||||||||||||||||||
|
|
@@ -111,11 +112,18 @@ method closeImpl*(s: WsStream): Future[void] {.async: (raises: []).} = | |||||||||||||||||
| method getWrapped*(s: WsStream): Connection = | ||||||||||||||||||
| nil | ||||||||||||||||||
|
|
||||||||||||||||||
| type | ||||||||||||||||||
| AcceptResult = Result[Connection, ref CatchableError] | ||||||||||||||||||
| AcceptDispatcherFinishedError = object of CatchableError | ||||||||||||||||||
|
|
||||||||||||||||||
| type WsTransport* = ref object of Transport | ||||||||||||||||||
| httpservers: seq[HttpServer] | ||||||||||||||||||
| wsserver: WSServer | ||||||||||||||||||
| connections: array[Direction, seq[WsStream]] | ||||||||||||||||||
| acceptFuts: seq[Future[HttpRequest]] | ||||||||||||||||||
| acceptResults: AsyncQueue[AcceptResult] | ||||||||||||||||||
| acceptLoop: Future[void] | ||||||||||||||||||
| concurrentAcceptsPerHttpServer: int | ||||||||||||||||||
|
|
||||||||||||||||||
| tlsPrivateKey*: TLSPrivateKey | ||||||||||||||||||
| tlsCertificate*: TLSCertificate | ||||||||||||||||||
|
|
@@ -129,6 +137,123 @@ type WsTransport* = ref object of Transport | |||||||||||||||||
| proc secure*(self: WsTransport): bool = | ||||||||||||||||||
| not (isNil(self.tlsPrivateKey) or isNil(self.tlsCertificate)) | ||||||||||||||||||
|
|
||||||||||||||||||
| proc connHandler( | ||||||||||||||||||
| self: WsTransport, stream: WSSession, secure: bool, dir: Direction | ||||||||||||||||||
| ): Future[Connection] {.async: (raises: [CatchableError]).} = | ||||||||||||||||||
| ## Returning CatchableError is fine because we later handle different exceptions. | ||||||||||||||||||
|
|
||||||||||||||||||
| let (observedAddr, localAddr) = | ||||||||||||||||||
| try: | ||||||||||||||||||
| let | ||||||||||||||||||
| codec = | ||||||||||||||||||
| if secure: | ||||||||||||||||||
| MultiAddress.init("/wss") | ||||||||||||||||||
| else: | ||||||||||||||||||
| MultiAddress.init("/ws") | ||||||||||||||||||
| remoteAddr = stream.stream.reader.tsource.remoteAddress | ||||||||||||||||||
| localAddr = stream.stream.reader.tsource.localAddress | ||||||||||||||||||
|
|
||||||||||||||||||
| ( | ||||||||||||||||||
| MultiAddress.init(remoteAddr).tryGet() & codec.tryGet(), | ||||||||||||||||||
| MultiAddress.init(localAddr).tryGet() & codec.tryGet(), | ||||||||||||||||||
| ) | ||||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please use specific error instead of |
||||||||||||||||||
| trace "Failed to create observedAddr or listenAddr", description = exc.msg | ||||||||||||||||||
| if not (isNil(stream) and stream.stream.reader.closed): | ||||||||||||||||||
| safeClose(stream) | ||||||||||||||||||
| raise exc | ||||||||||||||||||
|
|
||||||||||||||||||
| let conn = WsStream.new(stream, dir, Opt.some(observedAddr), Opt.some(localAddr)) | ||||||||||||||||||
|
|
||||||||||||||||||
| self.connections[dir].add(conn) | ||||||||||||||||||
| proc onClose() {.async: (raises: []).} = | ||||||||||||||||||
| await noCancel conn.session.stream.reader.join() | ||||||||||||||||||
| self.connections[dir].keepItIf(it != conn) | ||||||||||||||||||
| trace "Cleaned up client" | ||||||||||||||||||
|
|
||||||||||||||||||
| asyncSpawn onClose() | ||||||||||||||||||
| return conn | ||||||||||||||||||
|
|
||||||||||||||||||
| proc handshakeWorker( | ||||||||||||||||||
| self: WsTransport, finished: Future[HttpRequest], secure: bool | ||||||||||||||||||
| ) {.async: (raises: []).} = | ||||||||||||||||||
| try: | ||||||||||||||||||
| let req = await finished | ||||||||||||||||||
| try: | ||||||||||||||||||
| let wstransp = await self.wsserver.handleRequest(req).wait(self.handshakeTimeout) | ||||||||||||||||||
| let conn = await self.connHandler(wstransp, secure, Direction.In) | ||||||||||||||||||
| try: | ||||||||||||||||||
| self.acceptResults.addLastNoWait(AcceptResult.ok(conn)) | ||||||||||||||||||
| except AsyncQueueFullError: | ||||||||||||||||||
| await noCancel req.stream.closeWait() | ||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this never happens? |
||||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
| await noCancel req.stream.closeWait() | ||||||||||||||||||
| try: | ||||||||||||||||||
| self.acceptResults.addLastNoWait(AcceptResult.err(exc)) | ||||||||||||||||||
| except AsyncQueueFullError: | ||||||||||||||||||
| discard | ||||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
| try: | ||||||||||||||||||
| self.acceptResults.addLastNoWait(AcceptResult.err(exc)) | ||||||||||||||||||
| except AsyncQueueFullError: | ||||||||||||||||||
| discard | ||||||||||||||||||
|
Comment on lines
+182
to
+199
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please improve this logic:
for example: |
||||||||||||||||||
|
|
||||||||||||||||||
| proc acceptDispatcher(self: WsTransport) {.async: (raises: []).} = | ||||||||||||||||||
| trace "Entering acceptDispatcher" | ||||||||||||||||||
|
|
||||||||||||||||||
| # Sequentially enqueue N accepts per HttpServer into acceptFuts so we can recover the | ||||||||||||||||||
| # index into httpservers by simply dividing the index of the completed future by N. | ||||||||||||||||||
| for server in self.httpservers: | ||||||||||||||||||
| for _ in 0 ..< self.concurrentAcceptsPerHttpServer: | ||||||||||||||||||
| self.acceptFuts.add(server.accept()) | ||||||||||||||||||
|
|
||||||||||||||||||
| # Either httpservers is empty, or concurrentAcceptsPerHttpServer is zero (which is a defect) | ||||||||||||||||||
| if self.acceptFuts.len == 0: | ||||||||||||||||||
| warn "WsTransport.acceptDispatcher has no work; exiting" | ||||||||||||||||||
| return | ||||||||||||||||||
|
|
||||||||||||||||||
| while self.running: | ||||||||||||||||||
| try: | ||||||||||||||||||
| var finished: Future[HttpRequest] | ||||||||||||||||||
| try: | ||||||||||||||||||
|
Comment on lines
+216
to
+218
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please avoid |
||||||||||||||||||
| finished = await one(self.acceptFuts) | ||||||||||||||||||
| except ValueError as exc: | ||||||||||||||||||
| raise newException(AssertionDefect, "wstransport accept error: " & exc.msg, exc) | ||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be |
||||||||||||||||||
|
|
||||||||||||||||||
| let futIndex = self.acceptFuts.find(finished) | ||||||||||||||||||
| if futIndex < 0: | ||||||||||||||||||
| continue | ||||||||||||||||||
| let serverIndex = futIndex div self.concurrentAcceptsPerHttpServer | ||||||||||||||||||
| if serverIndex >= self.httpservers.len: | ||||||||||||||||||
| raise newException(AssertionDefect, "wstransport server index out of bounds") | ||||||||||||||||||
| let httpServer = self.httpservers[serverIndex] | ||||||||||||||||||
|
|
||||||||||||||||||
| # Replenish the completed accept() future for the same server | ||||||||||||||||||
| self.acceptFuts[futIndex] = httpServer.accept() | ||||||||||||||||||
|
|
||||||||||||||||||
| asyncSpawn self.handshakeWorker(finished, httpServer.secure) | ||||||||||||||||||
|
|
||||||||||||||||||
| await sleepAsync(0) # be nice | ||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this really necessary? would be interesting to here reasoning here |
||||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
| # Dispatcher should never exit before self.running is false | ||||||||||||||||||
| if not self.running: | ||||||||||||||||||
| break | ||||||||||||||||||
| # Unexpected error, so yield for a while before resuming dispatch | ||||||||||||||||||
| trace "Error in accept dispatcher", msg = exc.msg | ||||||||||||||||||
| try: | ||||||||||||||||||
| await sleepAsync(100.milliseconds) | ||||||||||||||||||
| except CancelledError: | ||||||||||||||||||
| discard | ||||||||||||||||||
| try: | ||||||||||||||||||
| # This error never leaves the results queue, guaranteeing popFirst never deadlocks | ||||||||||||||||||
| self.acceptResults.addLastNoWait( | ||||||||||||||||||
| AcceptResult.err(newException(AcceptDispatcherFinishedError, "Server is closed")) | ||||||||||||||||||
| ) | ||||||||||||||||||
| except AsyncQueueFullError: | ||||||||||||||||||
| raise newException(AssertionDefect, "wstransport accept results queue full") | ||||||||||||||||||
|
|
||||||||||||||||||
| trace "Exiting acceptDispatcher" | ||||||||||||||||||
|
|
||||||||||||||||||
| method start*( | ||||||||||||||||||
| self: WsTransport, addrs: seq[MultiAddress] | ||||||||||||||||||
| ) {.async: (raises: [LPError, transport.TransportError, CancelledError]).} = | ||||||||||||||||||
|
|
@@ -209,76 +334,65 @@ method start*( | |||||||||||||||||
|
|
||||||||||||||||||
| trace "Listening on", addresses = self.addrs | ||||||||||||||||||
|
|
||||||||||||||||||
| self.acceptResults = newAsyncQueue[AcceptResult]() | ||||||||||||||||||
| self.acceptLoop = self.acceptDispatcher() | ||||||||||||||||||
|
|
||||||||||||||||||
| method stop*(self: WsTransport) {.async: (raises: []).} = | ||||||||||||||||||
| ## stop the transport | ||||||||||||||||||
| ## | ||||||||||||||||||
|
|
||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] WsTransport.stop" | ||||||||||||||||||
| self.running = false # mark stopped as soon as possible | ||||||||||||||||||
|
|
||||||||||||||||||
| try: | ||||||||||||||||||
| trace "Stopping WS transport" | ||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] stop transport" | ||||||||||||||||||
| await procCall Transport(self).stop() # call base | ||||||||||||||||||
|
|
||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] allfinished connections" | ||||||||||||||||||
| discard await allFinished( | ||||||||||||||||||
| self.connections[Direction.In].mapIt(it.close()) & | ||||||||||||||||||
| self.connections[Direction.Out].mapIt(it.close()) | ||||||||||||||||||
| ) | ||||||||||||||||||
|
|
||||||||||||||||||
| if not isNil(self.acceptLoop): | ||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] stop dispatcher" | ||||||||||||||||||
| await self.acceptLoop.cancelAndWait() | ||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] stopped dispatcher" | ||||||||||||||||||
|
|
||||||||||||||||||
| for i, server in self.httpservers: | ||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] stop server ", i | ||||||||||||||||||
| server.stop() | ||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] stopped server ", i | ||||||||||||||||||
|
|
||||||||||||||||||
| var toWait: seq[Future[void]] | ||||||||||||||||||
| for fut in self.acceptFuts: | ||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] self acceptfuts loop count=", self.acceptFuts.len | ||||||||||||||||||
| for i, fut in self.acceptFuts: | ||||||||||||||||||
| if not fut.finished: | ||||||||||||||||||
| if i <= 3: | ||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] toWait add cancelAndWait ", i | ||||||||||||||||||
| toWait.add(fut.cancelAndWait()) | ||||||||||||||||||
| if i <= 3: | ||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] toWait add cancelAndWait done ", i | ||||||||||||||||||
| elif fut.completed: | ||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] toWait add stream closeWait ", i | ||||||||||||||||||
| toWait.add(fut.read().stream.closeWait()) | ||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] toWait add stream closeWait done ", i | ||||||||||||||||||
|
|
||||||||||||||||||
| for server in self.httpservers: | ||||||||||||||||||
| server.stop() | ||||||||||||||||||
| for i, server in self.httpservers: | ||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] server closewait ", i | ||||||||||||||||||
| toWait.add(server.closeWait()) | ||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] server closewait done ", i | ||||||||||||||||||
|
|
||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] await allFutures" | ||||||||||||||||||
| await allFutures(toWait) | ||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] await allFutures done" | ||||||||||||||||||
|
|
||||||||||||||||||
| self.httpservers = @[] | ||||||||||||||||||
| trace "Transport stopped" | ||||||||||||||||||
| echo "[DEBUG WSTRANSPORT] wstransport stopped" | ||||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
| trace "Error shutting down ws transport", description = exc.msg | ||||||||||||||||||
|
|
||||||||||||||||||
| proc connHandler( | ||||||||||||||||||
| self: WsTransport, stream: WSSession, secure: bool, dir: Direction | ||||||||||||||||||
| ): Future[Connection] {.async: (raises: [CatchableError]).} = | ||||||||||||||||||
| ## Returning CatchableError is fine because we later handle different exceptions. | ||||||||||||||||||
|
|
||||||||||||||||||
| let (observedAddr, localAddr) = | ||||||||||||||||||
| try: | ||||||||||||||||||
| let | ||||||||||||||||||
| codec = | ||||||||||||||||||
| if secure: | ||||||||||||||||||
| MultiAddress.init("/wss") | ||||||||||||||||||
| else: | ||||||||||||||||||
| MultiAddress.init("/ws") | ||||||||||||||||||
| remoteAddr = stream.stream.reader.tsource.remoteAddress | ||||||||||||||||||
| localAddr = stream.stream.reader.tsource.localAddress | ||||||||||||||||||
|
|
||||||||||||||||||
| ( | ||||||||||||||||||
| MultiAddress.init(remoteAddr).tryGet() & codec.tryGet(), | ||||||||||||||||||
| MultiAddress.init(localAddr).tryGet() & codec.tryGet(), | ||||||||||||||||||
| ) | ||||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
| trace "Failed to create observedAddr or listenAddr", description = exc.msg | ||||||||||||||||||
| if not (isNil(stream) and stream.stream.reader.closed): | ||||||||||||||||||
| safeClose(stream) | ||||||||||||||||||
| raise exc | ||||||||||||||||||
|
|
||||||||||||||||||
| let conn = WsStream.new(stream, dir, Opt.some(observedAddr), Opt.some(localAddr)) | ||||||||||||||||||
|
|
||||||||||||||||||
| self.connections[dir].add(conn) | ||||||||||||||||||
| proc onClose() {.async: (raises: []).} = | ||||||||||||||||||
| await noCancel conn.session.stream.reader.join() | ||||||||||||||||||
| self.connections[dir].keepItIf(it != conn) | ||||||||||||||||||
| trace "Cleaned up client" | ||||||||||||||||||
|
|
||||||||||||||||||
| asyncSpawn onClose() | ||||||||||||||||||
| return conn | ||||||||||||||||||
|
|
||||||||||||||||||
| method accept*( | ||||||||||||||||||
| self: WsTransport | ||||||||||||||||||
| ): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} = | ||||||||||||||||||
|
|
@@ -294,58 +408,44 @@ method accept*( | |||||||||||||||||
| if not self.running: | ||||||||||||||||||
| raise newTransportClosedError() | ||||||||||||||||||
|
|
||||||||||||||||||
| if self.acceptFuts.len <= 0: | ||||||||||||||||||
| self.acceptFuts = self.httpservers.mapIt(it.accept()) | ||||||||||||||||||
|
|
||||||||||||||||||
| if self.acceptFuts.len <= 0: | ||||||||||||||||||
| let res = await self.acceptResults.popFirst() | ||||||||||||||||||
|
|
||||||||||||||||||
| if res.isErr: | ||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmmm... i would still prefer to use same construct for handling error. that is using pseudocode: # early return if there is value
if not res.isErr:
return res.value
# handle errors
try:
raise res.error
except WebSocketError:
except:
except:
except:would this be possible? |
||||||||||||||||||
| let exc = res.error | ||||||||||||||||||
| if exc of WebSocketError: | ||||||||||||||||||
| debug "Websocket Error", description = exc.msg | ||||||||||||||||||
| elif exc of HttpError: | ||||||||||||||||||
| debug "Http Error", description = exc.msg | ||||||||||||||||||
| elif exc of AsyncStreamError: | ||||||||||||||||||
| debug "AsyncStream Error", description = exc.msg | ||||||||||||||||||
| elif exc of TransportTooManyError: | ||||||||||||||||||
| debug "Too many files opened", description = exc.msg | ||||||||||||||||||
| elif exc of TransportAbortedError: | ||||||||||||||||||
| debug "Connection aborted", description = exc.msg | ||||||||||||||||||
| elif exc of AsyncTimeoutError: | ||||||||||||||||||
| debug "Timed out", description = exc.msg | ||||||||||||||||||
| elif exc of TransportOsError: | ||||||||||||||||||
| debug "OS Error", description = exc.msg | ||||||||||||||||||
| elif exc of TransportUseClosedError: | ||||||||||||||||||
| debug "Server was closed", description = exc.msg | ||||||||||||||||||
| raise newTransportClosedError(exc) | ||||||||||||||||||
| elif exc of AcceptDispatcherFinishedError: | ||||||||||||||||||
| try: | ||||||||||||||||||
| self.acceptResults.addLastNoWait(res) | ||||||||||||||||||
| except AsyncQueueFullError: | ||||||||||||||||||
| raise newException(AssertionDefect, "wstransport handshakeResults queue full") | ||||||||||||||||||
|
Comment on lines
+433
to
+436
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
if accepted please search and updated other places... reasoning: |
||||||||||||||||||
| debug "Server was closed", description = exc.msg | ||||||||||||||||||
| raise newTransportClosedError(exc) | ||||||||||||||||||
| elif exc of CancelledError: | ||||||||||||||||||
| raise (ref CancelledError)(exc) | ||||||||||||||||||
| else: | ||||||||||||||||||
| info "Unexpected error accepting connection", description = exc.msg | ||||||||||||||||||
| raise newException( | ||||||||||||||||||
| transport.TransportError, "Error in WsTransport accept: " & exc.msg, exc | ||||||||||||||||||
| ) | ||||||||||||||||||
| return | ||||||||||||||||||
|
|
||||||||||||||||||
| let finished = | ||||||||||||||||||
| try: | ||||||||||||||||||
| await one(self.acceptFuts) | ||||||||||||||||||
| except ValueError: | ||||||||||||||||||
| raiseAssert("already checked with if") | ||||||||||||||||||
| except CancelledError as e: | ||||||||||||||||||
| raise e | ||||||||||||||||||
|
|
||||||||||||||||||
| let index = self.acceptFuts.find(finished) | ||||||||||||||||||
| self.acceptFuts[index] = self.httpservers[index].accept() | ||||||||||||||||||
|
|
||||||||||||||||||
| try: | ||||||||||||||||||
| let req = await finished | ||||||||||||||||||
|
|
||||||||||||||||||
| try: | ||||||||||||||||||
| let wstransp = await self.wsserver.handleRequest(req).wait(self.handshakeTimeout) | ||||||||||||||||||
| let isSecure = self.httpservers[index].secure | ||||||||||||||||||
|
|
||||||||||||||||||
| return await self.connHandler(wstransp, isSecure, Direction.In) | ||||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
| await noCancel req.stream.closeWait() | ||||||||||||||||||
| raise exc | ||||||||||||||||||
| except WebSocketError as exc: | ||||||||||||||||||
| debug "Websocket Error", description = exc.msg | ||||||||||||||||||
| except HttpError as exc: | ||||||||||||||||||
| debug "Http Error", description = exc.msg | ||||||||||||||||||
| except AsyncStreamError as exc: | ||||||||||||||||||
| debug "AsyncStream Error", description = exc.msg | ||||||||||||||||||
| except TransportTooManyError as exc: | ||||||||||||||||||
| debug "Too many files opened", description = exc.msg | ||||||||||||||||||
| except TransportAbortedError as exc: | ||||||||||||||||||
| debug "Connection aborted", description = exc.msg | ||||||||||||||||||
| except AsyncTimeoutError as exc: | ||||||||||||||||||
| debug "Timed out", description = exc.msg | ||||||||||||||||||
| except TransportUseClosedError as exc: | ||||||||||||||||||
| debug "Server was closed", description = exc.msg | ||||||||||||||||||
| raise newTransportClosedError(exc) | ||||||||||||||||||
| except CancelledError as exc: | ||||||||||||||||||
| raise exc | ||||||||||||||||||
| except TransportOsError as exc: | ||||||||||||||||||
| debug "OS Error", description = exc.msg | ||||||||||||||||||
| except CatchableError as exc: | ||||||||||||||||||
| info "Unexpected error accepting connection", description = exc.msg | ||||||||||||||||||
| raise newException( | ||||||||||||||||||
| transport.TransportError, "Error in WsTransport accept: " & exc.msg, exc | ||||||||||||||||||
| ) | ||||||||||||||||||
| return res.value | ||||||||||||||||||
|
|
||||||||||||||||||
| method dial*( | ||||||||||||||||||
| self: WsTransport, | ||||||||||||||||||
|
|
@@ -393,6 +493,7 @@ proc new*( | |||||||||||||||||
| factories: openArray[ExtFactory] = [], | ||||||||||||||||||
| rng: ref HmacDrbgContext = nil, | ||||||||||||||||||
| handshakeTimeout = DefaultHeadersTimeout, | ||||||||||||||||||
| concurrentAcceptsPerHttpServer = DefaultConcurrentAcceptsPerHttpServer, | ||||||||||||||||||
| ): T {.raises: [].} = | ||||||||||||||||||
| ## Creates a secure WebSocket transport | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
@@ -406,6 +507,11 @@ proc new*( | |||||||||||||||||
| factories: @factories, | ||||||||||||||||||
| rng: rng, | ||||||||||||||||||
| handshakeTimeout: handshakeTimeout, | ||||||||||||||||||
| concurrentAcceptsPerHttpServer: | ||||||||||||||||||
| if concurrentAcceptsPerHttpServer <= 0: | ||||||||||||||||||
| DefaultConcurrentAcceptsPerHttpServer | ||||||||||||||||||
| else: | ||||||||||||||||||
| concurrentAcceptsPerHttpServer, | ||||||||||||||||||
|
Comment on lines
+510
to
+514
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if this logic is added there why is it not added to other |
||||||||||||||||||
| ) | ||||||||||||||||||
| procCall Transport(self).initialize() | ||||||||||||||||||
| self | ||||||||||||||||||
|
|
@@ -417,6 +523,7 @@ proc new*( | |||||||||||||||||
| factories: openArray[ExtFactory] = [], | ||||||||||||||||||
| rng: ref HmacDrbgContext = nil, | ||||||||||||||||||
| handshakeTimeout = DefaultHeadersTimeout, | ||||||||||||||||||
| concurrentAcceptsPerHttpServer = DefaultConcurrentAcceptsPerHttpServer, | ||||||||||||||||||
| ): T {.raises: [].} = | ||||||||||||||||||
| ## Creates a clear-text WebSocket transport | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
@@ -429,4 +536,5 @@ proc new*( | |||||||||||||||||
| factories = @factories, | ||||||||||||||||||
| rng = rng, | ||||||||||||||||||
| handshakeTimeout = handshakeTimeout, | ||||||||||||||||||
| concurrentAcceptsPerHttpServer = concurrentAcceptsPerHttpServer, | ||||||||||||||||||
| ) | ||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CI on windows: for some reason fails
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will investigate. It's hanging non-deterministically in a
[Suite ] WebSocket transporttest:...that should take 2-3 seconds every time:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we add 300ms of delay between client connection attempts it doesn't hang, specifically on Windows and on Nim 2.2.6. There is a nonzero chance this is a Chronos + Nim 2.2.6 + Windows bug or some combination thereof. It doesn't seem to hang on the Nim 2.0.16 + Windows CI. I'll investigate further.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we run client, then wait it to finish (with 300m delay) we will serialize all client connections, which avoids purpose of test which is to run may client connections in parallel.
if you need to test via ci, you can run only ws tests in ci so you don't wait too much time for test to finish.
to do this, change
test/test_all.nimand just import ws tests here:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I was just abusing the CI to quickly test some theories on Windows.
Thanks!
If I need to do more Windows testing I'll set it up on a Windows machine. I didn't think it was something serious, but now I think it is not a quick fix.
Sorry for the noise. And there's a good chance that this nim-libp2p PR is dead, in any case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider testing status-im/nim-websock#180 - add test case that asserts concurrent accepts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fyi: similar test failed on
macos-15-arm64 (Nim v2.2.6)"master" (technically not master but code just adds logs)https://github.com/vacp2p/nim-libp2p/actions/runs/19856566195/job/56895878119?pr=1924
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much! Yes, I think this patch/PR is too risky. I'll submit an alternative one that doesn't have this problem.