Skip to content

Commit 0af533a

Browse files
authored
test(transports): fix muxer handling (#1884)
1 parent e6db65b commit 0af533a

File tree

6 files changed

+26
-23
lines changed

6 files changed

+26
-23
lines changed

tests/transports/stream_tests.nim

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ template streamTransportTest*(
116116
let client = transportProvider()
117117
let conn = await client.dial("", server.addrs[0])
118118
let muxer = streamProvider(conn)
119-
discard muxer.handle()
120119

121120
let stream = await muxer.newStream()
122121
await stream.write(serverMessage)
@@ -263,7 +262,6 @@ template streamTransportTest*(
263262
let client = transportProvider()
264263
let conn = await client.dial(server.addrs[0])
265264
let muxer = streamProvider(conn)
266-
discard muxer.handle()
267265

268266
# Send incomplete messages (will block)
269267
const incompleteClientMessage = clientMessage[0 ..< 10]
@@ -394,7 +392,6 @@ template streamTransportTest*(
394392
let client = transportProvider()
395393
let conn = await client.dial(server.addrs[0])
396394
let muxer = streamProvider(conn)
397-
discard muxer.handle()
398395

399396
var futs: seq[Future[void]]
400397
for i in 0 ..< numStreams:
@@ -463,11 +460,9 @@ template streamTransportTest*(
463460
# Accept multiple connections and handle them
464461
var futs: seq[Future[void]]
465462
for i in 0 ..< numConnections:
466-
let conn = await server.accept()
467-
let muxer = streamProvider(conn)
468-
469463
# Use a proc to properly capture loop index
470-
proc setupConnection(conn: Connection, muxer: Muxer, handlerIndex: int) =
464+
proc setupConnection(conn: Connection, handlerIndex: int) =
465+
let muxer = streamProvider(conn, false)
471466
muxer.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
472467
noExceptionWithStreamClose(stream):
473468
# Read data in chunks with random delay
@@ -503,14 +498,14 @@ template streamTransportTest*(
503498

504499
futs.add(startStreamHandlerAndCleanup())
505500

506-
setupConnection(conn, muxer, i)
501+
let conn = await server.accept()
502+
setupConnection(conn, i)
507503
await allFutures(futs)
508504

509505
proc runClient(server: Transport, connectionId: int) {.async.} =
510506
let client = transportProvider()
511507
let conn = await client.dial(server.addrs[0])
512508
let muxer = streamProvider(conn)
513-
discard muxer.handle()
514509

515510
let stream = await muxer.newStream()
516511

tests/transports/testquic.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ proc quicTransProvider(): Transport {.gcsafe, raises: [].} =
2424
except ResultError[crypto.CryptoError]:
2525
raiseAssert "should not happen"
2626

27-
proc streamProvider(conn: Connection): Muxer {.raises: [].} =
27+
proc streamProvider(conn: Connection, handle: bool = true): Muxer {.raises: [].} =
2828
try:
2929
return QuicMuxer.new(conn)
3030
except CatchableError:

tests/transports/testtcp.nim

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@ import ./tcp_tests
2727
proc tcpTransProvider(): Transport =
2828
TcpTransport.new(upgrade = Upgrade())
2929

30-
proc streamProvider(conn: Connection): Muxer =
31-
Mplex.new(conn)
30+
proc streamProvider(conn: Connection, handle: bool = true): Muxer =
31+
let muxer = Mplex.new(conn)
32+
if handle:
33+
asyncSpawn muxer.handle()
34+
muxer
3235

3336
const
3437
addressIP4 = "/ip4/127.0.0.1/tcp/0"

tests/transports/testtor.nim

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,11 @@ suite "Tor transport":
3737
proc torTransProvider(): Transport =
3838
TorTransport.new(torServer, {ReuseAddr}, Upgrade())
3939

40-
proc streamProvider(conn: Connection): Muxer =
41-
Mplex.new(conn)
40+
proc streamProvider(conn: Connection, handle: bool = true): Muxer =
41+
let muxer = Mplex.new(conn)
42+
if handle:
43+
asyncSpawn muxer.handle()
44+
muxer
4245

4346
const
4447
address =

tests/transports/testws.nim

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,11 @@ proc wsSecureTransProvider(): Transport {.gcsafe, raises: [].} =
8383
except TLSStreamProtocolError:
8484
raiseAssert "should not happen"
8585

86-
proc streamProvider(conn: Connection): Muxer =
87-
Mplex.new(conn)
86+
proc streamProvider(conn: Connection, handle: bool = true): Muxer =
87+
let muxer = Mplex.new(conn)
88+
if handle:
89+
asyncSpawn muxer.handle()
90+
muxer
8891

8992
const
9093
wsAddress = "/ip4/127.0.0.1/tcp/0/ws"

tests/transports/utils.nim

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ proc createTransport*(
106106

107107
type TransportProvider* = proc(): Transport {.gcsafe, raises: [].}
108108

109-
type StreamProvider* = proc(conn: Connection): Muxer {.gcsafe, raises: [].}
109+
type StreamProvider* =
110+
proc(conn: Connection, handle: bool = true): Muxer {.gcsafe, raises: [].}
110111

111112
type StreamHandler* = proc(stream: Connection) {.async: (raises: []).}
112113

@@ -137,12 +138,11 @@ proc serverHandlerSingleStream*(
137138
) {.async: (raises: []).} =
138139
try:
139140
let conn = await server.accept()
140-
let muxer = streamProvider(conn)
141+
let muxer = streamProvider(conn, false)
141142
muxer.streamHandler = handler
142143

143-
let muxerTask = muxer.handle()
144+
await muxer.handle()
144145

145-
await muxerTask
146146
await muxer.close()
147147
await conn.close()
148148
except CatchableError as exc:
@@ -158,7 +158,6 @@ proc clientRunSingleStream*(
158158
let client = transportProvider()
159159
let conn = await client.dial("", server.addrs[0])
160160
let muxer = streamProvider(conn)
161-
discard muxer.handle()
162161

163162
let stream = await muxer.newStream()
164163
await handler(stream)
@@ -180,10 +179,10 @@ proc runSingleStreamScenario*(
180179
let serverTask =
181180
serverHandlerSingleStream(server, streamProvider, serverStreamHandler)
182181

183-
await clientRunSingleStream(
182+
let clientTask = clientRunSingleStream(
184183
server, transportProvider, streamProvider, clientStreamHandler
185184
)
186-
await serverTask
185+
await allFutures(clientTask, serverTask)
187186
await server.stop()
188187

189188
proc countTransitions*(readOrder: seq[byte]): int =

0 commit comments

Comments
 (0)