141 changes: 125 additions & 16 deletions libp2p/protocols/pubsub/gossipsub.nim
@@ -24,6 +24,7 @@ import
./rpc/[messages, message, protobuf],
../protocol,
../../stream/connection,
../../utils/semaphore,
../../peerinfo,
../../peerid,
../../utility,
@@ -36,6 +37,32 @@ import ./gossipsub/[types, scoring, behavior], ../../utils/heartbeat

export types, scoring, behavior, pubsub

import std/atomics
const WARMUP_THRESHOLD = 2
var
lma_dup_during_validation: Atomic[uint32] # number of duplicates during 1st message validation
lma_idontwant_saves: Atomic[uint32] # number of transmissions saved due to idontwant
lma_duplicate_count: Atomic[uint32] # number of duplicate messages received
lma_iwants_sent: Atomic[uint32] # number of iwant requests sent
lma_iwants_replied: Atomic[uint32] # number of iwant requests that were replied to
lma_imreceiving_saves: Atomic[uint32] # number of messages saved due to imreceiving message
lma_unique_receives: Atomic[uint32] # number of unique messages received
lma_mesh_recvs_aftar_iwant: Atomic[uint32] # messages received from mesh, after sending iwant request
lma_warmup_messages: Atomic[uint32] # warmup message count; don't issue idontwant while < WARMUP_THRESHOLD

lma_dup_during_validation.store(0)
lma_idontwant_saves.store(0)
lma_duplicate_count.store(0)
lma_iwants_sent.store(0)
lma_iwants_replied.store(0)
lma_imreceiving_saves.store(0)
lma_unique_receives.store(0)
lma_mesh_recvs_aftar_iwant.store(0)
lma_warmup_messages.store(0)

export lma_dup_during_validation, lma_idontwant_saves, lma_duplicate_count, lma_iwants_sent,
lma_iwants_replied, lma_imreceiving_saves, lma_unique_receives, lma_mesh_recvs_aftar_iwant

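A simplified, self-contained sketch of the warmup idea (counter names below are stand-ins; the real code resets all lma_* counters in validateAndRelay once WARMUP_THRESHOLD messages have been seen):

import std/atomics

const WarmupThreshold = 2          # mirrors WARMUP_THRESHOLD above

var warmupMessages, duplicateCount: Atomic[uint32]   # stand-ins for the lma_* counters

proc onMessage(isDuplicate: bool) =
  # The first WarmupThreshold messages only serve to raise the congestion window;
  # once the threshold is crossed, the stats effectively restart from zero.
  if warmupMessages.load() < WarmupThreshold:
    warmupMessages.atomicInc()
    if warmupMessages.load() == WarmupThreshold:
      duplicateCount.store(0)
    return
  if isDuplicate:
    duplicateCount.atomicInc()

for _ in 0 ..< 5: onMessage(isDuplicate = true)
echo duplicateCount.load()   # 3: the two warmup messages are not counted
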
logScope:
topics = "libp2p gossipsub"

@@ -226,6 +253,7 @@ method init*(g: GossipSub) =
g.codecs &= GossipSubCodec_12
g.codecs &= GossipSubCodec_11
g.codecs &= GossipSubCodec_10
Reviewer comment: Would this change result in a GossipSubCodec_14 as described in libp2p/specs#654?

g.iwantsRequested = initHashSet[MessageId]()

method onNewPeer*(g: GossipSub, peer: PubSubPeer) =
g.withPeerStats(peer.peerId) do(stats: var PeerStats):
@@ -347,11 +375,13 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =

var respControl: ControlMessage
g.handleIDontWant(peer, control.idontwant)
g.handlePreamble(peer, control.preamble)
g.handleIMReceiving(peer, control.imreceiving)
let iwant = g.handleIHave(peer, control.ihave)
if iwant.messageIDs.len > 0:
respControl.iwant.add(iwant)
respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)
let (messages, msgIDs) = g.handleIWant(peer, control.iwant)

let
isPruneNotEmpty = respControl.prune.len > 0
@@ -360,6 +390,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
if isPruneNotEmpty or isIWantNotEmpty:
if isIWantNotEmpty:
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
lma_iwants_sent.atomicInc(respControl.iwant.len.uint32)

if isPruneNotEmpty:
for prune in respControl.prune:
@@ -376,11 +407,17 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
let topic = smsg.topic
if g.knownTopics.contains(topic):
libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
# We send the preamble first, so that peers send IMReceiving to their mesh members
g.broadcast(@[peer], RPCMsg(control: some(ControlMessage(
preamble: @[ControlIHave(topicID: topic, messageIDs: msgIDs)]
))), isHighPriority = true)
else:
libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])


# iwant replies have lower priority
trace "sending iwant reply messages", peer
lma_iwants_replied.atomicInc(messages.len.uint32)
g.send(peer, RPCMsg(messages: messages), isHighPriority = false)

proc validateAndRelay(
@@ -397,6 +434,8 @@ proc validateAndRelay(
toSendPeers.incl(peers[])
g.subscribedDirectPeers.withValue(topic, peers):
toSendPeers.incl(peers[])
if not (peer in toSendPeers):
lma_mesh_recvs_aftar_iwant.atomicInc()
toSendPeers.excl(peer)

if msg.data.len > max(512, msgId.len * 10):
@@ -409,25 +448,41 @@ proc validateAndRelay(
# descored) and that the savings from honest peers are greater than the
# cost a dishonest peer can incur in short time (since the IDONTWANT is
# small).
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
peersToSendIDontWant.exclIfIt(
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
)
g.broadcast(
peersToSendIDontWant,
RPCMsg(
control:
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)

# We don't count the first WARMUP_THRESHOLD messages in stats (they serve to raise the cwnd)
if lma_warmup_messages.load() < WARMUP_THRESHOLD:
lma_warmup_messages.atomicInc()
if lma_warmup_messages.load() == WARMUP_THRESHOLD:
lma_dup_during_validation.store(0)
lma_idontwant_saves.store(0)
lma_duplicate_count.store(0)
lma_iwants_sent.store(0)
lma_iwants_replied.store(0)
lma_imreceiving_saves.store(0)
lma_unique_receives.store(0)
lma_mesh_recvs_aftar_iwant.store(0)

else:
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
peersToSendIDontWant.exclIfIt(
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
)
g.broadcast(
peersToSendIDontWant,
RPCMsg(
control:
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)

let validation = await g.validate(msg)

var seenPeers: HashSet[PubSubPeer]
discard g.validationSeen.pop(saltedId, seenPeers)
libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
lma_dup_during_validation.atomicInc(seenPeers.len.uint32)
libp2p_gossipsub_saved_bytes.inc(
(msg.data.len * seenPeers.len).int64, labelValues = ["validation_duplicate"]
)
Expand Down Expand Up @@ -463,23 +518,57 @@ proc validateAndRelay(
# Don't send it to peers that sent it during validation
toSendPeers.excl(seenPeers)

# We have received IMReceiving from these peers; strictly speaking we should not exclude them,
# but rather wait (TxTime + a large safety cushion) before sending to them.
var receivingPeers: HashSet[PubSubPeer]
for pr in toSendPeers:
for heIsReceiving in pr.heIsReceivings:
if msgId in heIsReceiving:
receivingPeers.incl(pr)
break
toSendPeers.excl(receivingPeers)
lma_imreceiving_saves.atomicInc(receivingPeers.len.uint32)

proc isMsgInIdontWant(it: PubSubPeer): bool =
for iDontWant in it.iDontWants:
if saltedId in iDontWant:
libp2p_gossipsub_idontwant_saved_messages.inc
libp2p_gossipsub_saved_bytes.inc(
msg.data.len.int64, labelValues = ["idontwant"]
)
lma_idontwant_saves.atomicInc()
return true
return false

toSendPeers.exclIfIt(isMsgInIdontWant(it))

# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
#g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer

let sem = newAsyncSemaphore(1)
var staggerPeers = toSeq(toSendPeers)
g.rng.shuffle(staggerPeers)

proc sendToOne(p: PubSubPeer) {.async.} =
g.broadcast(@[p], RPCMsg(control: some(ControlMessage(
preamble: @[ControlIHave(topicID: topic, messageIDs: @[msgId])]
))), isHighPriority = true)

# This won't add much delay, as we only populate the outgoing message queues (no timeouts needed).
# A small delay (near the average link latency) is sufficient for the IMReceiving messages to take effect.
await sem.acquire()
defer: sem.release()

if isMsgInIdontWant(p):
return
g.broadcast(@[p], RPCMsg(messages: @[msg]), isHighPriority = false)

for p in staggerPeers:
asyncSpawn sendToOne(p)


if g.knownTopics.contains(topic):
libp2p_pubsub_messages_rebroadcasted.inc(
toSendPeers.len.int64, labelValues = [topic]
@@ -596,11 +685,13 @@ method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} =
g.rewardDelivered(peer, topic, false, delay)

libp2p_gossipsub_duplicate.inc()
lma_duplicate_count.atomicInc()

# onto the next message
continue

libp2p_gossipsub_received.inc()
lma_unique_receives.atomicInc()

# avoid processing messages we are not interested in
if topic notin g.topics:
@@ -782,7 +873,25 @@ method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.async.} =

g.mcache.put(msgId, msg)

g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)
#g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)
Reviewer comment: ✂️ ? (suggested change: delete the commented-out #g.broadcast(...) line above)

let sem = newAsyncSemaphore(1)
var staggerPeers = toSeq(peers)
g.rng.shuffle(staggerPeers)
Reviewer comment: Why are the peers randomly sorted?

# We send the message immediately after sending the preamble to each peer
proc sendToOne(p: PubSubPeer) {.async.} =
g.broadcast(@[p], RPCMsg(control: some(ControlMessage(
preamble: @[ControlIHave(topicID: topic, messageIDs: @[msgId])]
))), isHighPriority = true)

Reviewer comment: Sending the message length is still pending, right?

await sem.acquire()
defer: sem.release()

g.broadcast(@[p], RPCMsg(messages: @[msg]), isHighPriority = false)
Reviewer comment: No isHighPriority = true?

Reviewer comment: Reading in the specs, I see the following:

"Adding a message preamble may increase control overhead for small messages.
Therefore, it is preferable to use it only for messages that exceed the preamble_threshold."

The way it is coded right now, it will send the preamble always, right? Is this because this PR is for experiments?


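For reference, a minimal sketch of the size gate that the quoted spec text suggests; the threshold value and helper name below are assumptions, not defined by this PR or the spec excerpt:

const PreambleThreshold = 512 * 1024   # assumed value, in bytes

proc shouldSendPreamble(msgLen: int): bool =
  # only announce a preamble for messages large enough to be worth the extra control traffic
  msgLen >= PreambleThreshold

doAssert shouldSendPreamble(1_000_000)
doAssert not shouldSendPreamble(100)
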
for p in staggerPeers:
asyncSpawn sendToOne(p)

Reviewer comment (richard-ramos, Jan 14, 2025, on lines +881 to +894): Using the AsyncSemaphore(1) means that the preambles are being sent sequentially to each peer, right? Why is this the case? Shouldn't we try to get the preamble messages being sent concurrently to all peers?


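To make the behaviour in question easier to see, here is a self-contained sketch of the staggering pattern using std/asyncdispatch and a hand-rolled semaphore (the real code uses chronos and nim-libp2p's AsyncSemaphore): each task sends its preamble as soon as it starts, while the single permit serializes the payload sends.

import std/[asyncdispatch, deques]

type AsyncSem = ref object
  permits: int
  waiters: Deque[Future[void]]

proc newAsyncSem(n: int): AsyncSem =
  AsyncSem(permits: n, waiters: initDeque[Future[void]]())

proc acquire(s: AsyncSem): Future[void] =
  result = newFuture[void]("AsyncSem.acquire")
  if s.permits > 0:
    dec s.permits
    result.complete()
  else:
    s.waiters.addLast(result)

proc release(s: AsyncSem) =
  if s.waiters.len > 0:
    s.waiters.popFirst().complete()
  else:
    inc s.permits

proc sendToOne(sem: AsyncSem, peer: string) {.async.} =
  echo "preamble -> ", peer       # sent immediately, before taking the permit
  await sem.acquire()             # one permit => payload sends are staggered
  try:
    echo "payload  -> ", peer     # stand-in for the low-priority message send
    await sleepAsync(10)          # stand-in for per-peer transmission time
  finally:
    sem.release()

proc main() {.async.} =
  let sem = newAsyncSem(1)
  for p in ["A", "B", "C"]:
    asyncCheck sendToOne(sem, p)
  await sleepAsync(100)           # let the staggered sends finish

waitFor main()
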
if g.knownTopics.contains(topic):
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
71 changes: 63 additions & 8 deletions libp2p/protocols/pubsub/gossipsub/behavior.nim
@@ -290,14 +290,31 @@ proc handleIHave*(
for ihave in ihaves:
trace "peer sent ihave", peer, topicID = ihave.topicID, msgs = ihave.messageIDs
if ihave.topicID in g.topics:
# look here for received idontwants for the same message
var meshPeers: HashSet[PubSubPeer]
g.mesh.withValue(ihave.topicID, peers): meshPeers.incl(peers[])
g.subscribedDirectPeers.withValue(ihave.topicID, peers): meshPeers.incl(peers[])

for msgId in ihave.messageIDs:
if not g.hasSeen(g.salt(msgId)):
if peer.iHaveBudget <= 0:
break
elif msgId in g.iwantsRequested:
break
elif msgId notin res.messageIDs:
res.messageIDs.add(msgId)
dec peer.iHaveBudget
trace "requested message via ihave", messageID = msgId
# don't send IWANT if we have received (N) IDontWant(s) for this msgID
let saltedID = g.salt(msgId)
var numFinds: int = 0
for meshPeer in meshPeers:
for heDontWant in meshPeer.iDontWants:
if saltedID in heDontWant:
numFinds = numFinds + 1
#break;
if numFinds == 0: # we currently back off after a single IDontWant
res.messageIDs.add(msgId)
dec peer.iHaveBudget
g.iwantsRequested.incl(msgId)
trace "requested message via ihave", messageID = msgId
# shuffling res.messageIDs before sending it out to increase the likelihood
# of getting an answer if the peer truncates the list due to internal size restrictions.
g.rng.shuffle(res.messageIDs)
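
Reduced to its essentials, the new gate looks like this (the types below are stand-ins for PubSubPeer and the salted ids): an advertised id is only requested via IWANT when no mesh peer has announced an IDONTWANT for it.

import std/sets

type
  MsgId = string
  Peer = object
    iDontWants: seq[HashSet[MsgId]]   # history windows of announced IDONTWANTs

proc wantViaIWant(meshPeers: seq[Peer], msgId: MsgId): bool =
  var numFinds = 0
  for p in meshPeers:
    for window in p.iDontWants:
      if msgId in window:
        inc numFinds
  # the handler above currently backs off after a single IDONTWANT
  result = numFinds == 0

let
  a = Peer(iDontWants: @[toHashSet(["m1"])])
  b = Peer(iDontWants: @[initHashSet[MsgId]()])
doAssert not wantViaIWant(@[a, b], "m1")   # a mesh peer doesn't want it -> no IWANT
doAssert wantViaIWant(@[a, b], "m2")       # nobody objected -> request it
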
@@ -309,12 +326,49 @@ proc handleIDontWant*(g: GossipSub, peer: PubSubPeer, iDontWants: seq[ControlIWant]) =
if peer.iDontWants[^1].len > 1000:
break
peer.iDontWants[^1].incl(g.salt(messageId))

proc handlePreamble*(g: GossipSub, peer: PubSubPeer, preambles: seq[ControlIHave]) =
for preamble in preambles:
for messageId in preamble.messageIDs:
# Ideally a peer should send at most peer_preamble_announcements preambles for unfinished downloads
# A peer violating this should be penalized through P4???
if peer.heIsSendings[^1].len > 1000:
break
peer.heIsSendings[^1].incl(messageId)
# Experimental change for quick performance evaluation only (ideally for very large messages):
#[
1) An IDontWant is followed by the message; IMReceiving informs peers that we are receiving this message.
2) Prototype implementation for a single topic ("test"); the topic ID is needed in IDontWant.

3) A better solution is to send the message details in a message preamble, which can then be used for IMReceiving.
]#
var toSendPeers = HashSet[PubSubPeer]()
g.floodsub.withValue(preamble.topicID, peers): toSendPeers.incl(peers[])
g.mesh.withValue(preamble.topicID, peers): toSendPeers.incl(peers[])

# add direct peers
toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(preamble.topicID))

g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
imreceiving: @[ControlIWant(messageIDs: @[messageId])]
))), isHighPriority = true)

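Roughly, the per-id decision in handlePreamble is the following (a stand-alone sketch with the window bound pulled out as a constant; the real code then broadcasts the IMRECEIVING to the mesh, floodsub and direct peers of the preamble's topic):

import std/sets

type MsgId = string

const MaxTrackedPreambles = 1000   # the `> 1000` bound used above

proc recordPreamble(window: var HashSet[MsgId], msgId: MsgId): bool =
  # returns true when an IMRECEIVING announcement should go out for msgId
  if window.len > MaxTrackedPreambles:
    return false                   # window full: stop tracking, stay quiet
  window.incl(msgId)
  true

var heIsSendings = initHashSet[MsgId]()
doAssert recordPreamble(heIsSendings, "m1")
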

proc handleIMReceiving*(g: GossipSub,
peer: PubSubPeer,
imreceivings: seq[ControlIWant]) =
for imreceiving in imreceivings:
for messageId in imreceiving.messageIDs:
if peer.heIsReceivings[^1].len > 1000: break
if messageId.len > 100: continue
Reviewer comment (on lines +362 to +363): When do messageIds exceed 100? I'm wondering if this condition should be added in other places as I haven't seen length checking for messageIds using 100. (Probably a good idea to extract this into a constant too.)

peer.heIsReceivings[^1].incl(messageId)

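Following up on the comment above, the two magic numbers could be pulled out as named constants, e.g. (names below are suggestions, not existing identifiers):

const
  MaxEntriesPerWindow = 1000   # cap on tracked ids per history window
  MaxMessageIdLength = 100     # cap on accepted message id length, in bytes

proc acceptMessageId(windowLen, msgIdLen: int): bool =
  windowLen <= MaxEntriesPerWindow and msgIdLen <= MaxMessageIdLength

doAssert acceptMessageId(0, 32)
doAssert not acceptMessageId(0, 200)
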
proc handleIWant*(
g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]
): seq[Message] =
): tuple[messages: seq[Message], ids: seq[MessageId]] =
var
messages: seq[Message]
#ids: seq[MessageId]
#messages: seq[Message]
Reviewer comment (on lines +370 to +371): ✂️ ? (suggested change: delete the commented-out #ids / #messages lines above)

invalidRequests = 0
if peer.score < g.parameters.gossipThreshold:
trace "iwant: ignoring low score peer", peer, score = peer.score
@@ -329,14 +383,15 @@ proc handleIWant*(
invalidRequests.inc()
if invalidRequests > 20:
libp2p_gossipsub_received_iwants.inc(1, labelValues = ["skipped"])
return messages
return result
continue
let msg = g.mcache.get(mid).valueOr:
libp2p_gossipsub_received_iwants.inc(1, labelValues = ["unknown"])
continue
libp2p_gossipsub_received_iwants.inc(1, labelValues = ["correct"])
messages.add(msg)
return messages
result.messages.add(msg)
result.ids.add(mid)
Reviewer comment (on lines +386 to +393): It's probably better to define messages and ids variables, instead of using result. See https://status-im.github.io/nim-style-guide/language.result.html

return result
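
Along the lines of the style-guide link above, the accumulation can use named locals and an explicit return. A generic stand-alone sketch (string ids and a Table standing in for the message cache):

import std/tables

proc collect(requested: seq[string],
             cache: Table[string, string]): tuple[messages: seq[string], ids: seq[string]] =
  var
    messages: seq[string]
    ids: seq[string]
  for id in requested:
    if id in cache:
      messages.add(cache[id])
      ids.add(id)
  return (messages, ids)

let cache = {"m1": "payload1"}.toTable
let (msgs, ids) = collect(@["m1", "m2"], cache)
doAssert msgs == @["payload1"] and ids == @["m1"]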

proc commitMetrics(metrics: var MeshMetrics) =
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
2 changes: 2 additions & 0 deletions libp2p/protocols/pubsub/gossipsub/types.nim
@@ -188,6 +188,8 @@ type

heartbeatEvents*: seq[AsyncEvent]

iwantsRequested*: HashSet[MessageId]

MeshMetrics* = object # scratch buffers for metrics
otherPeersPerTopicMesh*: int64
otherPeersPerTopicFanout*: int64
4 changes: 4 additions & 0 deletions libp2p/protocols/pubsub/pubsubpeer.nim
@@ -109,6 +109,8 @@ type
## IDONTWANT contains unvalidated message id:s which may be long and/or
## expensive to look up, so we apply the same salting to them as during
## unvalidated message processing
heIsSendings*: Deque[HashSet[MessageId]]
heIsReceivings*: Deque[HashSet[MessageId]]
iHaveBudget*: int
pingBudget*: int
maxMessageSize: int
@@ -557,4 +559,6 @@ proc new*(
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.iDontWants.addFirst(default(HashSet[SaltedId]))
result.heIsSendings.addFirst(default(HashSet[MessageId]))
result.heIsReceivings.addFirst(default(HashSet[MessageId]))
result.startSendNonPriorityTask()