99
1010{.used .}
1111
12- import std/ [options, deques, sequtils, enumerate, algorithm]
12+ import std/ [options, deques, sequtils, enumerate, algorithm, os ]
1313import stew/ byteutils
1414import ../../ libp2p/ builders
1515import ../../ libp2p/ errors
@@ -718,15 +718,19 @@ suite "GossipSub internal":
718718 await allFuturesThrowing (conns.mapIt (it.close ()))
719719 await gossipSub.switch.stop ()
720720
721- proc setupTest (): Future [tuple [gossip0: GossipSub , gossip1: GossipSub , receivedMessages: ref HashSet [seq [byte ]]]] {.async .} =
721+ proc setupTest (maxDurationInNonPriorityQueue1: Opt [Duration ] = Opt .none (Duration )):
722+ Future [tuple [gossip0: GossipSub , gossip1: GossipSub , receivedMessages: ref HashSet [seq [byte ]]]] {.async .} =
722723 let
723724 nodes = generateNodes (2 , gossip = true , verifySignature = false )
724725 discard await allFinished (
725726 nodes[0 ].switch.start (),
726727 nodes[1 ].switch.start ()
727728 )
729+ var gossip0: GossipSub = GossipSub (nodes[0 ])
730+ var gossip1: GossipSub = GossipSub (nodes[1 ])
728731
729- await nodes[1 ].switch.connect (nodes[0 ].switch.peerInfo.peerId, nodes[0 ].switch.peerInfo.addrs)
732+ gossip1.parameters.maxDurationInNonPriorityQueue = maxDurationInNonPriorityQueue1
733+ await gossip1.switch.connect (gossip0.switch.peerInfo.peerId, gossip0.switch.peerInfo.addrs)
730734
731735 var receivedMessages = new (HashSet [seq [byte ]])
732736
@@ -736,12 +740,10 @@ suite "GossipSub internal":
736740 proc handlerB (topic: string , data: seq [byte ]) {.async .} =
737741 discard
738742
739- nodes[ 0 ] .subscribe (" foobar" , handlerA)
740- nodes[ 1 ] .subscribe (" foobar" , handlerB)
743+ gossip0 .subscribe (" foobar" , handlerA)
744+ gossip1 .subscribe (" foobar" , handlerB)
741745 await waitSubGraph (nodes, " foobar" )
742746
743- var gossip0: GossipSub = GossipSub (nodes[0 ])
744- var gossip1: GossipSub = GossipSub (nodes[1 ])
745747
746748 return (gossip0, gossip1, receivedMessages)
747749
@@ -844,3 +846,18 @@ suite "GossipSub internal":
844846 check receivedMessages[].len == 1
845847
846848 await teardownTest (gossip0, gossip1)
849+
850+ asyncTest " e2e - drop msg if it is in the non-priority queue for too long" :
851+ # This test checks if two messages, both below the maxSize, are correctly processed and sent.
852+ # Expected: Both messages should be received.
853+ let maxDurationInNonPriorityQueueGossip1 = 100 .millis
854+ let (gossip0, gossip1, receivedMessages) = await setupTest (Opt .some (maxDurationInNonPriorityQueueGossip1))
855+
856+ let topic = " foobar"
857+ gossip1.broadcast (gossip1.mesh [topic], RPCMsg (messages: @ [Message (topicIDs: @ [topic], data: newSeq [byte ](35 ))]), false )
858+ sleep (100 ) # pause all tasks to ensure that the message stay in the non-priority queue longer than maxDurationInNonPriorityQueueGossip1
859+ gossip1.broadcast (gossip1.mesh [topic], RPCMsg (messages: @ [Message (topicIDs: @ [topic], data: newSeq [byte ](36 ))]), false )
860+ await sleepAsync (100 .milliseconds) # wait for the messages to be processed
861+ check: receivedMessages[].len == 1 # only the second message should be received
862+
863+ await teardownTest (gossip0, gossip1)
0 commit comments