Skip to content

Commit bafb2db

Browse files
committed
fix
1 parent 783a3cd commit bafb2db

File tree

6 files changed

+152
-50
lines changed

6 files changed

+152
-50
lines changed

tests/libp2p/pubsub/component/test_gossipsub_heartbeat.nim

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -147,35 +147,36 @@ suite "GossipSub Component - Heartbeat":
147147
expectedGrafts &= peer
148148

149149
# Then during heartbeat Peers with lower than median scores are pruned and max 2 Peers are grafted
150-
await waitForHeartbeat(heartbeatInterval)
151150

152-
let actualGrafts = node0.mesh[topic].toSeq().filterIt(it notin startingMesh)
153-
check:
154-
actualGrafts.len == MaxOpportunisticGraftPeers
155-
actualGrafts.allIt(it in expectedGrafts)
151+
waitUntilTimeout:
152+
pre:
153+
let actualGrafts = node0.mesh[topic].toSeq().filterIt(it notin startingMesh)
154+
check:
155+
actualGrafts.len == MaxOpportunisticGraftPeers
156+
actualGrafts.allIt(it in expectedGrafts)
156157

157158
asyncTest "Fanout maintenance during heartbeat - expired peers are dropped":
158159
const
159160
numberOfNodes = 10
160161
topic = "foobar"
161162
heartbeatInterval = 200.milliseconds
162-
let nodes = generateNodes(
163-
numberOfNodes,
164-
gossip = true,
165-
fanoutTTL = 60.milliseconds,
166-
heartbeatInterval = heartbeatInterval,
167-
)
168-
.toGossipSub()
163+
let
164+
nodes = generateNodes(
165+
numberOfNodes,
166+
gossip = true,
167+
fanoutTTL = 60.milliseconds,
168+
heartbeatInterval = heartbeatInterval,
169+
)
170+
.toGossipSub()
171+
node0 = nodes[0]
169172

170173
startNodesAndDeferStop(nodes)
171174
await connectNodesStar(nodes)
172175

173176
# All nodes but Node0 are subscribed to the topic
174-
for node in nodes[1 .. ^1]:
175-
node.subscribe(topic, voidTopicHandler)
177+
subscribeAllNodes(nodes[1 .. ^1], topic, voidTopicHandler)
176178
await waitForHeartbeat(heartbeatInterval)
177179

178-
let node0 = nodes[0]
179180
checkUntilTimeout:
180181
node0.gossipsub.hasKey(topic)
181182

@@ -207,8 +208,7 @@ suite "GossipSub Component - Heartbeat":
207208
await connectNodesStar(nodes)
208209

209210
# All nodes but Node0 are subscribed to the topic
210-
for node in nodes[1 .. ^1]:
211-
node.subscribe(topic, voidTopicHandler)
211+
subscribeAllNodes(nodes[1 .. ^1], topic, voidTopicHandler)
212212
await waitForHeartbeat(heartbeatInterval)
213213

214214
# When Node0 sends a message to the topic
@@ -225,10 +225,12 @@ suite "GossipSub Component - Heartbeat":
225225

226226
# Then Node0 fanout peers are replenished during heartbeat
227227
# expecting 10[numberOfNodes] - 1[Node0] - (6[maxFanoutPeers] - 1[first peer not disconnected]) = 4
228-
let expectedLen = numberOfNodes - 1 - (maxFanoutPeers - 1)
229-
checkUntilTimeout:
230-
node0.fanout[topic].len == expectedLen
231-
node0.fanout[topic].toSeq().allIt(it.peerId notin peersToDisconnect)
228+
waitUntilTimeout:
229+
pre:
230+
let expectedLen = numberOfNodes - 1 - (maxFanoutPeers - 1)
231+
check:
232+
node0.fanout[topic].len == expectedLen
233+
node0.fanout[topic].toSeq().allIt(it.peerId notin peersToDisconnect)
232234

233235
asyncTest "iDontWants history - last element is pruned during heartbeat":
234236
const

tests/libp2p/pubsub/component/test_gossipsub_mesh_management.nim

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
import chronos, std/[sequtils]
1313
import ../../../../libp2p/protocols/pubsub/[gossipsub, mcache, peertable, pubsubpeer]
14-
import ../../../tools/[unittest]
14+
import ../../../tools/[unittest, futures]
1515
import ../utils
1616

1717
suite "GossipSub Component - Mesh Management":
@@ -176,21 +176,24 @@ suite "GossipSub Component - Mesh Management":
176176
subscribeAllNodes(nodes, topic, voidTopicHandler)
177177

178178
# Then mesh and gossipsub should be populated
179-
for node in nodes:
180-
check node.topics.contains(topic)
181-
check node.gossipsub.hasKey(topic)
182-
check node.gossipsub[topic].len() == numberOfNodes - 1
183-
check node.mesh.hasKey(topic)
184-
check node.mesh[topic].len() == numberOfNodes - 1
179+
for n in nodes:
180+
let node = n
181+
checkUntilTimeout:
182+
node.topics.contains(topic)
183+
node.gossipsub.hasKey(topic)
184+
node.gossipsub[topic].len() == numberOfNodes - 1
185+
node.mesh.hasKey(topic)
186+
node.mesh[topic].len() == numberOfNodes - 1
185187

186188
# When all nodes unsubscribe from the topic
187189
unsubscribeAllNodes(nodes, topic, voidTopicHandler)
188190

189191
# Then the topic should be removed from mesh and gossipsub
190192
for node in nodes:
191-
check topic notin node.topics
192-
check topic notin node.mesh
193-
check topic notin node.gossipsub
193+
check:
194+
topic notin node.topics
195+
topic notin node.mesh
196+
topic notin node.gossipsub
194197

195198
asyncTest "handle subscribe and unsubscribe for multiple topics":
196199
let
@@ -333,6 +336,11 @@ suite "GossipSub Component - Mesh Management":
333336
await connectNodes(nodes[0], nodes[2]) # Out
334337
await connectNodes(nodes[3], nodes[0]) # In
335338
subscribeAllNodes(nodes, topic, voidTopicHandler, wait = false)
339+
await allFuturesThrowing(
340+
waitSub(nodes[0], nodes[1], topic),
341+
waitSub(nodes[0], nodes[2], topic),
342+
waitSub(nodes[3], nodes[0], topic),
343+
)
336344

337345
checkUntilTimeout:
338346
nodes[0].mesh.outboundPeers(topic) == 2

tests/libp2p/pubsub/component/test_gossipsub_message_cache.nim

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import chronos, std/[sequtils], stew/byteutils
1313
import
1414
../../../../libp2p/protocols/pubsub/
1515
[gossipsub, mcache, peertable, floodsub, rpc/messages, rpc/message]
16-
import ../../../tools/[unittest]
16+
import ../../../tools/[unittest, futures]
1717
import ../utils
1818

1919
suite "GossipSub Component - Message Cache":
@@ -222,7 +222,10 @@ suite "GossipSub Component - Message Cache":
222222
await connectNodes(nodes[0], nodes[1])
223223
nodes[0].subscribe(topic, voidTopicHandler)
224224
nodes[1].subscribe(topic, voidTopicHandler)
225-
await waitSub(nodes[0], nodes[1], topic)
225+
await allFuturesThrowing(
226+
waitSub(nodes[0], nodes[1], topic),
227+
waitSub(nodes[1], nodes[0], topic),
228+
)
226229

227230
# When Node0 publishes two messages to the topic
228231
tryPublish await nodes[0].publish(topic, "Hello".toBytes()), 1
@@ -246,7 +249,6 @@ suite "GossipSub Component - Message Cache":
246249
await waitSub(nodes[0], nodes[2], topic)
247250

248251
# And messageIds are added to node0PeerNode2 sentIHaves to allow processing IWant
249-
# let node0PeerNode2 =
250252
let node0PeerNode2 = nodes[0].getPeerByPeerId(topic, nodes[2].peerInfo.peerId)
251253
node0PeerNode2.sentIHaves[0].incl(messageId1)
252254
node0PeerNode2.sentIHaves[0].incl(messageId2)
@@ -263,8 +265,6 @@ suite "GossipSub Component - Message Cache":
263265
@[node2PeerNode0], RPCMsg(control: some(iWantMessage)), isHighPriority = false
264266
)
265267

266-
await waitForHeartbeat()
267-
268268
# Then Node2 receives only messageId2 and messageId1 is dropped
269269
checkUntilTimeout:
270270
nodes[2].mcache.window(topic).toSeq() == @[messageId2]

tests/libp2p/pubsub/component/test_gossipsub_scoring.nim

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ suite "GossipSub Component - Scoring":
3939
# Nodes are subscribed to the same topic
4040
nodes[1].subscribe(topic, handler1)
4141
nodes[2].subscribe(topic, handler2)
42-
await waitForHeartbeat()
42+
await waitSub(nodes[0], nodes[1], topic)
43+
await waitSub(nodes[0], nodes[2], topic)
4344

4445
# Given node 2's score is below the threshold
4546
for peer in g0.gossipsub.getOrDefault(topic):
@@ -245,7 +246,7 @@ suite "GossipSub Component - Scoring":
245246
var (handlerFut, handler) = createCompleteHandler()
246247
nodes[0].subscribe(topic, voidTopicHandler)
247248
nodes[1].subscribe(topic, handler)
248-
await waitForHeartbeat()
249+
await waitSub(nodes[0], nodes[1], topic)
249250

250251
nodes[1].updateScores()
251252

@@ -345,7 +346,7 @@ suite "GossipSub Component - Scoring":
345346
var (handlerFut, handler) = createCompleteHandler()
346347
nodes[0].subscribe(topic, voidTopicHandler)
347348
nodes[1].subscribe(topic, handler)
348-
await waitForHeartbeat()
349+
await waitSub(nodes[0], nodes[1], topic)
349350

350351
tryPublish await nodes[0].publish(topic, toBytes("hello")), 1
351352

tests/tools/test_unittest.nim

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,36 @@ suite "checkUntilTimeout helpers - failed":
8484
asyncTest "checkUntilTimeoutCustom should timeout if condition is never true":
8585
checkUntilTimeoutCustom(100.milliseconds, 10.milliseconds):
8686
false
87+
88+
suite "waitUntilTimeout helpers":
89+
asyncTest "waitUntilTimeout should pass after few attempts":
90+
let a = 2
91+
var b = 0
92+
93+
waitUntilTimeout:
94+
pre:
95+
b.inc
96+
check:
97+
a == b
98+
99+
check:
100+
a == b
101+
102+
asyncTest "waitUntilTimeout should pass after few attempts: multi condition":
103+
let goal1 = 2
104+
let goal2 = 4
105+
var val1 = 0
106+
var val2 = 0
107+
108+
waitUntilTimeout:
109+
pre:
110+
val1.inc
111+
val2.inc
112+
val2.inc
113+
check:
114+
val1 == goal1
115+
val2 == goal2
116+
117+
check:
118+
val1 == goal1
119+
val2 == goal2

tests/tools/unittest.nim

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,74 @@ template asyncTest*(name: string, body: untyped): untyped =
5050
)()
5151
)
5252

53+
proc buildAndExpr(n: NimNode): NimNode =
54+
# Helper proc to recursively build a combined boolean expression
55+
56+
if n.kind == nnkStmtList and n.len > 0:
57+
var combinedExpr = n[0] # Start with the first expression
58+
for i in 1 ..< n.len:
59+
# Combine the current expression with the next using 'and'
60+
combinedExpr = newCall("and", combinedExpr, n[i])
61+
return combinedExpr
62+
else:
63+
return n
64+
65+
const
66+
timeoutDefault: Duration = 30.seconds
67+
sleepIntervalDefault: Duration = 50.milliseconds
68+
69+
macro waitUntilTimeout*(args: untyped): untyped =
70+
## Periodically checks a given condition until it is true or a timeout occurs.
71+
##
72+
## `pre`: untyped - Any logic that needs to be updated before calling `check`.
73+
## `check`: untyped - A condition expression that should eventually evaluate to true.
74+
##
75+
## Examples:
76+
## ```nim
77+
## # Example 1:
78+
## waitUntilTimeout:
79+
## pre:
80+
## let value = getLatestValue()
81+
## check:
82+
## value == 3
83+
if args.kind != nnkStmtList:
84+
error "waitUntilTimeout requires a block with check: and pre:"
85+
86+
var checkBlock: NimNode = nil
87+
var preconditionBlock: NimNode = nil
88+
89+
for stmt in args:
90+
if stmt.kind == nnkCall and $stmt[0] == "check":
91+
checkBlock = stmt[1]
92+
elif stmt.kind == nnkCall and $stmt[0] == "pre":
93+
preconditionBlock = stmt[1]
94+
95+
if checkBlock.isNil or preconditionBlock.isNil:
96+
error "waitUntilTimeout block must contain both `check:` and `pre:` sections."
97+
98+
let combinedBoolExpr = buildAndExpr(checkBlock)
99+
100+
result = quote:
101+
proc checkExpiringInternal(): Future[void] {.gensym, async.} =
102+
let start = Moment.now()
103+
while true:
104+
if Moment.now() > (start + `timeoutDefault`):
105+
checkpoint(
106+
"[TIMEOUT] Timeout was reached and the conditions were not true. Check if the code is working as " &
107+
"expected or consider increasing the timeout param."
108+
)
109+
`preconditionBlock`
110+
check `checkBlock`
111+
return
112+
else:
113+
`preconditionBlock`
114+
if `combinedBoolExpr`:
115+
return
116+
else:
117+
await sleepAsync(`sleepIntervalDefault`)
118+
119+
await checkExpiringInternal()
120+
53121
macro checkUntilTimeoutCustom*(
54122
timeout: Duration, sleepInterval: Duration, code: untyped
55123
): untyped =
@@ -76,16 +144,6 @@ macro checkUntilTimeoutCustom*(
76144
## a == 2
77145
## b == 1
78146
## ```
79-
# Helper proc to recursively build a combined boolean expression
80-
proc buildAndExpr(n: NimNode): NimNode =
81-
if n.kind == nnkStmtList and n.len > 0:
82-
var combinedExpr = n[0] # Start with the first expression
83-
for i in 1 ..< n.len:
84-
# Combine the current expression with the next using 'and'
85-
combinedExpr = newCall("and", combinedExpr, n[i])
86-
return combinedExpr
87-
else:
88-
return n
89147

90148
# Build the combined expression
91149
let combinedBoolExpr = buildAndExpr(code)
@@ -131,7 +189,7 @@ macro checkUntilTimeout*(code: untyped): untyped =
131189
## b == 1
132190
## ```
133191
result = quote:
134-
checkUntilTimeoutCustom(30.seconds, 50.milliseconds, `code`)
192+
checkUntilTimeoutCustom(timeoutDefault, sleepIntervalDefault, `code`)
135193

136194
template finalCheckTrackers*(): untyped =
137195
# finalCheckTrackers is a utility used for performing a final tracker check

0 commit comments

Comments
 (0)