Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions tests/node/test_wakunode_peer_exchange.nim
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,17 @@ suite "Waku Peer Exchange":

suite "fetchPeerExchangePeers":
var node2 {.threadvar.}: WakuNode
var node3 {.threadvar.}: WakuNode

asyncSetup:
node = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
node2 = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
node3 = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)

await allFutures(node.start(), node2.start())
await allFutures(node.start(), node2.start(), node3.start())

asyncTeardown:
await allFutures(node.stop(), node2.stop())
await allFutures(node.stop(), node2.stop(), node3.stop())

asyncTest "Node fetches without mounting peer exchange":
# When a node, without peer exchange mounted, fetches peers
Expand Down Expand Up @@ -104,12 +106,10 @@ suite "Waku Peer Exchange":
await allFutures([node.mountPeerExchangeClient(), node2.mountPeerExchange()])
check node.peerManager.switch.peerStore.peers.len == 0

# Mock that we discovered a node (to avoid running discv5)
var enr = enr.Record()
assert enr.fromUri(
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
), "Failed to parse ENR"
node2.wakuPeerExchange.enrCache.add(enr)
# Simulate node2 discovering node3 via Discv5
var rpInfo = node3.peerInfo.toRemotePeerInfo()
rpInfo.enr = some(node3.enr)
node2.peerManager.addPeer(rpInfo, PeerOrigin.Discv5)

# Set node2 as service peer (default one) for px protocol
node.peerManager.addServicePeer(
Expand All @@ -121,10 +121,8 @@ suite "Waku Peer Exchange":
check res.tryGet() == 1

# Check that the peer ended up in the peerstore
let rpInfo = enr.toRemotePeerInfo.get()
check:
node.peerManager.switch.peerStore.peers.anyIt(it.peerId == rpInfo.peerId)
node.peerManager.switch.peerStore.peers.anyIt(it.addrs == rpInfo.addrs)

suite "setPeerExchangePeer":
var node2 {.threadvar.}: WakuNode
Expand Down
100 changes: 44 additions & 56 deletions tests/waku_peer_exchange/test_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,13 @@ suite "Waku Peer Exchange":
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node3 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node4 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))

# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.start(), node2.start(), node3.start(), node4.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()])

# Create connection
Expand All @@ -154,18 +158,15 @@ suite "Waku Peer Exchange":
require:
connOpt.isSome

# Create some enr and add to peer exchange (simulating disv5)
var enr1, enr2 = enr.Record()
check enr1.fromUri(
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
)
check enr2.fromUri(
"enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB"
)
# Simulate node1 discovering node3 via Discv5
var info3 = node3.peerInfo.toRemotePeerInfo()
info3.enr = some(node3.enr)
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)

# Mock that we have discovered these enrs
node1.wakuPeerExchange.enrCache.add(enr1)
node1.wakuPeerExchange.enrCache.add(enr2)
# Simulate node1 discovering node4 via Discv5
var info4 = node4.peerInfo.toRemotePeerInfo()
info4.enr = some(node4.enr)
node1.peerManager.addPeer(info4, PeerOrigin.Discv5)

# Request 2 peer from px. Test all request variants
let response1 = await node2.wakuPeerExchangeClient.request(2)
Expand All @@ -185,12 +186,12 @@ suite "Waku Peer Exchange":
response3.get().peerInfos.len == 2

# Since it can return duplicates test that at least one of the enrs is in the response
response1.get().peerInfos.anyIt(it.enr == enr1.raw) or
response1.get().peerInfos.anyIt(it.enr == enr2.raw)
response2.get().peerInfos.anyIt(it.enr == enr1.raw) or
response2.get().peerInfos.anyIt(it.enr == enr2.raw)
response3.get().peerInfos.anyIt(it.enr == enr1.raw) or
response3.get().peerInfos.anyIt(it.enr == enr2.raw)
response1.get().peerInfos.anyIt(it.enr == node3.enr.raw) or
response1.get().peerInfos.anyIt(it.enr == node4.enr.raw)
response2.get().peerInfos.anyIt(it.enr == node3.enr.raw) or
response2.get().peerInfos.anyIt(it.enr == node4.enr.raw)
response3.get().peerInfos.anyIt(it.enr == node3.enr.raw) or
response3.get().peerInfos.anyIt(it.enr == node4.enr.raw)

asyncTest "Request fails gracefully":
let
Expand Down Expand Up @@ -265,8 +266,8 @@ suite "Waku Peer Exchange":
peerInfo2.origin = PeerOrigin.Discv5

check:
not poolFilter(cluster, peerInfo1)
poolFilter(cluster, peerInfo2)
poolFilter(cluster, peerInfo1).isErr()
poolFilter(cluster, peerInfo2).isOk()

asyncTest "Request 0 peers, with 1 peer in PeerExchange":
# Given two valid nodes with PeerExchange
Expand All @@ -275,9 +276,11 @@ suite "Waku Peer Exchange":
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node3 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))

# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.start(), node2.start(), node3.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()])

# Connect the nodes
Expand All @@ -286,12 +289,10 @@ suite "Waku Peer Exchange":
)
assert dialResponse.isSome

# Mock that we have discovered one enr
var record = enr.Record()
check record.fromUri(
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
)
node1.wakuPeerExchange.enrCache.add(record)
# Simulate node1 discovering node3 via Discv5
var info3 = node3.peerInfo.toRemotePeerInfo()
info3.enr = some(node3.enr)
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)

# When requesting 0 peers
let response = await node2.wakuPeerExchangeClient.request(0)
Expand All @@ -312,13 +313,6 @@ suite "Waku Peer Exchange":
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchangeClient(), node2.mountPeerExchange()])

# Mock that we have discovered one enr
var record = enr.Record()
check record.fromUri(
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
)
node2.wakuPeerExchange.enrCache.add(record)

# When making any request with an invalid peer info
var remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo()
remotePeerInfo2.peerId.data.add(255.byte)
Expand Down Expand Up @@ -362,17 +356,17 @@ suite "Waku Peer Exchange":
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node3 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))

# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.start(), node2.start(), node3.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])

# Mock that we have discovered these enrs
var enr1 = enr.Record()
check enr1.fromUri(
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
)
node1.wakuPeerExchange.enrCache.add(enr1)
# Simulate node1 discovering node3 via Discv5
var info3 = node3.peerInfo.toRemotePeerInfo()
info3.enr = some(node3.enr)
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)

# Create connection
let connOpt = await node2.peerManager.dialPeer(
Expand All @@ -396,44 +390,38 @@ suite "Waku Peer Exchange":
check:
decodedBuff.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS
decodedBuff.get().response.peerInfos.len == 1
decodedBuff.get().response.peerInfos[0].enr == enr1.raw
decodedBuff.get().response.peerInfos[0].enr == node3.enr.raw

asyncTest "RateLimit as expected":
let
node1 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node3 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))

# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.start(), node2.start(), node3.start()])
await allFutures(
[
node1.mountPeerExchange(rateLimit = (1, 150.milliseconds)),
node2.mountPeerExchangeClient(),
]
)

# Simulate node1 discovering nodeA via Discv5
var info3 = node3.peerInfo.toRemotePeerInfo()
info3.enr = some(node3.enr)
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)

# Create connection
let connOpt = await node2.peerManager.dialPeer(
node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec
)
require:
connOpt.isSome

# Create some enr and add to peer exchange (simulating disv5)
var enr1, enr2 = enr.Record()
check enr1.fromUri(
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
)
check enr2.fromUri(
"enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB"
)

# Mock that we have discovered these enrs
node1.wakuPeerExchange.enrCache.add(enr1)
node1.wakuPeerExchange.enrCache.add(enr2)

await sleepAsync(150.milliseconds)

# Request 2 peer from px. Test all request variants
Expand Down
14 changes: 14 additions & 0 deletions waku/node/peer_manager/waku_peer_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,17 @@ proc getPeersByCapability*(
): seq[RemotePeerInfo] =
return
peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap))

template forEnrPeers*(
peerStore: PeerStore,
peerId, peerConnectedness, peerOrigin, peerEnrRecord, body: untyped,
) =
let enrBook = peerStore[ENRBook]
let connBook = peerStore[ConnectionBook]
let sourceBook = peerStore[SourceBook]
for pid, enrRecord in tables.pairs(enrBook.book):
let peerId {.inject.} = pid
let peerConnectedness {.inject.} = connBook.book.getOrDefault(pid, NotConnected)
let peerOrigin {.inject.} = sourceBook.book.getOrDefault(pid, UnknownOrigin)
let peerEnrRecord {.inject.} = enrRecord
body
3 changes: 0 additions & 3 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -525,9 +525,6 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuStoreTransfer.isNil():
node.wakuStoreTransfer.stop()

if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil():
await node.wakuPeerExchange.pxLoopHandle.cancelAndWait()

if not node.wakuPeerExchangeClient.isNil() and
not node.wakuPeerExchangeClient.pxLoopHandle.isNil():
await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait()
Expand Down
Loading