Skip to content

Commit 33ea3e8

Browse files
authored
fix(quic): muxer handling (#1885)
1 parent 5ad7a23 commit 33ea3e8

File tree

3 files changed

+34
-24
lines changed

3 files changed

+34
-24
lines changed

libp2p/transports/quictransport.nim

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -121,20 +121,15 @@ method close*(session: QuicSession) {.async: (raises: []).} =
121121
safeClose(session.connection)
122122
await procCall P2PConnection(session).close()
123123

124-
proc getStream*(
124+
proc getStream(
125125
session: QuicSession, direction = Direction.In
126-
): Future[QuicStream] {.async: (raises: [QuicTransportError]).} =
126+
): Future[QuicStream] {.async: (raises: [CancelledError, QuicError]).} =
127127
var stream: Stream
128-
try:
129-
case direction
130-
of Direction.In:
131-
stream = await session.connection.incomingStream()
132-
of Direction.Out:
133-
stream = await session.connection.openStream()
134-
except CancelledError as exc:
135-
raise (ref QuicTransportError)(msg: "cancelled getStream: " & exc.msg, parent: exc)
136-
except QuicError as exc:
137-
raise (ref QuicTransportError)(msg: "error in getStream: " & exc.msg, parent: exc)
128+
case direction
129+
of Direction.In:
130+
stream = await session.connection.incomingStream()
131+
of Direction.Out:
132+
stream = await session.connection.openStream()
138133

139134
let qs =
140135
QuicStream.new(stream, session.observedAddr, session.localAddr, session.peerId)
@@ -181,7 +176,7 @@ method newStream*(
181176
.} =
182177
try:
183178
return await m.session.getStream(Direction.Out)
184-
except QuicTransportError as exc:
179+
except QuicError as exc:
185180
raise newException(MuxerError, "error in newStream: " & exc.msg, exc)
186181

187182
method handle*(m: QuicMuxer): Future[void] {.async: (raises: []).} =
@@ -192,19 +187,30 @@ method handle*(m: QuicMuxer): Future[void] {.async: (raises: []).} =
192187
trace "finished handling stream"
193188
doAssert(chann.closed, "connection not closed by handler!")
194189

195-
try:
196-
while not m.session.atEof:
197-
let stream = await m.session.getStream(Direction.In)
198-
asyncSpawn handleStream(stream)
199-
except QuicTransportError as exc:
200-
trace "Exception in quic handler", msg = exc.msg
190+
while not m.session.atEof:
191+
let stream =
192+
try:
193+
await m.session.getStream(Direction.In)
194+
except CancelledError:
195+
# keep handling, until connection is closed
196+
continue
197+
except QuicError as exc:
198+
if exc.msg == "connection closed":
199+
# stop handling, connection was closed
200+
break
201+
else:
202+
# keep handling, until connection is closed.
203+
# this stream failed but we need to keep handling for other streams.
204+
trace "QuicMuxer.handler got error while opening stream", msg = exc.msg
205+
continue
206+
asyncSpawn handleStream(stream)
201207

202208
method close*(m: QuicMuxer) {.async: (raises: []).} =
203209
try:
204210
await m.session.close()
205211
if not isNil(m.handleFut):
206212
m.handleFut.cancelSoon()
207-
except CatchableError as exc:
213+
except CatchableError:
208214
discard
209215

210216
# Transport

tests/transports/testquic.nim

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ suite "Quic transport":
9494
# client should be able to write even when server has not accepted
9595
let client = await createTransport()
9696
let conn = await client.dial("", server.addrs[0])
97-
let stream = await getStream(QuicSession(conn), Direction.Out)
97+
let muxer = QuicMuxer.new(conn)
98+
let stream = await muxer.newStream()
9899
await stream.write("client")
99100
await client.stop()
100101

tests/transports/utils.nim

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,21 @@ proc isQuicTransport*(ma: MultiAddress): bool =
4747
proc createServerAcceptConn*(
4848
server: QuicTransport
4949
): proc(): Future[void] {.
50-
async: (raises: [transport.TransportError, LPStreamError, CancelledError])
50+
async: (raises: [transport.TransportError, LPError, LPStreamError, CancelledError])
5151
.} =
5252
proc handler() {.
53-
async: (raises: [transport.TransportError, LPStreamError, CancelledError])
53+
async:
54+
(raises: [transport.TransportError, LPError, LPStreamError, CancelledError])
5455
.} =
5556
let conn = await server.accept()
5657
if conn == nil:
5758
return
5859

59-
let stream = await getStream(QuicSession(conn), Direction.In)
60+
let muxer = QuicMuxer.new(conn)
61+
let stream = await muxer.newStream()
6062
defer:
6163
await stream.close()
64+
await muxer.close()
6265

6366
var resp: array[6, byte]
6467
await stream.readExactly(addr resp, 6)

0 commit comments

Comments
 (0)