|
| 1 | +# Nim-LibP2P |
| 2 | +# Copyright (c) 2023-2025 Status Research & Development GmbH |
| 3 | +# Licensed under either of |
| 4 | +# * Apache License, version 2.0 ([LICENSE-APACHE](LICENSE-APACHE)) |
| 5 | +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) |
| 6 | +# at your option. |
| 7 | +# This file may not be copied, modified, or distributed except according to |
| 8 | +# those terms. |
| 9 | + |
| 10 | +{.used.} |
| 11 | + |
| 12 | +import chronos, stew/byteutils |
| 13 | +import |
| 14 | + ../../libp2p/[ |
| 15 | + stream/connection, |
| 16 | + transports/tcptransport, |
| 17 | + upgrademngrs/upgrade, |
| 18 | + multiaddress, |
| 19 | + multicodec, |
| 20 | + wire, |
| 21 | + ] |
| 22 | +import ../tools/[unittest] |
| 23 | +import ./utils |
| 24 | + |
| 25 | +const |
| 26 | + message = "No Backdoors. No Masters. No Silence." |
| 27 | + zeroMAStrIP4 = "/ip4/0.0.0.0/tcp/0" |
| 28 | + zeroMAStrIP6 = "/ip6/::/tcp/0" |
| 29 | + zeroTAStrIP4 = "0.0.0.0:0" |
| 30 | + zeroTAStrIP6 = "[::]:0" |
| 31 | + |
| 32 | +template tcpListenerIPTests(suiteName: string, listenMA: MultiAddress) = |
| 33 | + block: |
| 34 | + asyncTest suiteName & ":listener: handle write": |
| 35 | + let server = TcpTransport.new(upgrade = Upgrade()) |
| 36 | + await server.start(@[listenMA]) |
| 37 | + |
| 38 | + proc serverHandler() {.async.} = |
| 39 | + let conn = await server.accept() |
| 40 | + await conn.write(message) |
| 41 | + await conn.close() |
| 42 | + |
| 43 | + let handlerFut = serverHandler() |
| 44 | + |
| 45 | + let conn = await connect(server.addrs[0]) |
| 46 | + let receivedData = await conn.read(message.len) |
| 47 | + check string.fromBytes(receivedData) == message |
| 48 | + |
| 49 | + await handlerFut.wait(1.seconds) |
| 50 | + await conn.closeWait() |
| 51 | + await server.stop() |
| 52 | + |
| 53 | + asyncTest suiteName & ":listener: handle read": |
| 54 | + let server = TcpTransport.new(upgrade = Upgrade()) |
| 55 | + await server.start(@[listenMA]) |
| 56 | + |
| 57 | + proc serverHandler() {.async.} = |
| 58 | + let conn = await server.accept() |
| 59 | + var msg = newSeq[byte](message.len) |
| 60 | + await conn.readExactly(addr msg[0], message.len) |
| 61 | + check string.fromBytes(msg) == message |
| 62 | + await conn.close() |
| 63 | + |
| 64 | + let handlerFut = serverHandler() |
| 65 | + |
| 66 | + let conn = await connect(server.addrs[0]) |
| 67 | + let sentBytes = await conn.write(message) |
| 68 | + check sentBytes == message.len |
| 69 | + |
| 70 | + await handlerFut.wait(1.seconds) |
| 71 | + await conn.closeWait() |
| 72 | + await server.stop() |
| 73 | + |
| 74 | +template tcpDialerIPTest(suiteName: string, listenTA: TransportAddress) = |
| 75 | + block: |
| 76 | + asyncTest suiteName & ":dialer: handle write": |
| 77 | + let handlerFut = newFuture[void]() |
| 78 | + proc serverHandler(server: StreamServer, transp: StreamTransport) {.async.} = |
| 79 | + var wstream = newAsyncStreamWriter(transp) |
| 80 | + await wstream.write(message) |
| 81 | + await wstream.finish() |
| 82 | + await wstream.closeWait() |
| 83 | + await transp.closeWait() |
| 84 | + handlerFut.complete() |
| 85 | + |
| 86 | + let server = createStreamServer(listenTA, serverHandler, {ReuseAddr}) |
| 87 | + server.start() |
| 88 | + |
| 89 | + let ma = MultiAddress.init(server.sock.getLocalAddress()).tryGet() |
| 90 | + let client = TcpTransport.new(upgrade = Upgrade()) |
| 91 | + let conn = await client.dial(ma) |
| 92 | + |
| 93 | + var msg = newSeq[byte](message.len) |
| 94 | + await conn.readExactly(addr msg[0], message.len) |
| 95 | + check string.fromBytes(msg) == message |
| 96 | + |
| 97 | + await handlerFut.wait(1.seconds) |
| 98 | + await conn.close() |
| 99 | + await client.stop() |
| 100 | + server.stop() |
| 101 | + server.close() |
| 102 | + await server.join() |
| 103 | + |
| 104 | + asyncTest suiteName & ":dialer: handle write": |
| 105 | + let handlerFut = newFuture[void]() |
| 106 | + proc serverHandler(server: StreamServer, transp: StreamTransport) {.async.} = |
| 107 | + var rstream = newAsyncStreamReader(transp) |
| 108 | + let msg = await rstream.read(message.len) |
| 109 | + check string.fromBytes(msg) == message |
| 110 | + |
| 111 | + await rstream.closeWait() |
| 112 | + await transp.closeWait() |
| 113 | + handlerFut.complete() |
| 114 | + |
| 115 | + let server = createStreamServer(listenTA, serverHandler, {ReuseAddr}) |
| 116 | + server.start() |
| 117 | + |
| 118 | + let ma = MultiAddress.init(server.sock.getLocalAddress()).tryGet() |
| 119 | + let client = TcpTransport.new(upgrade = Upgrade()) |
| 120 | + let conn = await client.dial(ma) |
| 121 | + await conn.write(message) |
| 122 | + |
| 123 | + await handlerFut.wait(1.seconds) |
| 124 | + await conn.close() |
| 125 | + await client.stop() |
| 126 | + server.stop() |
| 127 | + server.close() |
| 128 | + await server.join() |
| 129 | + |
| 130 | +template tcpTests*() = |
| 131 | + tcpListenerIPTests("ipv4", MultiAddress.init(zeroMAStrIP4).tryGet()) |
| 132 | + tcpListenerIPTests("ipv6", MultiAddress.init(zeroMAStrIP6).tryGet()) |
| 133 | + tcpDialerIPTest("ipv4", initTAddress(zeroTAStrIP4)) |
| 134 | + tcpDialerIPTest("ipv6", initTAddress(zeroTAStrIP6)) |
| 135 | + |
| 136 | + block: |
| 137 | + let listenMA = MultiAddress.init(zeroMAStrIP4).tryGet() |
| 138 | + |
| 139 | + asyncTest "starting with duplicate but zero ports addresses must NOT fail": |
| 140 | + let transport = TcpTransport.new(upgrade = Upgrade()) |
| 141 | + |
| 142 | + await transport.start(@[listenMA, listenMA]) |
| 143 | + await transport.stop() |
| 144 | + |
| 145 | + asyncTest "bind to listening port when not reachable": |
| 146 | + let transport1 = TcpTransport.new(upgrade = Upgrade()) |
| 147 | + await transport1.start(@[listenMA]) |
| 148 | + |
| 149 | + let transport2 = TcpTransport.new(upgrade = Upgrade()) |
| 150 | + await transport2.start(@[listenMA]) |
| 151 | + |
| 152 | + let transport3 = TcpTransport.new(upgrade = Upgrade()) |
| 153 | + await transport3.start(@[listenMA]) |
| 154 | + |
| 155 | + let listeningPortTransport1 = transport1.addrs[0][multiCodec("tcp")].get() |
| 156 | + |
| 157 | + let conn = await transport1.dial(transport2.addrs[0]) |
| 158 | + let acceptedConn = await transport2.accept() |
| 159 | + let acceptedPort2 = acceptedConn.observedAddr.get()[multiCodec("tcp")].get() |
| 160 | + check listeningPortTransport1 != acceptedPort2 |
| 161 | + |
| 162 | + transport1.networkReachability = NetworkReachability.NotReachable |
| 163 | + |
| 164 | + let conn2 = await transport1.dial(transport3.addrs[0]) |
| 165 | + let acceptedConn3 = await transport3.accept() |
| 166 | + let acceptedPort3 = acceptedConn3.observedAddr.get()[multiCodec("tcp")].get() |
| 167 | + check listeningPortTransport1 == acceptedPort3 |
| 168 | + |
| 169 | + await allFutures(transport1.stop(), transport2.stop(), transport3.stop()) |
| 170 | + |
| 171 | + asyncTest "custom timeout": |
| 172 | + let server = |
| 173 | + TcpTransport.new(upgrade = Upgrade(), connectionsTimeout = 1.milliseconds) |
| 174 | + await server.start(@[listenMA]) |
| 175 | + |
| 176 | + proc serverHandler() {.async.} = |
| 177 | + let conn = await server.accept() |
| 178 | + await conn.join() |
| 179 | + |
| 180 | + let handlerFut = serverHandler() |
| 181 | + |
| 182 | + let streamTransport = await connect(server.addrs[0]) |
| 183 | + await handlerFut.wait(1.seconds) |
| 184 | + await streamTransport.closeWait() |
| 185 | + await server.stop() |
0 commit comments