Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions libp2p/muxers/mplex/lpchannel.nim
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ proc closeUnderlying(s: LPChannel): Future[void] {.async: (raises: []).} =
if s.closedLocal and s.atEof():
await procCall BufferStream(s).close()

proc reset*(s: LPChannel) {.async: (raises: []).} =
method resetImpl*(s: LPChannel) {.async: (raises: []).} =
if s.isClosed:
trace "Already closed", s
return
Expand All @@ -120,8 +120,6 @@ proc reset*(s: LPChannel) {.async: (raises: []).} =

asyncSpawn resetMessage()

await s.closeImpl()

trace "Channel reset", s

method close*(s: LPChannel) {.async: (raises: []).} =
Expand Down Expand Up @@ -175,7 +173,7 @@ method readOnce*(
trace "reset stream in readOnce", s
raise newLPStreamResetError()
if s.localReset:
raise newLPStreamClosedError()
raise newLPStreamResetError()
if s.atEof():
raise newLPStreamRemoteClosedError()
if s.conn.closed:
Expand Down Expand Up @@ -204,7 +202,7 @@ proc prepareWrite(
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
# prepareWrite is the slow path of writing a message - see conditions in
# write
if s.remoteReset:
if s.remoteReset or s.localReset:
trace "stream is reset when prepareWrite", s
raise newLPStreamResetError()
if s.closedLocal:
Expand Down
5 changes: 3 additions & 2 deletions libp2p/muxers/mplex/mplex.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import chronos, chronicles, stew/byteutils, metrics
import
../muxer,
../../stream/connection,
../../stream/lpstream,
../../stream/bufferstream,
../../utility,
../../peerinfo,
Expand Down Expand Up @@ -182,7 +183,7 @@ method handle*(m: Mplex) {.async: (raises: []).} =
await channel.pushEof()
of MessageType.ResetIn, MessageType.ResetOut:
channel.remoteReset = true
await channel.reset()
await LPStream(channel).reset()
except CancelledError:
debug "Unexpected cancellation in mplex handler", m
except LPStreamEOFError as exc:
Expand Down Expand Up @@ -241,7 +242,7 @@ method close*(m: Mplex) {.async: (raises: []).} =
channs = toSeq(m.channels[false].values) & toSeq(m.channels[true].values)

for chann in channs:
await chann.reset()
await LPStream(chann).reset()

m.channels[false].clear()
m.channels[true].clear()
Expand Down
57 changes: 27 additions & 30 deletions libp2p/muxers/yamux/yamux.nim
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ type
isSending: bool
sendQueue: seq[ToSend]
recvQueue: ZeroQueue
isReset: bool
resetLocally: bool
remoteReset: bool
closedRemotely: AsyncEvent
closedLocally: bool
Expand All @@ -166,7 +166,7 @@ proc `$`(channel: YamuxChannel): string =
s.add("ClosedRemotely")
if channel.closedLocally:
s.add("ClosedLocally")
if channel.isReset:
if channel.resetLocally:
s.add("Reset")
if s.len > 0:
result &=
Expand Down Expand Up @@ -210,7 +210,7 @@ method closeImpl*(channel: YamuxChannel) {.async: (raises: []).} =
trace "Closing yamux channel locally", streamId = channel.id, conn = channel.conn
channel.closedLocally = true

if not channel.isReset and channel.sendQueue.len == 0:
if not channel.resetLocally and channel.sendQueue.len == 0:
try:
await channel.conn.write(YamuxHeader.data(channel.id, 0, {Fin}))
except CancelledError, LPStreamError:
Expand All @@ -230,14 +230,15 @@ proc clearQueues(channel: YamuxChannel, error: ref LPStreamEOFError = nil) =
channel.sendQueue = @[]
channel.recvQueue.clear()

proc reset(channel: YamuxChannel, isLocal: bool = false) {.async: (raises: []).} =
proc doReset(channel: YamuxChannel, isLocal: bool = false) {.async: (raises: []).} =
# If we reset locally, we want to flush up to a maximum of recvWindow
# bytes. It's because the peer we're connected to can send us data before
# it receives the reset.
if channel.isReset:
if channel.resetLocally:
return

trace "Reset channel"
channel.isReset = true
channel.resetLocally = true
channel.remoteReset = not isLocal
channel.clearQueues(newLPStreamEOFError())

Expand All @@ -256,6 +257,9 @@ proc reset(channel: YamuxChannel, isLocal: bool = false) {.async: (raises: []).}
# If the reset is remote, there's no reason to flush anything.
channel.recvWindow = 0

method resetImpl*(channel: YamuxChannel) {.async: (raises: []).} =
channel.doReset(isLocal = true)

Comment on lines +260 to +262
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wonder if doReset can become resetImpl, because doReset is actually resetImpl and having yet another resetImpl seems bit convoluted

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. The main annoyance is that doReset has diff behaviors depending on the isLocal parameter.
While it's a bit convoluted it should be fine, since we want to not dedicate dev time to yamux anyway...

proc updateRecvWindow(
channel: YamuxChannel
) {.async: (raises: [CancelledError, LPStreamError]).} =
Expand All @@ -276,21 +280,15 @@ method readOnce*(
channel: YamuxChannel, pbytes: pointer, nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
## Read from a yamux channel

if channel.isReset:
raise
if channel.remoteReset:
trace "stream is remote reset when readOnce", channel = $channel
newLPStreamResetError()
elif channel.closedLocally:
trace "stream is closed locally when readOnce", channel = $channel
newLPStreamClosedError()
else:
trace "stream is down when readOnce", channel = $channel
newLPStreamConnDownError()
if channel.resetLocally or channel.remoteReset:
trace "stream is reset when readOnce",
channel = $channel,
resetLocally = channel.resetLocally,
remoteReset = channel.remoteReset
raise newLPStreamResetError()
if channel.isEof:
channel.clearQueues()
raise newLPStreamRemoteClosedError()
raise newLPStreamEOFError()
if channel.recvQueue.isEmpty():
channel.receivedData.clear()
let
Expand Down Expand Up @@ -346,7 +344,7 @@ proc sendLoop(channel: YamuxChannel) {.async: (raises: []).} =
trace "channel send queue too big, resetting",
maxSendQueueSize = channel.maxSendQueueSize,
currentQueueSize = channel.lengthSendQueueWithLimit()
await channel.reset(isLocal = true)
await channel.doReset(isLocal = true)
break

let
Expand Down Expand Up @@ -396,7 +394,7 @@ proc sendLoop(channel: YamuxChannel) {.async: (raises: []).} =
let connDown = newLPStreamConnDownError(exc)
for fut in futures:
fut.fail(connDown)
await channel.reset()
await channel.doReset()
break

for fut in futures:
Expand All @@ -410,13 +408,12 @@ method write*(
## Write to yamux channel
##
var resFut = newFuture[void]("Yamux Send")

if channel.remoteReset:
if channel.remoteReset or channel.resetLocally:
trace "stream is reset when write", channel = $channel
resFut.fail(newLPStreamResetError())
return resFut

if channel.closedLocally or channel.isReset:
if channel.closedLocally:
resFut.fail(newLPStreamClosedError())
return resFut

Expand All @@ -440,7 +437,7 @@ proc open(channel: YamuxChannel) {.async: (raises: [CancelledError, LPStreamErro
trace "Try to open channel twice"
return
channel.opened = true
channel.isReset = false
channel.resetLocally = false

await channel.conn.write(
YamuxHeader.windowUpdate(
Expand Down Expand Up @@ -479,7 +476,7 @@ proc cleanupChannel(m: Yamux, channel: YamuxChannel) {.async: (raises: []).} =
libp2p_yamux_channels.set(
m.lenBySrc(channel.isSrc).int64, [$channel.isSrc, $channel.peerId]
)
if channel.isReset and channel.recvWindow > 0:
if channel.resetLocally and channel.recvWindow > 0:
m.flushed[channel.id] = channel.recvWindow

proc createStream(
Expand Down Expand Up @@ -511,7 +508,7 @@ proc createStream(
stream.timeout = m.inTimeout
stream.timeoutHandler = proc(): Future[void] {.async: (raises: [], raw: true).} =
trace "Idle timeout expired, resetting YamuxChannel"
stream.reset(isLocal = true)
stream.doReset(isLocal = true)
stream.initStream()
stream.peerId = m.connection.peerId
stream.observedAddr = m.connection.observedAddr
Expand All @@ -538,7 +535,7 @@ method close*(m: Yamux) {.async: (raises: []).} =
channel.recvWindow = 0
channel.sendWindow = 0
channel.closedLocally = true
channel.isReset = true
channel.resetLocally = false
channel.opened = false
channel.isClosed = true
await channel.remoteClosed()
Expand Down Expand Up @@ -599,7 +596,7 @@ method handle*(m: Yamux) {.async: (raises: []).} =
if m.channels.len > m.maxChannCount:
warn "too many channels created by remote peer",
peerId = m.connection.peerId, allowedMax = m.maxChannCount
await newStream.reset()
await newStream.doReset()
continue
await newStream.open()
asyncSpawn m.handleStream(newStream)
Expand Down Expand Up @@ -650,7 +647,7 @@ method handle*(m: Yamux) {.async: (raises: []).} =
await channel.remoteClosed()
if MsgFlags.Rst in header.flags:
trace "remote reset channel"
await channel.reset()
await channel.doReset()
except CancelledError as exc:
debug "Unexpected cancellation in yamux handler", description = exc.msg
except LPStreamEOFError as exc:
Expand Down
17 changes: 16 additions & 1 deletion libp2p/stream/lpstream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type
closeEvent*: AsyncEvent
isClosed*: bool
isEof*: bool
isReset*: bool
objName*: string
oid*: Oid
dir*: Direction
Expand All @@ -54,7 +55,7 @@ type
# X | Read | Write
# Local close | Works | LPStreamClosedError
# Remote close | LPStreamRemoteClosedError | Works
# Local reset | LPStreamClosedError | LPStreamClosedError
# Local reset | LPStreamResetError | LPStreamResetError
# Remote reset | LPStreamResetError | LPStreamResetError
# Connection down | LPStreamConnDown | LPStreamConnDownError
LPStreamResetError* = object of LPStreamEOFError
Expand Down Expand Up @@ -333,3 +334,17 @@ proc closeWithEOF*(s: LPStream): Future[void] {.async: (raises: []), public.} =
trace "Expected EOF came", s, description = e.msg
except LPStreamError as exc:
debug "Unexpected error while waiting for EOF", s, description = exc.msg

method resetImpl*(s: LPStream): Future[void] {.async: (raises: [], raw: true), base.} =
## Implementation of reset - called only once
let fut = newFuture[void]()
fut.complete()
fut

method reset*(s: LPStream): Future[void] {.async: (raises: []), base, public.} =
if s.isReset:
trace "Already reset"
return
s.isReset = true
await s.resetImpl()
await s.closeImpl()
8 changes: 8 additions & 0 deletions libp2p/transports/quictransport.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import chronos
import chronicles
import metrics
import quic
import quic/transport/stream
import results
import ../multiaddress
import ../multicodec
Expand Down Expand Up @@ -102,6 +103,13 @@ method closeImpl*(stream: QuicStream) {.async: (raises: []).} =
discard
await procCall P2PConnection(stream).closeImpl()

method resetImpl*(stream: QuicStream) {.async: (raises: []).} =
try:
quic.Stream(stream.stream).reset()
except CatchableError as exc:
discard
await procCall P2PConnection(stream).resetImpl()

# Session
type QuicSession* = ref object of P2PConnection
connection: QuicConnection
Expand Down
6 changes: 3 additions & 3 deletions tests/testmplex.nim
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ suite "Mplex":

await chann.reset()
var data = newSeq[byte](1)
expect LPStreamClosedError:
expect LPStreamResetError:
await chann.readExactly(addr data[0], 1)

await conn.close()
Expand All @@ -244,7 +244,7 @@ suite "Mplex":
let fut = chann.readExactly(addr data[0], 1)

await chann.reset()
expect LPStreamClosedError:
expect LPStreamResetError:
await fut

await conn.close()
Expand Down Expand Up @@ -432,7 +432,7 @@ suite "Mplex":
chann = LPChannel.init(1, conn, true)
await chann.reset()

expect LPStreamClosedError:
expect LPStreamResetError:
await chann.write(("Hello!").toBytes)

await conn.close()
Expand Down
14 changes: 7 additions & 7 deletions tests/testyamux.nim
Original file line number Diff line number Diff line change
Expand Up @@ -375,21 +375,21 @@ suite "Yamux":
check streamA == yamuxa.getStreams()[0]

await streamA.writeLp(fromHex("1234"))
expect LPStreamRemoteClosedError:
expect LPStreamEOFError:
discard await streamA.readLp(100)
await streamA.writeLp(fromHex("5678"))
await streamA.close()

asyncTest "Local & Remote reset":
asyncTest "Local & Remote read/write after Yamux is closed":
mSetup()
let blocker = newBlockerFut()

yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
await blocker
defer:
await blocker
try:
expect LPStreamResetError:
expect LPStreamClosedError:
discard await conn.readLp(100)
expect LPStreamResetError:
expect LPStreamClosedError:
await conn.writeLp(fromHex("1234"))
except CancelledError, LPStreamError:
return
Expand All @@ -402,7 +402,7 @@ suite "Yamux":
await yamuxa.close()
expect LPStreamClosedError:
await streamA.writeLp(fromHex("1234"))
expect LPStreamClosedError:
expect LPStreamEOFError:
discard await streamA.readLp(100)
blocker.complete()
await streamA.close()
Expand Down
Loading