Skip to content

Commit ccca9f5

Browse files
committed
fix: remove ENR cache from peer exchange (#3578)
* remove WakuPeerExchange.enrCache * add forEnrPeers to support fast PeerStore search * add getEnrsFromStore * fix peer exchange tests
1 parent c6cf34d commit ccca9f5

File tree

5 files changed

+96
-121
lines changed

5 files changed

+96
-121
lines changed

tests/node/test_wakunode_peer_exchange.nim

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,17 @@ suite "Waku Peer Exchange":
6666

6767
suite "fetchPeerExchangePeers":
6868
var node2 {.threadvar.}: WakuNode
69+
var node3 {.threadvar.}: WakuNode
6970

7071
asyncSetup:
7172
node = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
7273
node2 = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
74+
node3 = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
7375

74-
await allFutures(node.start(), node2.start())
76+
await allFutures(node.start(), node2.start(), node3.start())
7577

7678
asyncTeardown:
77-
await allFutures(node.stop(), node2.stop())
79+
await allFutures(node.stop(), node2.stop(), node3.stop())
7880

7981
asyncTest "Node fetches without mounting peer exchange":
8082
# When a node, without peer exchange mounted, fetches peers
@@ -104,12 +106,10 @@ suite "Waku Peer Exchange":
104106
await allFutures([node.mountPeerExchangeClient(), node2.mountPeerExchange()])
105107
check node.peerManager.switch.peerStore.peers.len == 0
106108

107-
# Mock that we discovered a node (to avoid running discv5)
108-
var enr = enr.Record()
109-
assert enr.fromUri(
110-
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
111-
), "Failed to parse ENR"
112-
node2.wakuPeerExchange.enrCache.add(enr)
109+
# Simulate node2 discovering node3 via Discv5
110+
var rpInfo = node3.peerInfo.toRemotePeerInfo()
111+
rpInfo.enr = some(node3.enr)
112+
node2.peerManager.addPeer(rpInfo, PeerOrigin.Discv5)
113113

114114
# Set node2 as service peer (default one) for px protocol
115115
node.peerManager.addServicePeer(
@@ -121,10 +121,8 @@ suite "Waku Peer Exchange":
121121
check res.tryGet() == 1
122122

123123
# Check that the peer ended up in the peerstore
124-
let rpInfo = enr.toRemotePeerInfo.get()
125124
check:
126125
node.peerManager.switch.peerStore.peers.anyIt(it.peerId == rpInfo.peerId)
127-
node.peerManager.switch.peerStore.peers.anyIt(it.addrs == rpInfo.addrs)
128126

129127
suite "setPeerExchangePeer":
130128
var node2 {.threadvar.}: WakuNode

tests/waku_peer_exchange/test_protocol.nim

Lines changed: 42 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,13 @@ suite "Waku Peer Exchange":
142142
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
143143
node2 =
144144
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
145+
node3 =
146+
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
147+
node4 =
148+
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
145149

146150
# Start and mount peer exchange
147-
await allFutures([node1.start(), node2.start()])
151+
await allFutures([node1.start(), node2.start(), node3.start(), node4.start()])
148152
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()])
149153

150154
# Create connection
@@ -154,18 +158,15 @@ suite "Waku Peer Exchange":
154158
require:
155159
connOpt.isSome
156160

157-
# Create some enr and add to peer exchange (simulating disv5)
158-
var enr1, enr2 = enr.Record()
159-
check enr1.fromUri(
160-
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
161-
)
162-
check enr2.fromUri(
163-
"enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB"
164-
)
161+
# Simulate node1 discovering node3 via Discv5
162+
var info3 = node3.peerInfo.toRemotePeerInfo()
163+
info3.enr = some(node3.enr)
164+
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)
165165

166-
# Mock that we have discovered these enrs
167-
node1.wakuPeerExchange.enrCache.add(enr1)
168-
node1.wakuPeerExchange.enrCache.add(enr2)
166+
# Simulate node1 discovering node4 via Discv5
167+
var info4 = node4.peerInfo.toRemotePeerInfo()
168+
info4.enr = some(node4.enr)
169+
node1.peerManager.addPeer(info4, PeerOrigin.Discv5)
169170

170171
# Request 2 peer from px. Test all request variants
171172
let response1 = await node2.wakuPeerExchangeClient.request(2)
@@ -185,12 +186,12 @@ suite "Waku Peer Exchange":
185186
response3.get().peerInfos.len == 2
186187

187188
# Since it can return duplicates test that at least one of the enrs is in the response
188-
response1.get().peerInfos.anyIt(it.enr == enr1.raw) or
189-
response1.get().peerInfos.anyIt(it.enr == enr2.raw)
190-
response2.get().peerInfos.anyIt(it.enr == enr1.raw) or
191-
response2.get().peerInfos.anyIt(it.enr == enr2.raw)
192-
response3.get().peerInfos.anyIt(it.enr == enr1.raw) or
193-
response3.get().peerInfos.anyIt(it.enr == enr2.raw)
189+
response1.get().peerInfos.anyIt(it.enr == node3.enr.raw) or
190+
response1.get().peerInfos.anyIt(it.enr == node4.enr.raw)
191+
response2.get().peerInfos.anyIt(it.enr == node3.enr.raw) or
192+
response2.get().peerInfos.anyIt(it.enr == node4.enr.raw)
193+
response3.get().peerInfos.anyIt(it.enr == node3.enr.raw) or
194+
response3.get().peerInfos.anyIt(it.enr == node4.enr.raw)
194195

195196
asyncTest "Request fails gracefully":
196197
let
@@ -275,9 +276,11 @@ suite "Waku Peer Exchange":
275276
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
276277
node2 =
277278
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
279+
node3 =
280+
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
278281

279282
# Start and mount peer exchange
280-
await allFutures([node1.start(), node2.start()])
283+
await allFutures([node1.start(), node2.start(), node3.start()])
281284
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()])
282285

283286
# Connect the nodes
@@ -286,12 +289,10 @@ suite "Waku Peer Exchange":
286289
)
287290
assert dialResponse.isSome
288291

289-
# Mock that we have discovered one enr
290-
var record = enr.Record()
291-
check record.fromUri(
292-
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
293-
)
294-
node1.wakuPeerExchange.enrCache.add(record)
292+
# Simulate node1 discovering node3 via Discv5
293+
var info3 = node3.peerInfo.toRemotePeerInfo()
294+
info3.enr = some(node3.enr)
295+
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)
295296

296297
# When requesting 0 peers
297298
let response = await node2.wakuPeerExchangeClient.request(0)
@@ -312,13 +313,6 @@ suite "Waku Peer Exchange":
312313
await allFutures([node1.start(), node2.start()])
313314
await allFutures([node1.mountPeerExchangeClient(), node2.mountPeerExchange()])
314315

315-
# Mock that we have discovered one enr
316-
var record = enr.Record()
317-
check record.fromUri(
318-
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
319-
)
320-
node2.wakuPeerExchange.enrCache.add(record)
321-
322316
# When making any request with an invalid peer info
323317
var remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo()
324318
remotePeerInfo2.peerId.data.add(255.byte)
@@ -362,17 +356,17 @@ suite "Waku Peer Exchange":
362356
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
363357
node2 =
364358
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
359+
node3 =
360+
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
365361

366362
# Start and mount peer exchange
367-
await allFutures([node1.start(), node2.start()])
363+
await allFutures([node1.start(), node2.start(), node3.start()])
368364
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
369365

370-
# Mock that we have discovered these enrs
371-
var enr1 = enr.Record()
372-
check enr1.fromUri(
373-
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
374-
)
375-
node1.wakuPeerExchange.enrCache.add(enr1)
366+
# Simulate node1 discovering node3 via Discv5
367+
var info3 = node3.peerInfo.toRemotePeerInfo()
368+
info3.enr = some(node3.enr)
369+
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)
376370

377371
# Create connection
378372
let connOpt = await node2.peerManager.dialPeer(
@@ -396,44 +390,38 @@ suite "Waku Peer Exchange":
396390
check:
397391
decodedBuff.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS
398392
decodedBuff.get().response.peerInfos.len == 1
399-
decodedBuff.get().response.peerInfos[0].enr == enr1.raw
393+
decodedBuff.get().response.peerInfos[0].enr == node3.enr.raw
400394

401395
asyncTest "RateLimit as expected":
402396
let
403397
node1 =
404398
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
405399
node2 =
406400
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
401+
node3 =
402+
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
407403

408404
# Start and mount peer exchange
409-
await allFutures([node1.start(), node2.start()])
405+
await allFutures([node1.start(), node2.start(), node3.start()])
410406
await allFutures(
411407
[
412408
node1.mountPeerExchange(rateLimit = (1, 150.milliseconds)),
413409
node2.mountPeerExchangeClient(),
414410
]
415411
)
416412

413+
# Simulate node1 discovering nodeA via Discv5
414+
var info3 = node3.peerInfo.toRemotePeerInfo()
415+
info3.enr = some(node3.enr)
416+
node1.peerManager.addPeer(info3, PeerOrigin.Discv5)
417+
417418
# Create connection
418419
let connOpt = await node2.peerManager.dialPeer(
419420
node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec
420421
)
421422
require:
422423
connOpt.isSome
423424

424-
# Create some enr and add to peer exchange (simulating disv5)
425-
var enr1, enr2 = enr.Record()
426-
check enr1.fromUri(
427-
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
428-
)
429-
check enr2.fromUri(
430-
"enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB"
431-
)
432-
433-
# Mock that we have discovered these enrs
434-
node1.wakuPeerExchange.enrCache.add(enr1)
435-
node1.wakuPeerExchange.enrCache.add(enr2)
436-
437425
await sleepAsync(150.milliseconds)
438426

439427
# Request 2 peer from px. Test all request variants

waku/node/peer_manager/waku_peer_store.nim

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,3 +227,14 @@ proc getPeersByCapability*(
227227
): seq[RemotePeerInfo] =
228228
return
229229
peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap))
230+
231+
template forEnrPeers*(peerStore: PeerStore, body: untyped) =
232+
let enrBook = peerStore[ENRBook]
233+
let connBook = peerStore[ConnectionBook]
234+
let sourceBook = peerStore[SourceBook]
235+
for pid, enrRecord in tables.pairs(enrBook.book):
236+
let peerId {.inject.} = pid
237+
let peerConnectedness {.inject.} = connBook.book.getOrDefault(pid, NotConnected)
238+
let peerOrigin {.inject.} = sourceBook.book.getOrDefault(pid, UnknownOrigin)
239+
let peerEnrRecord {.inject.} = enrRecord
240+
body

waku/node/waku_node.nim

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -525,9 +525,6 @@ proc stop*(node: WakuNode) {.async.} =
525525
if not node.wakuStoreTransfer.isNil():
526526
node.wakuStoreTransfer.stop()
527527

528-
if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil():
529-
await node.wakuPeerExchange.pxLoopHandle.cancelAndWait()
530-
531528
if not node.wakuPeerExchangeClient.isNil() and
532529
not node.wakuPeerExchangeClient.pxLoopHandle.isNil():
533530
await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait()

waku/waku_peer_exchange/protocol.nim

Lines changed: 35 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ export WakuPeerExchangeCodec
2222

2323
declarePublicGauge waku_px_peers_received_unknown,
2424
"number of previously unknown ENRs received via peer exchange"
25-
declarePublicGauge waku_px_peers_cached, "number of peer exchange peer ENRs cached"
2625
declarePublicCounter waku_px_errors, "number of peer exchange errors", ["type"]
2726
declarePublicCounter waku_px_peers_sent,
2827
"number of ENRs sent to peer exchange requesters"
@@ -32,11 +31,9 @@ logScope:
3231

3332
type WakuPeerExchange* = ref object of LPProtocol
3433
peerManager*: PeerManager
35-
enrCache*: seq[enr.Record]
3634
cluster*: Option[uint16]
3735
# todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
3836
requestRateLimiter*: RequestRateLimiter
39-
pxLoopHandle*: Future[void]
4037

4138
proc respond(
4239
wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection
@@ -79,61 +76,45 @@ proc respondError(
7976

8077
return ok()
8178

82-
proc getEnrsFromCache(
83-
wpx: WakuPeerExchange, numPeers: uint64
84-
): seq[enr.Record] {.gcsafe.} =
85-
if wpx.enrCache.len() == 0:
86-
info "peer exchange ENR cache is empty"
87-
return @[]
88-
89-
# copy and shuffle
90-
randomize()
91-
var shuffledCache = wpx.enrCache
92-
shuffledCache.shuffle()
93-
94-
# return numPeers or less if cache is smaller
95-
return shuffledCache[0 ..< min(shuffledCache.len.int, numPeers.int)]
96-
97-
proc poolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
98-
if peer.origin != Discv5:
99-
trace "peer not from discv5", peer = $peer, origin = $peer.origin
79+
proc poolFilter*(
80+
cluster: Option[uint16], origin: PeerOrigin, enr: enr.Record, log: bool = false
81+
): bool =
82+
if origin != Discv5:
83+
if log:
84+
trace "peer not from discv5", origin = $origin
10085
return false
86+
if cluster.isSome() and enr.isClusterMismatched(cluster.get()):
87+
if log:
88+
info "peer has mismatching cluster"
89+
return false
90+
return true
10191

92+
proc poolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
10293
if peer.enr.isNone():
10394
info "peer has no ENR", peer = $peer
10495
return false
96+
return poolFilter(cluster, peer.origin, peer.enr.get(), log = true)
10597

106-
if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()):
107-
info "peer has mismatching cluster", peer = $peer
108-
return false
109-
110-
return true
111-
112-
proc populateEnrCache(wpx: WakuPeerExchange) =
113-
# share only peers that i) are reachable ii) come from discv5 iii) share cluster
114-
let withEnr = wpx.peerManager.switch.peerStore.getReachablePeers().filterIt(
115-
poolFilter(wpx.cluster, it)
116-
)
117-
118-
# either what we have or max cache size
119-
var newEnrCache = newSeq[enr.Record](0)
120-
for i in 0 ..< min(withEnr.len, MaxPeersCacheSize):
121-
newEnrCache.add(withEnr[i].enr.get())
122-
123-
# swap cache for new
124-
wpx.enrCache = newEnrCache
125-
trace "ENR cache populated"
126-
127-
proc updatePxEnrCache(wpx: WakuPeerExchange) {.async.} =
128-
# try more aggressively to fill the cache at startup
129-
var attempts = 50
130-
while wpx.enrCache.len < MaxPeersCacheSize and attempts > 0:
131-
attempts -= 1
132-
wpx.populateEnrCache()
133-
await sleepAsync(1.seconds)
134-
135-
heartbeat "Updating px enr cache", CacheRefreshInterval:
136-
wpx.populateEnrCache()
98+
proc getEnrsFromStore(
99+
wpx: WakuPeerExchange, numPeers: uint64
100+
): seq[enr.Record] {.gcsafe.} =
101+
# Reservoir sampling (Algorithm R)
102+
var i = 0
103+
let k = if numPeers > int.high.uint64: int.high else: numPeers.int
104+
var enrs = newSeqOfCap[enr.Record](k)
105+
wpx.peerManager.switch.peerStore.forEnrPeers:
106+
if peerConnectedness == CannotConnect:
107+
continue
108+
if not poolFilter(wpx.cluster, peerOrigin, peerEnrRecord):
109+
continue
110+
if i < k:
111+
enrs.add(peerEnrRecord)
112+
else:
113+
let j = rand(i)
114+
if j < k:
115+
enrs[j] = peerEnrRecord
116+
inc(i)
117+
return enrs
137118

138119
proc initProtocolHandler(wpx: WakuPeerExchange) =
139120
proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
@@ -174,7 +155,8 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
174155
error "Failed to respond with BAD_REQUEST:", error = $error
175156
return
176157

177-
let enrs = wpx.getEnrsFromCache(decBuf.request.numPeers)
158+
let enrs = wpx.getEnrsFromStore(decBuf.request.numPeers)
159+
178160
info "peer exchange request received"
179161
trace "px enrs to respond", enrs = $enrs
180162
try:
@@ -214,5 +196,4 @@ proc new*(
214196
)
215197
wpx.initProtocolHandler()
216198
setServiceLimitMetric(WakuPeerExchangeCodec, rateLimitSetting)
217-
asyncSpawn wpx.updatePxEnrCache()
218199
return wpx

0 commit comments

Comments
 (0)