5555 DropConn * = proc (peer: PubSubPeer ) {.gcsafe , raises : [].} # have to pass peer as it's unknown during init
5656 OnEvent * = proc (peer: PubSubPeer , event: PubSubPeerEvent ) {.gcsafe , raises : [].}
5757
58- Ttlmessage * = object
58+ QueuedMessage * = object
5959 msg* : seq [byte ]
60- ttl * : Moment
60+ addedAt * : Moment
6161
6262 RpcMessageQueue * = ref object
6363 # Tracks async tasks for sending high-priority peer-published messages.
6464 sendPriorityQueue: Deque [Future [void ]]
6565 # Queue for lower-priority messages, like "IWANT" replies and relay messages.
66- nonPriorityQueue: AsyncQueue [seq [ byte ] ]
66+ nonPriorityQueue: AsyncQueue [QueuedMessage ]
6767 # Task for processing non-priority message queue.
6868 sendNonPriorityTask: Future [void ]
6969 # The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms.
@@ -300,7 +300,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {
300300 when defined (libp2p_expensive_metrics):
301301 libp2p_gossipsub_priority_queue_size.inc (labelValues = [$ p.peerId])
302302 else :
303- await p.rpcmessagequeue.nonPriorityQueue.addLast (Ttlmessage (msg: msg, ttl : Moment .now ()))
303+ await p.rpcmessagequeue.nonPriorityQueue.addLast (QueuedMessage (msg: msg, addedAt : Moment .now ()))
304304 when defined (libp2p_expensive_metrics):
305305 libp2p_gossipsub_non_priority_queue_size.inc (labelValues = [$ p.peerId])
306306 trace " message queued" , p, msg = shortLog (msg)
@@ -388,7 +388,7 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
388388 let ttlMsg = await p.rpcmessagequeue.nonPriorityQueue.popFirst ()
389389 when defined (libp2p_expensive_metrics):
390390 libp2p_gossipsub_non_priority_queue_size.dec (labelValues = [$ p.peerId])
391- if Moment .now () - ttlMsg.ttl >= p.rpcmessagequeue.maxDurationInNonPriorityQueue:
391+ if Moment .now () - ttlMsg.addedAt >= p.rpcmessagequeue.maxDurationInNonPriorityQueue:
392392 when defined (libp2p_expensive_metrics):
393393 libp2p_gossipsub_non_priority_msgs_dropped.inc (labelValues = [$ p.peerId])
394394 continue
@@ -413,7 +413,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
413413proc new (T: typedesc [RpcMessageQueue ], maxDurationInNonPriorityQueue = 1 .seconds): T =
414414 return T (
415415 sendPriorityQueue: initDeque [Future [void ]](),
416- nonPriorityQueue: newAsyncQueue [Ttlmessage ](),
416+ nonPriorityQueue: newAsyncQueue [QueuedMessage ](),
417417 maxDurationInNonPriorityQueue: maxDurationInNonPriorityQueue,
418418 )
419419
0 commit comments