@@ -514,7 +514,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
514514 truncateAndErr (index - 1 )
515515 break
516516 }
517- n .processAppendEntry (ae , nil )
517+ n .recoverAppendEntry (ae )
518518 // Check how much we have queued up so far to determine if we should pause.
519519 for _ , e := range ae .entries {
520520 qsz += len (e .Data )
@@ -3444,26 +3444,50 @@ func (n *raft) updateLeader(newLeader string) {
34443444 }
34453445}
34463446
3447- // processAppendEntry will process an appendEntry. This is called either
3448- // during recovery or from processAppendEntries when there are new entries
3449- // to be committed.
3447+ // recoverAppendEntry will process an appendEntry for recovery
3448+ func (n * raft ) recoverAppendEntry (ae * appendEntry ) {
3449+ n .Lock ()
3450+ defer n .Unlock ()
3451+
3452+ n .pterm = ae .term
3453+ n .pindex = ae .pindex + 1
3454+
3455+ n .processEntriesAndCommit (ae , false )
3456+ }
3457+
3458+ // processAppendEntry will process an appendEntry. This is called from
3459+ // processAppendEntries when there are new entries to be committed.
34503460func (n * raft ) processAppendEntry (ae * appendEntry , sub * subscription ) {
3461+ // Make a copy of the reply subject, as ae may return
3462+ // to its pool as part of processAppendEntryLocked
3463+ subject := ae .reply
3464+
34513465 n .Lock ()
3466+ ar := n .processAppendEntryLocked (ae , sub )
3467+ n .Unlock ()
3468+
3469+ if ar != nil {
3470+ var scratch [appendEntryResponseLen ]byte
3471+ n .sendRPC (subject , ar .reply , ar .encode (scratch [:]))
3472+ arPool .Put (ar )
3473+ }
3474+ }
3475+
3476+ // Process the given appendEntry. Optionally returns a appendEntryResponse.
3477+ // The caller is responsible for sending out the response and return it to
3478+ // its pool.
3479+ // Lock should be held.
3480+ func (n * raft ) processAppendEntryLocked (ae * appendEntry , sub * subscription ) * appendEntryResponse {
34523481 // Don't reset here if we have been asked to assume leader position.
34533482 if ! n .lxfer {
34543483 n .resetElectionTimeout ()
34553484 }
34563485
3457- // Just return if closed or we had previous write error.
3458- if n .State () == Closed || n .werr != nil {
3459- n .Unlock ()
3460- return
3486+ // Just return if closed or we had previous write error, or invalid sub
3487+ if n .State () == Closed || n .werr != nil || sub == nil {
3488+ return nil
34613489 }
34623490
3463- // Scratch buffer for responses.
3464- var scratch [appendEntryResponseLen ]byte
3465- arbuf := scratch [:]
3466-
34673491 // Grab term from append entry. But if leader explicitly defined its term, use that instead.
34683492 // This is required during catchup if the leader catches us up on older items from previous terms.
34693493 // While still allowing us to confirm they're matching our highest known term.
@@ -3496,12 +3520,8 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
34963520 n .stepdownLocked (ae .leader )
34973521 } else {
34983522 // Let them know we are the leader.
3499- ar := newAppendEntryResponse (n .term , n .pindex , n .id , false )
35003523 n .debug ("AppendEntry ignoring old term from another leader" )
3501- n .sendRPC (ae .reply , _EMPTY_ , ar .encode (arbuf ))
3502- arPool .Put (ar )
3503- n .Unlock ()
3504- return
3524+ return newAppendEntryResponse (n .term , n .pindex , n .id , false )
35053525 }
35063526 }
35073527
@@ -3528,7 +3548,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
35283548 catchingUp := n .catchup != nil
35293549 // Is this a new entry? New entries will be delivered on the append entry
35303550 // sub, rather than a catch-up sub.
3531- isNew := sub != nil && sub == n .aesub
3551+ isNew := sub == n .aesub
35323552
35333553 // Track leader directly
35343554 if isNew && ae .leader != noLeader {
@@ -3542,10 +3562,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
35423562 // If we are/were catching up ignore old catchup subs, but only if catching up from an older server
35433563 // that doesn't send the leader term when catching up. We can reject old catchups from newer subs
35443564 // later, just by checking the append entry is on the correct term.
3545- if ! isNew && sub != nil && ae .lterm == 0 && (! catchingUp || sub != n .catchup .sub ) {
3546- n .Unlock ()
3565+ if ! isNew && ae .lterm == 0 && (! catchingUp || sub != n .catchup .sub ) {
35473566 n .debug ("AppendEntry ignoring old entry from previous catchup" )
3548- return
3567+ return nil
35493568 }
35503569
35513570 // If this term is greater than ours.
@@ -3559,21 +3578,15 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
35593578 n .debug ("Term higher than ours and we are not a follower: %v, stepping down to %q" , n .State (), ae .leader )
35603579 n .stepdownLocked (ae .leader )
35613580 }
3562- } else if lterm < n .term && sub != nil && (isNew || ae .lterm != 0 ) {
3581+ } else if lterm < n .term && (isNew || ae .lterm != 0 ) {
35633582 // Anything that's below our expected highest term needs to be rejected.
3564- // Unless we're replaying (sub=nil), in which case we'll always continue.
35653583 // For backward-compatibility we shouldn't reject if we're being caught up by an old server.
35663584 if ! isNew {
35673585 n .debug ("AppendEntry ignoring old entry from previous catchup" )
3568- n .Unlock ()
3569- return
3586+ return nil
35703587 }
35713588 n .debug ("Rejected AppendEntry from a leader (%s) with term %d which is less than ours" , ae .leader , lterm )
3572- ar := newAppendEntryResponse (n .term , n .pindex , n .id , false )
3573- n .Unlock ()
3574- n .sendRPC (ae .reply , _EMPTY_ , ar .encode (arbuf ))
3575- arPool .Put (ar )
3576- return
3589+ return newAppendEntryResponse (n .term , n .pindex , n .id , false )
35773590 }
35783591
35793592 // Check state if we are catching up.
@@ -3584,21 +3597,16 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
35843597 // Reset our notion of catching up.
35853598 catchingUp = false
35863599 } else if isNew {
3587- var ar * appendEntryResponse
3588- var inbox string
35893600 // Check to see if we are stalled. If so recreate our catchup state and resend response.
35903601 if n .catchupStalled () {
35913602 n .debug ("Catchup may be stalled, will request again" )
3592- inbox = n .createCatchup (ae )
3593- ar = newAppendEntryResponse (n .pterm , n .pindex , n .id , false )
3594- }
3595- n .Unlock ()
3596- if ar != nil {
3597- n .sendRPC (ae .reply , inbox , ar .encode (arbuf ))
3598- arPool .Put (ar )
3603+ inbox := n .createCatchup (ae )
3604+ ar := newAppendEntryResponse (n .pterm , n .pindex , n .id , false )
3605+ ar .reply = inbox
3606+ return ar
35993607 }
36003608 // Ignore new while catching up or replaying.
3601- return
3609+ return nil
36023610 }
36033611 }
36043612
@@ -3668,8 +3676,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
36683676 // For example, if we got partial catchup, and then the "real-time" messages came in very delayed.
36693677 // If we reported "success" on those "real-time" messages, we'd wrongfully be providing
36703678 // quorum while not having an up-to-date log.
3671- n .Unlock ()
3672- return
3679+ return nil
36733680 }
36743681
36753682 // Check if we are catching up. If we are here we know the leader did not have all of the entries
@@ -3681,8 +3688,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
36813688 if len (ae .entries ) != 2 || ae .entries [0 ].Type != EntrySnapshot || ae .entries [1 ].Type != EntryPeerState {
36823689 n .warn ("Expected first catchup entry to be a snapshot and peerstate, will retry" )
36833690 n .cancelCatchup ()
3684- n .Unlock ()
3685- return
3691+ return nil
36863692 }
36873693
36883694 if ps , err := decodePeerState (ae .entries [1 ].Data ); err == nil {
@@ -3692,8 +3698,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
36923698 } else {
36933699 n .warn ("Could not parse snapshot peerstate correctly" )
36943700 n .cancelCatchup ()
3695- n .Unlock ()
3696- return
3701+ return nil
36973702 }
36983703
36993704 // Inherit state from appendEntry with the leader's snapshot.
@@ -3710,48 +3715,54 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
37103715 // Install the leader's snapshot as our own.
37113716 if err := n .installSnapshot (snap ); err != nil {
37123717 n .setWriteErrLocked (err )
3713- n .Unlock ()
3714- return
3718+ return nil
37153719 }
37163720 n .resetInitializing ()
37173721
37183722 // Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry.
37193723 n .apply .push (newCommittedEntry (n .commit , ae .entries [:1 ]))
3720- n .Unlock ()
3721- return
3724+ return nil
37223725 }
37233726
37243727 // Setup our state for catching up.
37253728 n .debug ("AppendEntry did not match [%d:%d] with [%d:%d]" , ae .pterm , ae .pindex , n .pterm , n .pindex )
37263729 inbox := n .createCatchup (ae )
37273730 ar := newAppendEntryResponse (n .pterm , n .pindex , n .id , false )
3728- n .Unlock ()
3729- n .sendRPC (ae .reply , inbox , ar .encode (arbuf ))
3730- arPool .Put (ar )
3731- return
3731+ ar .reply = inbox
3732+ return ar
37323733 }
37333734
37343735CONTINUE:
37353736 // Save to our WAL if we have entries.
37363737 if ae .shouldStore () {
3737- // Only store if an original which will have sub != nil
3738- if sub != nil {
3739- if err := n .storeToWAL (ae ); err != nil {
3740- if err != ErrStoreClosed {
3741- n .warn ("Error storing entry to WAL: %v" , err )
3742- }
3743- n .Unlock ()
3744- return
3738+ if err := n .storeToWAL (ae ); err != nil {
3739+ if err != ErrStoreClosed {
3740+ n .warn ("Error storing entry to WAL: %v" , err )
37453741 }
3746- n .cachePendingEntry (ae )
3747- n .resetInitializing ()
3748- } else {
3749- // This is a replay on startup so just take the appendEntry version.
3750- n .pterm = ae .term
3751- n .pindex = ae .pindex + 1
3742+ return nil
37523743 }
3744+ n .cachePendingEntry (ae )
3745+ n .resetInitializing ()
37533746 }
37543747
3748+ // ae should no longer be used after this call as
3749+ // processEntriesAndCommit may return the appendEntry back to its pool
3750+ n .processEntriesAndCommit (ae , isNew )
3751+
3752+ // Only ever respond to new entries.
3753+ // Never respond to catchup messages, because providing quorum based on this is unsafe.
3754+ // The only way for the leader to receive "success" MUST be through this path.
3755+ if isNew {
3756+ // Success. Send our response.
3757+ return newAppendEntryResponse (n .pterm , n .pindex , n .id , true )
3758+ }
3759+
3760+ return nil
3761+ }
3762+
3763+ // Process all entries in appendEntry and try to commit.
3764+ // Lock should be held.
3765+ func (n * raft ) processEntriesAndCommit (ae * appendEntry , isNew bool ) {
37553766 // Check to see if we have any related entries to process here.
37563767 for _ , e := range ae .entries {
37573768 switch e .Type {
@@ -3789,9 +3800,9 @@ CONTINUE:
37893800 }
37903801 }
37913802
3792- // Make a copy of these values, as the AppendEntry might be cached and returned to the pool in applyCommit.
3803+ // Copy ae.commit as the AppendEntry might be cached and returned to the
3804+ // pool in applyCommit.
37933805 aeCommit := ae .commit
3794- aeReply := ae .reply
37953806
37963807 // Apply anything we need here.
37973808 if aeCommit > n .commit {
@@ -3806,21 +3817,6 @@ CONTINUE:
38063817 }
38073818 }
38083819 }
3809-
3810- // Only ever respond to new entries.
3811- // Never respond to catchup messages, because providing quorum based on this is unsafe.
3812- // The only way for the leader to receive "success" MUST be through this path.
3813- var ar * appendEntryResponse
3814- if sub != nil && isNew {
3815- ar = newAppendEntryResponse (n .pterm , n .pindex , n .id , true )
3816- }
3817- n .Unlock ()
3818-
3819- // Success. Send our response.
3820- if ar != nil {
3821- n .sendRPC (aeReply , _EMPTY_ , ar .encode (arbuf ))
3822- arPool .Put (ar )
3823- }
38243820}
38253821
38263822// resetInitializing resets the notion of initializing.
0 commit comments