@@ -3424,22 +3424,36 @@ func (n *raft) updateLeader(newLeader string) {
34243424// during recovery or from processAppendEntries when there are new entries
34253425// to be committed.
34263426func (n * raft ) processAppendEntry (ae * appendEntry , sub * subscription ) {
3427+ // Make a copy of the reply subject, as ae may return
3428+ // to its pool as part of processAppendEntryLocked
3429+ subject := ae .reply
3430+
34273431 n .Lock ()
3432+ ar := n .processAppendEntryLocked (ae , sub )
3433+ n .Unlock ()
3434+
3435+ if ar != nil {
3436+ var scratch [appendEntryResponseLen ]byte
3437+ n .sendRPC (subject , ar .reply , ar .encode (scratch [:]))
3438+ arPool .Put (ar )
3439+ }
3440+ }
3441+
3442+ // Process the given appendEntry. Optionally returns a appendEntryResponse.
3443+ // The caller is responsible for sending out the response and return it to
3444+ // its pool.
3445+ // Lock should be held.
3446+ func (n * raft ) processAppendEntryLocked (ae * appendEntry , sub * subscription ) * appendEntryResponse {
34283447 // Don't reset here if we have been asked to assume leader position.
34293448 if ! n .lxfer {
34303449 n .resetElectionTimeout ()
34313450 }
34323451
34333452 // Just return if closed or we had previous write error.
34343453 if n .State () == Closed || n .werr != nil {
3435- n .Unlock ()
3436- return
3454+ return nil
34373455 }
34383456
3439- // Scratch buffer for responses.
3440- var scratch [appendEntryResponseLen ]byte
3441- arbuf := scratch [:]
3442-
34433457 // Grab term from append entry. But if leader explicitly defined its term, use that instead.
34443458 // This is required during catchup if the leader catches us up on older items from previous terms.
34453459 // While still allowing us to confirm they're matching our highest known term.
@@ -3463,12 +3477,8 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
34633477 n .stepdownLocked (ae .leader )
34643478 } else {
34653479 // Let them know we are the leader.
3466- ar := newAppendEntryResponse (n .term , n .pindex , n .id , false )
34673480 n .debug ("AppendEntry ignoring old term from another leader" )
3468- n .sendRPC (ae .reply , _EMPTY_ , ar .encode (arbuf ))
3469- arPool .Put (ar )
3470- n .Unlock ()
3471- return
3481+ return newAppendEntryResponse (n .term , n .pindex , n .id , false )
34723482 }
34733483 }
34743484
@@ -3510,9 +3520,8 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
35103520 // that doesn't send the leader term when catching up. We can reject old catchups from newer subs
35113521 // later, just by checking the append entry is on the correct term.
35123522 if ! isNew && sub != nil && ae .lterm == 0 && (! catchingUp || sub != n .catchup .sub ) {
3513- n .Unlock ()
35143523 n .debug ("AppendEntry ignoring old entry from previous catchup" )
3515- return
3524+ return nil
35163525 }
35173526
35183527 // If this term is greater than ours.
@@ -3535,10 +3544,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
35353544 } else {
35363545 n .debug ("AppendEntry ignoring old entry from previous catchup" )
35373546 }
3538- n .Unlock ()
35393547 // No need to respond, the leader will respond with the highest term already.
35403548 // We can simply reject here without sending additional responses.
3541- return
3549+ return nil
35423550 }
35433551
35443552 // Check state if we are catching up.
@@ -3549,21 +3557,16 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
35493557 // Reset our notion of catching up.
35503558 catchingUp = false
35513559 } else if isNew {
3552- var ar * appendEntryResponse
3553- var inbox string
35543560 // Check to see if we are stalled. If so recreate our catchup state and resend response.
35553561 if n .catchupStalled () {
35563562 n .debug ("Catchup may be stalled, will request again" )
3557- inbox = n .createCatchup (ae )
3558- ar = newAppendEntryResponse (n .pterm , n .pindex , n .id , false )
3559- }
3560- n .Unlock ()
3561- if ar != nil {
3562- n .sendRPC (ae .reply , inbox , ar .encode (arbuf ))
3563- arPool .Put (ar )
3563+ inbox := n .createCatchup (ae )
3564+ ar := newAppendEntryResponse (n .pterm , n .pindex , n .id , false )
3565+ ar .reply = inbox
3566+ return ar
35643567 }
35653568 // Ignore new while catching up or replaying.
3566- return
3569+ return nil
35673570 }
35683571 }
35693572
@@ -3633,8 +3636,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
36333636 // For example, if we got partial catchup, and then the "real-time" messages came in very delayed.
36343637 // If we reported "success" on those "real-time" messages, we'd wrongfully be providing
36353638 // quorum while not having an up-to-date log.
3636- n .Unlock ()
3637- return
3639+ return nil
36383640 }
36393641
36403642 // Check if we are catching up. If we are here we know the leader did not have all of the entries
@@ -3646,8 +3648,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
36463648 if len (ae .entries ) != 2 || ae .entries [0 ].Type != EntrySnapshot || ae .entries [1 ].Type != EntryPeerState {
36473649 n .warn ("Expected first catchup entry to be a snapshot and peerstate, will retry" )
36483650 n .cancelCatchup ()
3649- n .Unlock ()
3650- return
3651+ return nil
36513652 }
36523653
36533654 if ps , err := decodePeerState (ae .entries [1 ].Data ); err == nil {
@@ -3657,8 +3658,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
36573658 } else {
36583659 n .warn ("Could not parse snapshot peerstate correctly" )
36593660 n .cancelCatchup ()
3660- n .Unlock ()
3661- return
3661+ return nil
36623662 }
36633663
36643664 // Inherit state from appendEntry with the leader's snapshot.
@@ -3675,25 +3675,21 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
36753675 // Install the leader's snapshot as our own.
36763676 if err := n .installSnapshot (snap ); err != nil {
36773677 n .setWriteErrLocked (err )
3678- n .Unlock ()
3679- return
3678+ return nil
36803679 }
36813680 n .resetInitializing ()
36823681
36833682 // Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry.
36843683 n .apply .push (newCommittedEntry (n .commit , ae .entries [:1 ]))
3685- n .Unlock ()
3686- return
3684+ return nil
36873685 }
36883686
36893687 // Setup our state for catching up.
36903688 n .debug ("AppendEntry did not match [%d:%d] with [%d:%d]" , ae .pterm , ae .pindex , n .pterm , n .pindex )
36913689 inbox := n .createCatchup (ae )
36923690 ar := newAppendEntryResponse (n .pterm , n .pindex , n .id , false )
3693- n .Unlock ()
3694- n .sendRPC (ae .reply , inbox , ar .encode (arbuf ))
3695- arPool .Put (ar )
3696- return
3691+ ar .reply = inbox
3692+ return ar
36973693 }
36983694
36993695CONTINUE:
@@ -3705,8 +3701,7 @@ CONTINUE:
37053701 if err != ErrStoreClosed {
37063702 n .warn ("Error storing entry to WAL: %v" , err )
37073703 }
3708- n .Unlock ()
3709- return
3704+ return nil
37103705 }
37113706 // Save in memory for faster processing during applyCommit.
37123707 // Only save so many however to avoid memory bloat.
@@ -3768,9 +3763,9 @@ CONTINUE:
37683763 }
37693764 }
37703765
3771- // Make a copy of these values, as the AppendEntry might be cached and returned to the pool in applyCommit.
3766+ // Copy ae.commit as the AppendEntry might be cached and returned to the
3767+ // pool in applyCommit.
37723768 aeCommit := ae .commit
3773- aeReply := ae .reply
37743769
37753770 // Apply anything we need here.
37763771 if aeCommit > n .commit {
@@ -3789,17 +3784,12 @@ CONTINUE:
37893784 // Only ever respond to new entries.
37903785 // Never respond to catchup messages, because providing quorum based on this is unsafe.
37913786 // The only way for the leader to receive "success" MUST be through this path.
3792- var ar * appendEntryResponse
37933787 if sub != nil && isNew {
3794- ar = newAppendEntryResponse (n .pterm , n .pindex , n .id , true )
3788+ // Success. Send our response.
3789+ return newAppendEntryResponse (n .pterm , n .pindex , n .id , true )
37953790 }
3796- n .Unlock ()
37973791
3798- // Success. Send our response.
3799- if ar != nil {
3800- n .sendRPC (aeReply , _EMPTY_ , ar .encode (arbuf ))
3801- arPool .Put (ar )
3802- }
3792+ return nil
38033793}
38043794
38053795// resetInitializing resets the notion of initializing.
0 commit comments