Skip to content

Commit 4664b96

Browse files
fcecindarshankabariya
authored andcommitted
fix: remove ENR cache from peer exchange (#3652)
* remove WakuPeerExchange.enrCache * add forEnrPeers to support fast PeerStore search * add getEnrsFromStore * fix peer exchange tests
1 parent ff93643 commit 4664b96

File tree

5 files changed

+108
-125
lines changed

5 files changed

+108
-125
lines changed

tests/node/test_wakunode_peer_exchange.nim

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,17 @@ suite "Waku Peer Exchange":
6666

6767
suite "fetchPeerExchangePeers":
6868
var node2 {.threadvar.}: WakuNode
69+
var node3 {.threadvar.}: WakuNode
6970

7071
asyncSetup:
7172
node = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
7273
node2 = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
74+
node3 = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
7375

74-
await allFutures(node.start(), node2.start())
76+
await allFutures(node.start(), node2.start(), node3.start())
7577

7678
asyncTeardown:
77-
await allFutures(node.stop(), node2.stop())
79+
await allFutures(node.stop(), node2.stop(), node3.stop())
7880

7981
asyncTest "Node fetches without mounting peer exchange":
8082
# When a node, without peer exchange mounted, fetches peers
@@ -104,12 +106,10 @@ suite "Waku Peer Exchange":
104106
await allFutures([node.mountPeerExchangeClient(), node2.mountPeerExchange()])
105107
check node.peerManager.switch.peerStore.peers.len == 0
106108

107-
# Mock that we discovered a node (to avoid running discv5)
108-
var enr = enr.Record()
109-
assert enr.fromUri(
110-
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
111-
), "Failed to parse ENR"
112-
node2.wakuPeerExchange.enrCache.add(enr)
109+
# Simulate node2 discovering node3 via Discv5
110+
var rpInfo = node3.peerInfo.toRemotePeerInfo()
111+
rpInfo.enr = some(node3.enr)
112+
node2.peerManager.addPeer(rpInfo, PeerOrigin.Discv5)
113113

114114
# Set node2 as service peer (default one) for px protocol
115115
node.peerManager.addServicePeer(
@@ -121,10 +121,8 @@ suite "Waku Peer Exchange":
121121
check res.tryGet() == 1
122122

123123
# Check that the peer ended up in the peerstore
124-
let rpInfo = enr.toRemotePeerInfo.get()
125124
check:
126125
node.peerManager.switch.peerStore.peers.anyIt(it.peerId == rpInfo.peerId)
127-
node.peerManager.switch.peerStore.peers.anyIt(it.addrs == rpInfo.addrs)
128126

129127
suite "setPeerExchangePeer":
130128
var node2 {.threadvar.}: WakuNode

tests/waku_peer_exchange/test_protocol.nim

Lines changed: 44 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,13 @@ suite "Waku Peer Exchange":
142142
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
143143
node2 =
144144
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
145+
node3 =
146+
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
147+
node4 =
148+
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
145149

146150
# Start and mount peer exchange
147-
await allFutures([node1.start(), node2.start()])
151+
await allFutures([node1.start(), node2.start(), node3.start(), node4.start()])
148152
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()])
149153

150154
# Create connection
@@ -154,18 +158,15 @@ suite "Waku Peer Exchange":
154158
require:
155159
connOpt.isSome
156160

157-
# Create some enr and add to peer exchange (simulating disv5)
158-
var enr1, enr2 = enr.Record()
159-
check enr1.fromUri(
160-
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
161-
)
162-
check enr2.fromUri(
163-
"enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB"
164-
)
161+
# Simulate node1 discovering node3 via Discv5
162+
var info3 = node3.peerInfo.toRemotePeerInfo()
163+
info3.enr = some(node3.enr)
164+
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)
165165

166-
# Mock that we have discovered these enrs
167-
node1.wakuPeerExchange.enrCache.add(enr1)
168-
node1.wakuPeerExchange.enrCache.add(enr2)
166+
# Simulate node1 discovering node4 via Discv5
167+
var info4 = node4.peerInfo.toRemotePeerInfo()
168+
info4.enr = some(node4.enr)
169+
node1.peerManager.addPeer(info4, PeerOrigin.Discv5)
169170

170171
# Request 2 peer from px. Test all request variants
171172
let response1 = await node2.wakuPeerExchangeClient.request(2)
@@ -185,12 +186,12 @@ suite "Waku Peer Exchange":
185186
response3.get().peerInfos.len == 2
186187

187188
# Since it can return duplicates test that at least one of the enrs is in the response
188-
response1.get().peerInfos.anyIt(it.enr == enr1.raw) or
189-
response1.get().peerInfos.anyIt(it.enr == enr2.raw)
190-
response2.get().peerInfos.anyIt(it.enr == enr1.raw) or
191-
response2.get().peerInfos.anyIt(it.enr == enr2.raw)
192-
response3.get().peerInfos.anyIt(it.enr == enr1.raw) or
193-
response3.get().peerInfos.anyIt(it.enr == enr2.raw)
189+
response1.get().peerInfos.anyIt(it.enr == node3.enr.raw) or
190+
response1.get().peerInfos.anyIt(it.enr == node4.enr.raw)
191+
response2.get().peerInfos.anyIt(it.enr == node3.enr.raw) or
192+
response2.get().peerInfos.anyIt(it.enr == node4.enr.raw)
193+
response3.get().peerInfos.anyIt(it.enr == node3.enr.raw) or
194+
response3.get().peerInfos.anyIt(it.enr == node4.enr.raw)
194195

195196
asyncTest "Request fails gracefully":
196197
let
@@ -265,8 +266,8 @@ suite "Waku Peer Exchange":
265266
peerInfo2.origin = PeerOrigin.Discv5
266267

267268
check:
268-
not poolFilter(cluster, peerInfo1)
269-
poolFilter(cluster, peerInfo2)
269+
poolFilter(cluster, peerInfo1).isErr()
270+
poolFilter(cluster, peerInfo2).isOk()
270271

271272
asyncTest "Request 0 peers, with 1 peer in PeerExchange":
272273
# Given two valid nodes with PeerExchange
@@ -275,9 +276,11 @@ suite "Waku Peer Exchange":
275276
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
276277
node2 =
277278
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
279+
node3 =
280+
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
278281

279282
# Start and mount peer exchange
280-
await allFutures([node1.start(), node2.start()])
283+
await allFutures([node1.start(), node2.start(), node3.start()])
281284
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()])
282285

283286
# Connect the nodes
@@ -286,12 +289,10 @@ suite "Waku Peer Exchange":
286289
)
287290
assert dialResponse.isSome
288291

289-
# Mock that we have discovered one enr
290-
var record = enr.Record()
291-
check record.fromUri(
292-
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
293-
)
294-
node1.wakuPeerExchange.enrCache.add(record)
292+
# Simulate node1 discovering node3 via Discv5
293+
var info3 = node3.peerInfo.toRemotePeerInfo()
294+
info3.enr = some(node3.enr)
295+
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)
295296

296297
# When requesting 0 peers
297298
let response = await node2.wakuPeerExchangeClient.request(0)
@@ -312,13 +313,6 @@ suite "Waku Peer Exchange":
312313
await allFutures([node1.start(), node2.start()])
313314
await allFutures([node1.mountPeerExchangeClient(), node2.mountPeerExchange()])
314315

315-
# Mock that we have discovered one enr
316-
var record = enr.Record()
317-
check record.fromUri(
318-
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
319-
)
320-
node2.wakuPeerExchange.enrCache.add(record)
321-
322316
# When making any request with an invalid peer info
323317
var remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo()
324318
remotePeerInfo2.peerId.data.add(255.byte)
@@ -362,17 +356,17 @@ suite "Waku Peer Exchange":
362356
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
363357
node2 =
364358
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
359+
node3 =
360+
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
365361

366362
# Start and mount peer exchange
367-
await allFutures([node1.start(), node2.start()])
363+
await allFutures([node1.start(), node2.start(), node3.start()])
368364
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
369365

370-
# Mock that we have discovered these enrs
371-
var enr1 = enr.Record()
372-
check enr1.fromUri(
373-
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
374-
)
375-
node1.wakuPeerExchange.enrCache.add(enr1)
366+
# Simulate node1 discovering node3 via Discv5
367+
var info3 = node3.peerInfo.toRemotePeerInfo()
368+
info3.enr = some(node3.enr)
369+
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)
376370

377371
# Create connection
378372
let connOpt = await node2.peerManager.dialPeer(
@@ -396,44 +390,38 @@ suite "Waku Peer Exchange":
396390
check:
397391
decodedBuff.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS
398392
decodedBuff.get().response.peerInfos.len == 1
399-
decodedBuff.get().response.peerInfos[0].enr == enr1.raw
393+
decodedBuff.get().response.peerInfos[0].enr == node3.enr.raw
400394

401395
asyncTest "RateLimit as expected":
402396
let
403397
node1 =
404398
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
405399
node2 =
406400
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
401+
node3 =
402+
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
407403

408404
# Start and mount peer exchange
409-
await allFutures([node1.start(), node2.start()])
405+
await allFutures([node1.start(), node2.start(), node3.start()])
410406
await allFutures(
411407
[
412408
node1.mountPeerExchange(rateLimit = (1, 150.milliseconds)),
413409
node2.mountPeerExchangeClient(),
414410
]
415411
)
416412

413+
# Simulate node1 discovering nodeA via Discv5
414+
var info3 = node3.peerInfo.toRemotePeerInfo()
415+
info3.enr = some(node3.enr)
416+
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)
417+
417418
# Create connection
418419
let connOpt = await node2.peerManager.dialPeer(
419420
node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec
420421
)
421422
require:
422423
connOpt.isSome
423424

424-
# Create some enr and add to peer exchange (simulating disv5)
425-
var enr1, enr2 = enr.Record()
426-
check enr1.fromUri(
427-
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
428-
)
429-
check enr2.fromUri(
430-
"enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB"
431-
)
432-
433-
# Mock that we have discovered these enrs
434-
node1.wakuPeerExchange.enrCache.add(enr1)
435-
node1.wakuPeerExchange.enrCache.add(enr2)
436-
437425
await sleepAsync(150.milliseconds)
438426

439427
# Request 2 peer from px. Test all request variants

waku/node/peer_manager/waku_peer_store.nim

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,17 @@ proc getPeersByCapability*(
202202
): seq[RemotePeerInfo] =
203203
return
204204
peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap))
205+
206+
template forEnrPeers*(
207+
peerStore: PeerStore,
208+
peerId, peerConnectedness, peerOrigin, peerEnrRecord, body: untyped,
209+
) =
210+
let enrBook = peerStore[ENRBook]
211+
let connBook = peerStore[ConnectionBook]
212+
let sourceBook = peerStore[SourceBook]
213+
for pid, enrRecord in tables.pairs(enrBook.book):
214+
let peerId {.inject.} = pid
215+
let peerConnectedness {.inject.} = connBook.book.getOrDefault(pid, NotConnected)
216+
let peerOrigin {.inject.} = sourceBook.book.getOrDefault(pid, UnknownOrigin)
217+
let peerEnrRecord {.inject.} = enrRecord
218+
body

waku/node/waku_node.nim

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -489,9 +489,6 @@ proc stop*(node: WakuNode) {.async.} =
489489
if not node.wakuStoreTransfer.isNil():
490490
node.wakuStoreTransfer.stop()
491491

492-
if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil():
493-
await node.wakuPeerExchange.pxLoopHandle.cancelAndWait()
494-
495492
if not node.wakuPeerExchangeClient.isNil() and
496493
not node.wakuPeerExchangeClient.pxLoopHandle.isNil():
497494
await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait()

0 commit comments

Comments
 (0)