diff --git a/tests/node/test_wakunode_peer_exchange.nim b/tests/node/test_wakunode_peer_exchange.nim index 9b0ea4c407..e6649c4554 100644 --- a/tests/node/test_wakunode_peer_exchange.nim +++ b/tests/node/test_wakunode_peer_exchange.nim @@ -66,15 +66,17 @@ suite "Waku Peer Exchange": suite "fetchPeerExchangePeers": var node2 {.threadvar.}: WakuNode + var node3 {.threadvar.}: WakuNode asyncSetup: node = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort) node2 = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort) + node3 = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort) - await allFutures(node.start(), node2.start()) + await allFutures(node.start(), node2.start(), node3.start()) asyncTeardown: - await allFutures(node.stop(), node2.stop()) + await allFutures(node.stop(), node2.stop(), node3.stop()) asyncTest "Node fetches without mounting peer exchange": # When a node, without peer exchange mounted, fetches peers @@ -104,12 +106,10 @@ suite "Waku Peer Exchange": await allFutures([node.mountPeerExchangeClient(), node2.mountPeerExchange()]) check node.peerManager.switch.peerStore.peers.len == 0 - # Mock that we discovered a node (to avoid running discv5) - var enr = enr.Record() - assert enr.fromUri( - "enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB" - ), "Failed to parse ENR" - node2.wakuPeerExchange.enrCache.add(enr) + # Simulate node2 discovering node3 via Discv5 + var rpInfo = node3.peerInfo.toRemotePeerInfo() + rpInfo.enr = some(node3.enr) + node2.peerManager.addPeer(rpInfo, PeerOrigin.Discv5) # Set node2 as service peer (default one) for px protocol node.peerManager.addServicePeer( @@ -121,10 +121,8 @@ suite "Waku Peer Exchange": check res.tryGet() == 1 # Check that the peer ended up in the peerstore - let rpInfo = enr.toRemotePeerInfo.get() check: node.peerManager.switch.peerStore.peers.anyIt(it.peerId == rpInfo.peerId) - node.peerManager.switch.peerStore.peers.anyIt(it.addrs == rpInfo.addrs) suite "setPeerExchangePeer": var node2 {.threadvar.}: WakuNode diff --git a/tests/waku_peer_exchange/test_protocol.nim b/tests/waku_peer_exchange/test_protocol.nim index 204338a854..74cdba110e 100644 --- a/tests/waku_peer_exchange/test_protocol.nim +++ b/tests/waku_peer_exchange/test_protocol.nim @@ -142,9 +142,13 @@ suite "Waku Peer Exchange": newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node3 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node4 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) # Start and mount peer exchange - await allFutures([node1.start(), node2.start()]) + await allFutures([node1.start(), node2.start(), node3.start(), node4.start()]) await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()]) # Create connection @@ -154,18 +158,15 @@ suite "Waku Peer Exchange": require: connOpt.isSome - # Create some enr and add to peer exchange (simulating disv5) - var enr1, enr2 = enr.Record() - check enr1.fromUri( - "enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB" - ) - check enr2.fromUri( - "enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB" - ) + # Simulate node1 discovering node3 via Discv5 + var info3 = node3.peerInfo.toRemotePeerInfo() + info3.enr = some(node3.enr) + node1.peerManager.addPeer(info3, PeerOrigin.Discv5) - # Mock that we have discovered these enrs - node1.wakuPeerExchange.enrCache.add(enr1) - node1.wakuPeerExchange.enrCache.add(enr2) + # Simulate node1 discovering node4 via Discv5 + var info4 = node4.peerInfo.toRemotePeerInfo() + info4.enr = some(node4.enr) + node1.peerManager.addPeer(info4, PeerOrigin.Discv5) # Request 2 peer from px. Test all request variants let response1 = await node2.wakuPeerExchangeClient.request(2) @@ -185,12 +186,12 @@ suite "Waku Peer Exchange": response3.get().peerInfos.len == 2 # Since it can return duplicates test that at least one of the enrs is in the response - response1.get().peerInfos.anyIt(it.enr == enr1.raw) or - response1.get().peerInfos.anyIt(it.enr == enr2.raw) - response2.get().peerInfos.anyIt(it.enr == enr1.raw) or - response2.get().peerInfos.anyIt(it.enr == enr2.raw) - response3.get().peerInfos.anyIt(it.enr == enr1.raw) or - response3.get().peerInfos.anyIt(it.enr == enr2.raw) + response1.get().peerInfos.anyIt(it.enr == node3.enr.raw) or + response1.get().peerInfos.anyIt(it.enr == node4.enr.raw) + response2.get().peerInfos.anyIt(it.enr == node3.enr.raw) or + response2.get().peerInfos.anyIt(it.enr == node4.enr.raw) + response3.get().peerInfos.anyIt(it.enr == node3.enr.raw) or + response3.get().peerInfos.anyIt(it.enr == node4.enr.raw) asyncTest "Request fails gracefully": let @@ -265,8 +266,8 @@ suite "Waku Peer Exchange": peerInfo2.origin = PeerOrigin.Discv5 check: - not poolFilter(cluster, peerInfo1) - poolFilter(cluster, peerInfo2) + poolFilter(cluster, peerInfo1).isErr() + poolFilter(cluster, peerInfo2).isOk() asyncTest "Request 0 peers, with 1 peer in PeerExchange": # Given two valid nodes with PeerExchange @@ -275,9 +276,11 @@ suite "Waku Peer Exchange": newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node3 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) # Start and mount peer exchange - await allFutures([node1.start(), node2.start()]) + await allFutures([node1.start(), node2.start(), node3.start()]) await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()]) # Connect the nodes @@ -286,12 +289,10 @@ suite "Waku Peer Exchange": ) assert dialResponse.isSome - # Mock that we have discovered one enr - var record = enr.Record() - check record.fromUri( - "enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB" - ) - node1.wakuPeerExchange.enrCache.add(record) + # Simulate node1 discovering node3 via Discv5 + var info3 = node3.peerInfo.toRemotePeerInfo() + info3.enr = some(node3.enr) + node1.peerManager.addPeer(info3, PeerOrigin.Discv5) # When requesting 0 peers let response = await node2.wakuPeerExchangeClient.request(0) @@ -312,13 +313,6 @@ suite "Waku Peer Exchange": await allFutures([node1.start(), node2.start()]) await allFutures([node1.mountPeerExchangeClient(), node2.mountPeerExchange()]) - # Mock that we have discovered one enr - var record = enr.Record() - check record.fromUri( - "enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB" - ) - node2.wakuPeerExchange.enrCache.add(record) - # When making any request with an invalid peer info var remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo() remotePeerInfo2.peerId.data.add(255.byte) @@ -362,17 +356,17 @@ suite "Waku Peer Exchange": newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node3 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) # Start and mount peer exchange - await allFutures([node1.start(), node2.start()]) + await allFutures([node1.start(), node2.start(), node3.start()]) await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) - # Mock that we have discovered these enrs - var enr1 = enr.Record() - check enr1.fromUri( - "enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB" - ) - node1.wakuPeerExchange.enrCache.add(enr1) + # Simulate node1 discovering node3 via Discv5 + var info3 = node3.peerInfo.toRemotePeerInfo() + info3.enr = some(node3.enr) + node1.peerManager.addPeer(info3, PeerOrigin.Discv5) # Create connection let connOpt = await node2.peerManager.dialPeer( @@ -396,7 +390,7 @@ suite "Waku Peer Exchange": check: decodedBuff.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS decodedBuff.get().response.peerInfos.len == 1 - decodedBuff.get().response.peerInfos[0].enr == enr1.raw + decodedBuff.get().response.peerInfos[0].enr == node3.enr.raw asyncTest "RateLimit as expected": let @@ -404,9 +398,11 @@ suite "Waku Peer Exchange": newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node3 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) # Start and mount peer exchange - await allFutures([node1.start(), node2.start()]) + await allFutures([node1.start(), node2.start(), node3.start()]) await allFutures( [ node1.mountPeerExchange(rateLimit = (1, 150.milliseconds)), @@ -414,6 +410,11 @@ suite "Waku Peer Exchange": ] ) + # Simulate node1 discovering nodeA via Discv5 + var info3 = node3.peerInfo.toRemotePeerInfo() + info3.enr = some(node3.enr) + node1.peerManager.addPeer(info3, PeerOrigin.Discv5) + # Create connection let connOpt = await node2.peerManager.dialPeer( node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec @@ -421,19 +422,6 @@ suite "Waku Peer Exchange": require: connOpt.isSome - # Create some enr and add to peer exchange (simulating disv5) - var enr1, enr2 = enr.Record() - check enr1.fromUri( - "enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB" - ) - check enr2.fromUri( - "enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB" - ) - - # Mock that we have discovered these enrs - node1.wakuPeerExchange.enrCache.add(enr1) - node1.wakuPeerExchange.enrCache.add(enr2) - await sleepAsync(150.milliseconds) # Request 2 peer from px. Test all request variants diff --git a/waku/node/peer_manager/waku_peer_store.nim b/waku/node/peer_manager/waku_peer_store.nim index 9cde53fe17..b7f2669e55 100644 --- a/waku/node/peer_manager/waku_peer_store.nim +++ b/waku/node/peer_manager/waku_peer_store.nim @@ -227,3 +227,17 @@ proc getPeersByCapability*( ): seq[RemotePeerInfo] = return peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap)) + +template forEnrPeers*( + peerStore: PeerStore, + peerId, peerConnectedness, peerOrigin, peerEnrRecord, body: untyped, +) = + let enrBook = peerStore[ENRBook] + let connBook = peerStore[ConnectionBook] + let sourceBook = peerStore[SourceBook] + for pid, enrRecord in tables.pairs(enrBook.book): + let peerId {.inject.} = pid + let peerConnectedness {.inject.} = connBook.book.getOrDefault(pid, NotConnected) + let peerOrigin {.inject.} = sourceBook.book.getOrDefault(pid, UnknownOrigin) + let peerEnrRecord {.inject.} = enrRecord + body diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 65b2093bb6..07e36dd138 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -525,9 +525,6 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuStoreTransfer.isNil(): node.wakuStoreTransfer.stop() - if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil(): - await node.wakuPeerExchange.pxLoopHandle.cancelAndWait() - if not node.wakuPeerExchangeClient.isNil() and not node.wakuPeerExchangeClient.pxLoopHandle.isNil(): await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait() diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index cf7ebc2a71..b99f5eabf1 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -22,7 +22,6 @@ export WakuPeerExchangeCodec declarePublicGauge waku_px_peers_received_unknown, "number of previously unknown ENRs received via peer exchange" -declarePublicGauge waku_px_peers_cached, "number of peer exchange peer ENRs cached" declarePublicCounter waku_px_errors, "number of peer exchange errors", ["type"] declarePublicCounter waku_px_peers_sent, "number of ENRs sent to peer exchange requesters" @@ -32,11 +31,9 @@ logScope: type WakuPeerExchange* = ref object of LPProtocol peerManager*: PeerManager - enrCache*: seq[enr.Record] cluster*: Option[uint16] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ requestRateLimiter*: RequestRateLimiter - pxLoopHandle*: Future[void] proc respond( wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection @@ -79,61 +76,50 @@ proc respondError( return ok() -proc getEnrsFromCache( - wpx: WakuPeerExchange, numPeers: uint64 -): seq[enr.Record] {.gcsafe.} = - if wpx.enrCache.len() == 0: - info "peer exchange ENR cache is empty" - return @[] - - # copy and shuffle - randomize() - var shuffledCache = wpx.enrCache - shuffledCache.shuffle() - - # return numPeers or less if cache is smaller - return shuffledCache[0 ..< min(shuffledCache.len.int, numPeers.int)] - -proc poolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool = - if peer.origin != Discv5: - trace "peer not from discv5", peer = $peer, origin = $peer.origin - return false +proc poolFilter*( + cluster: Option[uint16], origin: PeerOrigin, enr: enr.Record +): Result[void, string] = + if origin != Discv5: + trace "peer not from discv5", origin = $origin + return err("peer not from discv5: " & $origin) + if cluster.isSome() and enr.isClusterMismatched(cluster.get()): + trace "peer has mismatching cluster" + return err("peer has mismatching cluster") + return ok() +proc poolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): Result[void, string] = if peer.enr.isNone(): info "peer has no ENR", peer = $peer - return false - - if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()): - info "peer has mismatching cluster", peer = $peer - return false - - return true + return err("peer has no ENR: " & $peer) + return poolFilter(cluster, peer.origin, peer.enr.get()) -proc populateEnrCache(wpx: WakuPeerExchange) = - # share only peers that i) are reachable ii) come from discv5 iii) share cluster - let withEnr = wpx.peerManager.switch.peerStore.getReachablePeers().filterIt( - poolFilter(wpx.cluster, it) - ) - - # either what we have or max cache size - var newEnrCache = newSeq[enr.Record](0) - for i in 0 ..< min(withEnr.len, MaxPeersCacheSize): - newEnrCache.add(withEnr[i].enr.get()) - - # swap cache for new - wpx.enrCache = newEnrCache - trace "ENR cache populated" - -proc updatePxEnrCache(wpx: WakuPeerExchange) {.async.} = - # try more aggressively to fill the cache at startup - var attempts = 50 - while wpx.enrCache.len < MaxPeersCacheSize and attempts > 0: - attempts -= 1 - wpx.populateEnrCache() - await sleepAsync(1.seconds) - - heartbeat "Updating px enr cache", CacheRefreshInterval: - wpx.populateEnrCache() +proc getEnrsFromStore( + wpx: WakuPeerExchange, numPeers: uint64 +): seq[enr.Record] {.gcsafe.} = + # Reservoir sampling (Algorithm R) + var i = 0 + let k = min(MaxPeersCacheSize, numPeers.int) + let enrStoreLen = wpx.peerManager.switch.peerStore[ENRBook].len + var enrs = newSeqOfCap[enr.Record](min(k, enrStoreLen)) + wpx.peerManager.switch.peerStore.forEnrPeers( + peerId, peerConnectedness, peerOrigin, peerEnrRecord + ): + if peerConnectedness == CannotConnect: + debug "Could not retrieve ENR because cannot connect to peer", + remotePeerId = peerId + continue + poolFilter(wpx.cluster, peerOrigin, peerEnrRecord).isOkOr: + debug "Could not get ENR because no peer matched pool", error = error + continue + if i < k: + enrs.add(peerEnrRecord) + else: + # Add some randomness + let j = rand(i) + if j < k: + enrs[j] = peerEnrRecord + inc(i) + return enrs proc initProtocolHandler(wpx: WakuPeerExchange) = proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} = @@ -174,7 +160,8 @@ proc initProtocolHandler(wpx: WakuPeerExchange) = error "Failed to respond with BAD_REQUEST:", error = $error return - let enrs = wpx.getEnrsFromCache(decBuf.request.numPeers) + let enrs = wpx.getEnrsFromStore(decBuf.request.numPeers) + info "peer exchange request received" trace "px enrs to respond", enrs = $enrs try: @@ -214,5 +201,4 @@ proc new*( ) wpx.initProtocolHandler() setServiceLimitMetric(WakuPeerExchangeCodec, rateLimitSetting) - asyncSpawn wpx.updatePxEnrCache() return wpx