Large message handling combined #1234
base: p2p-research
Changes from all commits
35d1876
8377eb0
f1b78f6
8bb6215
4b691b6
b2a75fc
64ef502
1fe2bae
ab0feb3
0677ab4
43f1f0a
58470a7
0acd8be
e779185
ac9e1e6
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -24,6 +24,7 @@ import | |||
| ./rpc/[messages, message, protobuf], | ||||
| ../protocol, | ||||
| ../../stream/connection, | ||||
| ../../utils/semaphore, | ||||
| ../../peerinfo, | ||||
| ../../peerid, | ||||
| ../../utility, | ||||
|
|
@@ -36,6 +37,32 @@ import ./gossipsub/[types, scoring, behavior], ../../utils/heartbeat | |||
|
|
||||
| export types, scoring, behavior, pubsub | ||||
|
|
||||
| import std/atomics | ||||
| const WARMUP_THRESHOLD = 2 | ||||
| var | ||||
| lma_dup_during_validation: Atomic[uint32] # number of duplicates during 1st message validation | ||||
| lma_idontwant_saves: Atomic[uint32] # number of Txs saved due to idontwant | ||||
| lma_duplicate_count: Atomic[uint32] # number of duplicate messages received | ||||
| lma_iwants_sent: Atomic[uint32] # number of iwant requests sent | ||||
| lma_iwants_replied: Atomic[uint32] # number of iwant messages that are replied | ||||
| lma_imreceiving_saves: Atomic[uint32] # number of messages saved due to imreceiving message | ||||
| lma_unique_receives: Atomic[uint32] # number of unique messages received | ||||
| lma_mesh_recvs_aftar_iwant: Atomic[uint32] # messages received from mesh, after sending iwant request | ||||
| lma_warmup_messages: Atomic[uint32] # don't issue idontwant while warming up (count < WARMUP_THRESHOLD) | ||||
|
|
||||
| lma_dup_during_validation.store(0) | ||||
| lma_idontwant_saves.store(0) | ||||
| lma_duplicate_count.store(0) | ||||
| lma_iwants_sent.store(0) | ||||
| lma_iwants_replied.store(0) | ||||
| lma_imreceiving_saves.store(0) | ||||
| lma_unique_receives.store(0) | ||||
| lma_mesh_recvs_aftar_iwant.store(0) | ||||
| lma_warmup_messages.store(0) | ||||
|
|
||||
| export lma_dup_during_validation, lma_idontwant_saves, lma_duplicate_count, lma_iwants_sent, | ||||
| lma_iwants_replied, lma_imreceiving_saves, lma_unique_receives, lma_mesh_recvs_aftar_iwant | ||||
|
|
||||
| logScope: | ||||
| topics = "libp2p gossipsub" | ||||
|
|
||||
|
|
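For experiment reporting, these exported counters can be read back with `load()`. Below is a minimal sketch (not part of the diff) of a hypothetical `dumpLmaStats` helper that prints them at the end of a run, assuming the `lma_*` atomics declared above are in scope.

```nim
# Hypothetical helper, not part of the PR: print the experiment counters
# declared above. Assumes this code lives in (or imports) the module that
# exports the lma_* atomics.
proc dumpLmaStats*() =
  echo "duplicates during validation: ", lma_dup_during_validation.load()
  echo "idontwant saves:              ", lma_idontwant_saves.load()
  echo "duplicate receives:           ", lma_duplicate_count.load()
  echo "iwants sent / replied:        ", lma_iwants_sent.load(), " / ", lma_iwants_replied.load()
  echo "imreceiving saves:            ", lma_imreceiving_saves.load()
  echo "unique receives:              ", lma_unique_receives.load()
  echo "mesh receives after iwant:    ", lma_mesh_recvs_aftar_iwant.load()
```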
@@ -226,6 +253,7 @@ method init*(g: GossipSub) = | |||
| g.codecs &= GossipSubCodec_12 | ||||
| g.codecs &= GossipSubCodec_11 | ||||
| g.codecs &= GossipSubCodec_10 | ||||
| g.iwantsRequested = initHashSet[MessageId]() | ||||
|
|
||||
| method onNewPeer*(g: GossipSub, peer: PubSubPeer) = | ||||
| g.withPeerStats(peer.peerId) do(stats: var PeerStats): | ||||
|
|
@@ -347,11 +375,13 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = | |||
|
|
||||
| var respControl: ControlMessage | ||||
| g.handleIDontWant(peer, control.idontwant) | ||||
| g.handlePreamble(peer, control.preamble) | ||||
| g.handleIMReceiving(peer, control.imreceiving) | ||||
| let iwant = g.handleIHave(peer, control.ihave) | ||||
| if iwant.messageIDs.len > 0: | ||||
| respControl.iwant.add(iwant) | ||||
| respControl.prune.add(g.handleGraft(peer, control.graft)) | ||||
| let messages = g.handleIWant(peer, control.iwant) | ||||
| let (messages, msgIDs) = g.handleIWant(peer, control.iwant) | ||||
|
|
||||
| let | ||||
| isPruneNotEmpty = respControl.prune.len > 0 | ||||
|
|
@@ -360,6 +390,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = | |||
| if isPruneNotEmpty or isIWantNotEmpty: | ||||
| if isIWantNotEmpty: | ||||
| libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64) | ||||
| lma_iwants_sent.atomicInc(respControl.iwant.len.uint32) | ||||
|
|
||||
| if isPruneNotEmpty: | ||||
| for prune in respControl.prune: | ||||
|
|
@@ -376,11 +407,17 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = | |||
| let topic = smsg.topic | ||||
| if g.knownTopics.contains(topic): | ||||
| libp2p_pubsub_broadcast_messages.inc(labelValues = [topic]) | ||||
| #We send the preamble first, so the peer sends IMReceiving to its mesh members | ||||
| g.broadcast(@[peer], RPCMsg(control: some(ControlMessage( | ||||
| preamble: @[ControlIHave(topicID: topic, messageIDs: msgIDs)] | ||||
| ))), isHighPriority = true) | ||||
| else: | ||||
| libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"]) | ||||
|
|
||||
|
|
||||
| # iwant replies have lower priority | ||||
| trace "sending iwant reply messages", peer | ||||
| lma_iwants_replied.atomicInc(messages.len.uint32) | ||||
| g.send(peer, RPCMsg(messages: messages), isHighPriority = false) | ||||
|
|
||||
| proc validateAndRelay( | ||||
|
|
@@ -397,6 +434,8 @@ proc validateAndRelay( | |||
| toSendPeers.incl(peers[]) | ||||
| g.subscribedDirectPeers.withValue(topic, peers): | ||||
| toSendPeers.incl(peers[]) | ||||
| if not (peer in toSendPeers): | ||||
| lma_mesh_recvs_aftar_iwant.atomicInc() | ||||
| toSendPeers.excl(peer) | ||||
|
|
||||
| if msg.data.len > max(512, msgId.len * 10): | ||||
|
|
@@ -409,25 +448,41 @@ proc validateAndRelay( | |||
| # descored) and that the savings from honest peers are greater than the | ||||
| # cost a dishonest peer can incur in short time (since the IDONTWANT is | ||||
| # small). | ||||
| var peersToSendIDontWant = HashSet[PubSubPeer]() | ||||
| addToSendPeers(peersToSendIDontWant) | ||||
| peersToSendIDontWant.exclIfIt( | ||||
| it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11 | ||||
| ) | ||||
| g.broadcast( | ||||
| peersToSendIDontWant, | ||||
| RPCMsg( | ||||
| control: | ||||
| some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])])) | ||||
| ), | ||||
| isHighPriority = true, | ||||
| ) | ||||
|
|
||||
| #We don't consider the first WARMUP_THRESHOLD messages in stats (they are for raising cwnd) | ||||
| if lma_warmup_messages.load() < WARMUP_THRESHOLD: | ||||
| lma_warmup_messages.atomicInc() | ||||
| if lma_warmup_messages.load() == WARMUP_THRESHOLD: | ||||
| lma_dup_during_validation.store(0) | ||||
| lma_idontwant_saves.store(0) | ||||
| lma_duplicate_count.store(0) | ||||
| lma_iwants_sent.store(0) | ||||
| lma_iwants_replied.store(0) | ||||
| lma_imreceiving_saves.store(0) | ||||
| lma_unique_receives.store(0) | ||||
| lma_mesh_recvs_aftar_iwant.store(0) | ||||
|
|
||||
| else: | ||||
| var peersToSendIDontWant = HashSet[PubSubPeer]() | ||||
| addToSendPeers(peersToSendIDontWant) | ||||
| peersToSendIDontWant.exclIfIt( | ||||
| it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11 | ||||
| ) | ||||
| g.broadcast( | ||||
| peersToSendIDontWant, | ||||
| RPCMsg( | ||||
| control: | ||||
| some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])])) | ||||
| ), | ||||
| isHighPriority = true, | ||||
| ) | ||||
|
|
||||
| let validation = await g.validate(msg) | ||||
|
|
||||
| var seenPeers: HashSet[PubSubPeer] | ||||
| discard g.validationSeen.pop(saltedId, seenPeers) | ||||
| libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64) | ||||
| lma_dup_during_validation.atomicInc(seenPeers.len.uint32) | ||||
| libp2p_gossipsub_saved_bytes.inc( | ||||
| (msg.data.len * seenPeers.len).int64, labelValues = ["validation_duplicate"] | ||||
| ) | ||||
|
|
@@ -463,23 +518,57 @@ proc validateAndRelay( | |||
| # Don't send it to peers that sent it during validation | ||||
| toSendPeers.excl(seenPeers) | ||||
|
|
||||
| #We have received IMReceiving from these peers, so we should not exclude them. | ||||
| #Ideally we should wait (TxTime + a large safety cushion) before sending to these peers. | ||||
| var receivingPeers: HashSet[PubSubPeer] | ||||
| for pr in toSendPeers: | ||||
| for heIsReceiving in pr.heIsReceivings: | ||||
| if msgId in heIsReceiving: | ||||
| receivingPeers.incl(pr) | ||||
| break | ||||
| toSendPeers.excl(receivingPeers) | ||||
| lma_imreceiving_saves.atomicInc(receivingPeers.len.uint32) | ||||
|
|
||||
| proc isMsgInIdontWant(it: PubSubPeer): bool = | ||||
| for iDontWant in it.iDontWants: | ||||
| if saltedId in iDontWant: | ||||
| libp2p_gossipsub_idontwant_saved_messages.inc | ||||
| libp2p_gossipsub_saved_bytes.inc( | ||||
| msg.data.len.int64, labelValues = ["idontwant"] | ||||
| ) | ||||
| lma_idontwant_saves.atomicInc() | ||||
| return true | ||||
| return false | ||||
|
|
||||
| toSendPeers.exclIfIt(isMsgInIdontWant(it)) | ||||
|
|
||||
| # In theory, if topics are the same in all messages, we could batch - we'd | ||||
| # also have to be careful to only include validated messages | ||||
| g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false) | ||||
| #g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false) | ||||
| trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer | ||||
|
|
||||
| let sem = newAsyncSemaphore(1) | ||||
| var staggerPeers = toSeq(toSendPeers) | ||||
| g.rng.shuffle(staggerPeers) | ||||
|
|
||||
| proc sendToOne(p: PubSubPeer) {.async.} = | ||||
| g.broadcast(@[p], RPCMsg(control: some(ControlMessage( | ||||
| preamble: @[ControlIHave(topicID: topic, messageIDs: @[msgId])] | ||||
| ))), isHighPriority = true) | ||||
|
|
||||
| #Won't add much delay, as we only populate the outgoing message queues (no timeouts needed) | ||||
| #A small delay (near the avg link latency) is sufficient for IMReceiving messages | ||||
| await sem.acquire() | ||||
| defer: sem.release() | ||||
|
|
||||
| if isMsgInIdontWant(p): | ||||
| return | ||||
| g.broadcast(@[p], RPCMsg(messages: @[msg]), isHighPriority = false) | ||||
|
|
||||
| for p in staggerPeers: | ||||
| asyncSpawn sendToOne(p) | ||||
|
|
||||
|
|
||||
| if g.knownTopics.contains(topic): | ||||
| libp2p_pubsub_messages_rebroadcasted.inc( | ||||
| toSendPeers.len.int64, labelValues = [topic] | ||||
|
|
@@ -596,11 +685,13 @@ method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} = | |||
| g.rewardDelivered(peer, topic, false, delay) | ||||
|
|
||||
| libp2p_gossipsub_duplicate.inc() | ||||
| lma_duplicate_count.atomicInc() | ||||
|
|
||||
| # onto the next message | ||||
| continue | ||||
|
|
||||
| libp2p_gossipsub_received.inc() | ||||
| lma_unique_receives.atomicInc() | ||||
|
|
||||
| # avoid processing messages we are not interested in | ||||
| if topic notin g.topics: | ||||
|
|
@@ -782,7 +873,25 @@ method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.asy | |||
|
|
||||
| g.mcache.put(msgId, msg) | ||||
|
|
||||
| g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true) | ||||
| #g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true) | ||||
|
Member
✂️ ?
Suggested change
|
||||
| let sem = newAsyncSemaphore(1) | ||||
| var staggerPeers = toSeq(peers) | ||||
| g.rng.shuffle(staggerPeers) | ||||
|
Member
Why are the peers randomly sorted?
||||
|
|
||||
| #We send the message immediately after sending the preamble to each peer | ||||
| proc sendToOne(p: PubSubPeer) {.async.} = | ||||
| g.broadcast(@[p], RPCMsg(control: some(ControlMessage( | ||||
| preamble: @[ControlIHave(topicID: topic, messageIDs: @[msgId])] | ||||
|
Member
Sending the message length is still pending, right?
||||
| ))), isHighPriority = true) | ||||
|
|
||||
| await sem.acquire() | ||||
| defer: sem.release() | ||||
|
|
||||
| g.broadcast(@[p], RPCMsg(messages: @[msg]), isHighPriority = false) | ||||
|
Member
No
Member
Reading in the specs, I see the following:
The way it is coded right now, it will send the preamble always, right? Is this because this PR is for experiments?
||||
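If preambles are meant to accompany only large messages, one option is a small guard around the preamble broadcast. This is a sketch under assumptions, not part of this PR; `PreambleSizeThreshold` and `maybeSendPreamble` are made-up names, and the threshold value is illustrative only.

```nim
# Sketch only: gate the preamble on payload size. PreambleSizeThreshold is an
# assumed constant, not defined by the PR or the spec excerpts discussed here.
const PreambleSizeThreshold = 512 * 1024  # e.g. 512 KiB, illustrative

proc maybeSendPreamble(g: GossipSub, p: PubSubPeer, topic: string,
                       msgId: MessageId, payloadLen: int) =
  # only announce messages that are large enough to be worth a preamble
  if payloadLen >= PreambleSizeThreshold:
    g.broadcast(@[p], RPCMsg(control: some(ControlMessage(
      preamble: @[ControlIHave(topicID: topic, messageIDs: @[msgId])]
    ))), isHighPriority = true)
```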
|
|
||||
| for p in staggerPeers: | ||||
| asyncSpawn sendToOne(p) | ||||
|
|
||||
|
Comment on lines +881 to +894
Member
Using the AsyncSemaphore(1) means that the preambles are being sent sequentially to each peer, right? Why is this the case? Shouldn't we try to get the preamble messages being sent concurrently to all peers?
||||
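Regarding the sequential-preamble question above, one possible arrangement (a sketch only, assuming the same types and the `broadcast`/`newAsyncSemaphore` helpers used in the diff) is to broadcast the small preamble to all peers at once and keep only the payload sends behind the semaphore:

```nim
# Sketch only: preambles go out to all peers concurrently (they are small
# control messages); only the large payload sends are staggered.
proc staggeredPublish(g: GossipSub, peers: seq[PubSubPeer], topic: string,
                      msgId: MessageId, msg: Message) =
  g.broadcast(peers, RPCMsg(control: some(ControlMessage(
    preamble: @[ControlIHave(topicID: topic, messageIDs: @[msgId])]
  ))), isHighPriority = true)

  let sem = newAsyncSemaphore(1)

  proc sendToOne(p: PubSubPeer) {.async.} =
    # serialize only the payload, so one large message at a time is queued
    await sem.acquire()
    defer: sem.release()
    g.broadcast(@[p], RPCMsg(messages: @[msg]), isHighPriority = false)

  for p in peers:
    asyncSpawn sendToOne(p)
```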
|
|
||||
| if g.knownTopics.contains(topic): | ||||
| libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic]) | ||||
|
|
||||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -290,14 +290,31 @@ proc handleIHave*( | |||||||||
| for ihave in ihaves: | ||||||||||
| trace "peer sent ihave", peer, topicID = ihave.topicID, msgs = ihave.messageIDs | ||||||||||
| if ihave.topicID in g.topics: | ||||||||||
| #look here for received idontwants for the same message | ||||||||||
| var meshPeers: HashSet[PubSubPeer] | ||||||||||
| g.mesh.withValue(ihave.topicID, peers): meshPeers.incl(peers[]) | ||||||||||
| g.subscribedDirectPeers.withValue(ihave.topicID, peers): meshPeers.incl(peers[]) | ||||||||||
|
|
||||||||||
| for msgId in ihave.messageIDs: | ||||||||||
| if not g.hasSeen(g.salt(msgId)): | ||||||||||
| if peer.iHaveBudget <= 0: | ||||||||||
| break | ||||||||||
| elif msgId in g.iwantsRequested: | ||||||||||
| break | ||||||||||
| elif msgId notin res.messageIDs: | ||||||||||
| res.messageIDs.add(msgId) | ||||||||||
| dec peer.iHaveBudget | ||||||||||
| trace "requested message via ihave", messageID = msgId | ||||||||||
| #don't send IWANT if we have received (N) IDontWant(s) for a msgID | ||||||||||
| let saltedID = g.salt(msgId) | ||||||||||
| var numFinds: int = 0 | ||||||||||
| for meshPeer in meshPeers: | ||||||||||
| for heDontWant in meshPeer.iDontWants: | ||||||||||
| if saltedID in heDontWant: | ||||||||||
| numFinds = numFinds + 1 | ||||||||||
| #break; | ||||||||||
| if numFinds == 0: #We currently wait for 1 IDontWant | ||||||||||
| res.messageIDs.add(msgId) | ||||||||||
| dec peer.iHaveBudget | ||||||||||
| g.iwantsRequested.incl(msgId) | ||||||||||
| trace "requested message via ihave", messageID = msgId | ||||||||||
| # shuffling res.messageIDs before sending it out to increase the likelihood | ||||||||||
| # of getting an answer if the peer truncates the list due to internal size restrictions. | ||||||||||
| g.rng.shuffle(res.messageIDs) | ||||||||||
|
|
@@ -309,12 +326,49 @@ proc handleIDontWant*(g: GossipSub, peer: PubSubPeer, iDontWants: seq[ControlIWa | |||||||||
| if peer.iDontWants[^1].len > 1000: | ||||||||||
| break | ||||||||||
| peer.iDontWants[^1].incl(g.salt(messageId)) | ||||||||||
|
|
||||||||||
| proc handlePreamble*(g: GossipSub, peer: PubSubPeer, preambles: seq[ControlIHave]) = | ||||||||||
| for preamble in preambles: | ||||||||||
| for messageId in preamble.messageIDs: | ||||||||||
| #Ideally a peer should send at most peer_preamble_announcements preambles for unfinished downloads | ||||||||||
| #A peer violating this should be penalized through P4??? | ||||||||||
| if peer.heIsSendings[^1].len > 1000: | ||||||||||
| break | ||||||||||
| peer.heIsSendings[^1].incl(messageId) | ||||||||||
| #Experimental change for quick performance evaluation only (Ideally for very large messages): | ||||||||||
| #[ | ||||||||||
| 1) IDontWant is followed by the message. IMReceiving informs peers that we are receiving this message | ||||||||||
| 2) Prototype implementation for a single topic ("test"). Need topic ID in IDontWant | ||||||||||
|
|
||||||||||
| 3) A better solution is to send message details in a message preamble; that can be used for IMReceiving | ||||||||||
| ]# | ||||||||||
| var toSendPeers = HashSet[PubSubPeer]() | ||||||||||
| g.floodsub.withValue(preamble.topicID, peers): toSendPeers.incl(peers[]) | ||||||||||
| g.mesh.withValue(preamble.topicID, peers): toSendPeers.incl(peers[]) | ||||||||||
|
|
||||||||||
| # add direct peers | ||||||||||
| toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(preamble.topicID)) | ||||||||||
|
|
||||||||||
| g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage( | ||||||||||
| imreceiving: @[ControlIWant(messageIDs: @[messageId])] | ||||||||||
| ))), isHighPriority = true) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| proc handleIMReceiving*(g: GossipSub, | ||||||||||
| peer: PubSubPeer, | ||||||||||
| imreceivings: seq[ControlIWant]) = | ||||||||||
| for imreceiving in imreceivings: | ||||||||||
| for messageId in imreceiving.messageIDs: | ||||||||||
| if peer.heIsReceivings[^1].len > 1000: break | ||||||||||
| if messageId.len > 100: continue | ||||||||||
|
Comment on lines +362 to +363
Member
When do messageIds exceed 100? I'm wondering if this condition should be added in other places, as I haven't seen length checking for messageIds using
||||||||||
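On the length-check question above, a shared guard would keep the limit in one place so every control handler can reuse it. This is a hypothetical helper, not part of the PR; the constant and proc names are assumptions.

```nim
# Hypothetical helper: one place for the message-id length limit, so
# handleIMReceiving, handlePreamble, etc. could share the same check.
const MaxMessageIdLen = 100  # assumed limit, matching the check above

proc isValidMsgId(id: MessageId): bool =
  id.len > 0 and id.len <= MaxMessageIdLen
```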
| peer.heIsReceivings[^1].incl(messageId) | ||||||||||
|
|
||||||||||
| proc handleIWant*( | ||||||||||
| g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant] | ||||||||||
| ): seq[Message] = | ||||||||||
| ): tuple[messages: seq[Message], ids: seq[MessageId]] = | ||||||||||
| var | ||||||||||
| messages: seq[Message] | ||||||||||
| #ids: seq[MessageId] | ||||||||||
| #messages: seq[Message] | ||||||||||
|
Comment on lines +370 to +371
Member
✂️ ?
Suggested change
|
||||||||||
| invalidRequests = 0 | ||||||||||
| if peer.score < g.parameters.gossipThreshold: | ||||||||||
| trace "iwant: ignoring low score peer", peer, score = peer.score | ||||||||||
|
|
@@ -329,14 +383,15 @@ proc handleIWant*( | |||||||||
| invalidRequests.inc() | ||||||||||
| if invalidRequests > 20: | ||||||||||
| libp2p_gossipsub_received_iwants.inc(1, labelValues = ["skipped"]) | ||||||||||
| return messages | ||||||||||
| return result | ||||||||||
| continue | ||||||||||
| let msg = g.mcache.get(mid).valueOr: | ||||||||||
| libp2p_gossipsub_received_iwants.inc(1, labelValues = ["unknown"]) | ||||||||||
| continue | ||||||||||
| libp2p_gossipsub_received_iwants.inc(1, labelValues = ["correct"]) | ||||||||||
| messages.add(msg) | ||||||||||
| return messages | ||||||||||
| result.messages.add(msg) | ||||||||||
| result.ids.add(mid) | ||||||||||
|
Comment on lines +386 to +393
Member
It's probably better to define a
||||||||||
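The truncated comment above appears to suggest a named type for the return value; a sketch of that idea (type and field names are assumptions, not from the PR) could look like:

```nim
# Sketch only: a named result type instead of the anonymous tuple returned
# by handleIWant. Names are assumptions.
type IWantReply* = object
  messages*: seq[Message]   # full messages to send back to the requester
  ids*: seq[MessageId]      # their ids, reused for the preamble control msg

# handleIWant would then be declared as:
# proc handleIWant*(g: GossipSub, peer: PubSubPeer,
#                   iwants: seq[ControlIWant]): IWantReply
```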
| return result | ||||||||||
|
|
||||||||||
| proc commitMetrics(metrics: var MeshMetrics) = | ||||||||||
| libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics) | ||||||||||
|
|
||||||||||
Would this change result in a GossipSubCodec_14 as described in libp2p/specs#654?