
Commit 186ea50

Raft batching: Avoid sending leftovers right away
This commit removes a "pathological" case from the current Raft batching mechanism: if the proposal queue contains more entries than one batch can fit, raft sends a full batch followed immediately by a small batch containing the leftovers. However, it was observed that while the first batch was being stored and sent, clients may well have pushed more proposals onto the queue in the meantime. With this fix the server composes and sends a full batch, then handles the leftovers as follows: if more proposals were pushed onto the proposal queue, the leftovers are carried over to the next iteration, so that they are batched together with the proposals that arrived in the meantime. If there are no more proposals, the leftovers are sent right away. For performance testing only at this point.
1 parent 494350d commit 186ea50
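
To make the change concrete, here is a minimal, self-contained sketch of the carry-over idea; the drain helper, the moreIncoming flag, and the integer payloads are illustrative stand-ins, not the server's actual types (the real code also enforces a byte-size cap, omitted here). With the 512-entry cap, draining 600 queued proposals used to yield a full batch of 512 followed right away by a small batch of 88; with the carry-over, those 88 are held back whenever more proposals are still arriving, so they can be batched with them.

package main

import "fmt"

const maxEntries = 512 // mirrors the entry cap in the diff below

// drain splits pending proposals into batches, carrying a trailing partial
// batch over to the next drain whenever more proposals are still arriving.
func drain(queued, leftovers []int, moreIncoming bool) (batches [][]int, carry []int) {
    pending := append(leftovers, queued...)
    for len(pending) > 0 {
        end := len(pending)
        if end > maxEntries {
            end = maxEntries
        }
        batch, rest := pending[:end], pending[end:]
        // A partial batch with more proposals on the way: hold it back so it
        // can be batched together with whatever arrives next.
        if len(rest) == 0 && len(batch) < maxEntries && moreIncoming {
            return batches, batch
        }
        batches = append(batches, batch)
        pending = rest
    }
    return batches, nil
}

func main() {
    queued := make([]int, 600)
    batches, carry := drain(queued, nil, true)
    fmt.Println(len(batches), "batch sent,", len(carry), "carried over") // 1 batch sent, 88 carried over
}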


server/raft.go (74 additions, 31 deletions)
@@ -2635,6 +2635,8 @@ func (n *raft) runAsLeader() {
     n.sendPeerState()
     n.Unlock()
 
+    var propBatch []*proposedEntry
+
     hb := time.NewTicker(hbInterval)
     defer hb.Stop()
 
@@ -2654,41 +2656,43 @@ func (n *raft) runAsLeader() {
             }
             n.resp.recycle(&ars)
         case <-n.prop.ch:
-            const maxBatch = 256 * 1024
-            const maxEntries = 512
-            var entries []*Entry
-
-            es, sz := n.prop.pop(), 0
-            for _, b := range es {
-                if b.Type == EntryRemovePeer {
-                    n.doRemovePeerAsLeader(string(b.Data))
+            // Drain the channel and combine with any leftovers.
+            newProposals := n.prop.pop()
+            propBatch = append(propBatch, newProposals...)
+
+            // Loop until all proposals are batched and sent.
+            for len(propBatch) > 0 {
+                batchEntries, newLeftovers, sz := n.composeBatch(propBatch)
+
+                // Send our batch if we have one.
+                if len(batchEntries) > 0 {
+                    log.Println("Batch:", len(batchEntries), "entries", sz, "bytes")
+                    n.sendAppendEntry(batchEntries)
                 }
-                entries = append(entries, b.Entry)
-                // Increment size.
-                sz += len(b.Data) + 1
-                // If below thresholds go ahead and send.
-                if sz < maxBatch && len(entries) < maxEntries {
-                    continue
+
+                // Only handle replies for proposals that were consumed.
+                numConsumed := len(propBatch) - len(newLeftovers)
+                consumedProposals := propBatch[:numConsumed]
+                for _, pe := range consumedProposals {
+                    if pe.reply != _EMPTY_ {
+                        n.sendReply(pe.reply, nil)
+                    }
+                    pe.returnToPool()
                 }
-                log.Println("Batch:", len(entries), "entries", sz, "bytes")
-                n.sendAppendEntry(entries)
-                // Reset our sz and entries.
-                // We need to re-create `entries` because there is a reference
-                // to it in the node's pae map.
-                sz, entries = 0, nil
-            }
-            if len(entries) > 0 {
-                log.Println("Batch:", len(entries), "entries", sz, "bytes")
-                n.sendAppendEntry(entries)
-            }
-            // Respond to any proposals waiting for a confirmation.
-            for _, pe := range es {
-                if pe.reply != _EMPTY_ {
-                    n.sendReply(pe.reply, nil)
+
+                // The new leftovers become the batch for the next iteration.
+                propBatch = newLeftovers
+
+                // If we have leftovers and the proposal channel is empty,
+                // loop again to send them immediately. Otherwise, break
+                // to allow the select to pull more from the channel.
+                if len(propBatch) > 0 && n.prop.len() == 0 {
+                    continue
                 }
-                pe.returnToPool()
+                break
             }
-            n.prop.recycle(&es)
+            // Recycle the container for the new proposals that were popped.
+            n.prop.recycle(&newProposals)
 
         case <-hb.C:
             if n.notActive() {
@@ -2721,6 +2725,45 @@ func (n *raft) runAsLeader() {
     }
 }
 
+// composeBatch will compose a batch from a set of proposals.
+// It will return a batch of entries to be sent and any new leftovers.
+func (n *raft) composeBatch(allProposals []*proposedEntry) ([]*Entry, []*proposedEntry, int) {
+    const maxBatch = 256 * 1024
+    const maxEntries = 512
+
+    if len(allProposals) == 0 {
+        return nil, nil, 0
+    }
+
+    var sz int
+    end := 0
+    for end < len(allProposals) {
+        p := allProposals[end]
+        sz += len(p.Data) + 1
+        end++
+        if sz < maxBatch && end < maxEntries {
+            continue
+        }
+        break
+    }
+
+    // The batch to send is from the start up to `end`.
+    batchProposals := allProposals[:end]
+    // The new leftovers are from `end` to the end.
+    newLeftovers := allProposals[end:]
+
+    // Create the entries to be sent.
+    entries := make([]*Entry, len(batchProposals))
+    for i, p := range batchProposals {
+        if p.Type == EntryRemovePeer {
+            n.doRemovePeerAsLeader(string(p.Data))
+        }
+        entries[i] = p.Entry
+    }
+
+    return entries, newLeftovers, sz
+}
+
 // Quorum reports the quorum status. Will be called on former leaders.
 func (n *raft) Quorum() bool {
     n.RLock()
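
As shown in the diff, composeBatch closes a batch at whichever threshold trips first: 512 entries or 256 KiB of cumulative payload, each proposal contributing len(p.Data) + 1 bytes. Below is a standalone sketch of just that cut-off rule; splitPoint and the raw byte-slice payloads are stand-ins for the server's proposedEntry type. Two worked cases show each cap winning.

package main

import "fmt"

// Thresholds mirror the constants in composeBatch above.
const (
    maxBatch   = 256 * 1024 // max cumulative payload bytes per batch
    maxEntries = 512        // max entries per batch
)

// splitPoint reports how many payloads the first batch consumes (end)
// and the running size at the cut, using composeBatch's rule:
// stop once sz reaches maxBatch or the count reaches maxEntries.
func splitPoint(payloads [][]byte) (end, sz int) {
    for end < len(payloads) {
        sz += len(payloads[end]) + 1
        end++
        if sz >= maxBatch || end >= maxEntries {
            break
        }
    }
    return end, sz
}

func mkPayloads(count, size int) [][]byte {
    ps := make([][]byte, count)
    for i := range ps {
        ps[i] = make([]byte, size)
    }
    return ps
}

func main() {
    // 600 proposals of 100 bytes: the entry cap wins, 512 are batched.
    end, sz := splitPoint(mkPayloads(600, 100))
    fmt.Println(end, "entries,", sz, "bytes") // 512 entries, 51712 bytes

    // 600 proposals of 1 KiB: the size cap wins first, only 256 are batched.
    end, sz = splitPoint(mkPayloads(600, 1024))
    fmt.Println(end, "entries,", sz, "bytes") // 256 entries, 262400 bytes
}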
