@@ -3483,8 +3483,8 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
34833483 n .resetElectionTimeout ()
34843484 }
34853485
3486- // Just return if closed or we had previous write error.
3487- if n .State () == Closed || n .werr != nil {
3486+ // Just return if closed or we had previous write error, or invalid sub
3487+ if n .State () == Closed || n .werr != nil || sub == nil {
34883488 return nil
34893489 }
34903490
@@ -3539,7 +3539,7 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
35393539 catchingUp := n .catchup != nil
35403540 // Is this a new entry? New entries will be delivered on the append entry
35413541 // sub, rather than a catch-up sub.
3542- isNew := sub != nil && sub == n .aesub
3542+ isNew := sub == n .aesub
35433543
35443544 // Track leader directly
35453545 if isNew && ae .leader != noLeader {
@@ -3553,7 +3553,7 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
35533553 // If we are/were catching up ignore old catchup subs, but only if catching up from an older server
35543554 // that doesn't send the leader term when catching up. We can reject old catchups from newer subs
35553555 // later, just by checking the append entry is on the correct term.
3556- if ! isNew && sub != nil && ae .lterm == 0 && (! catchingUp || sub != n .catchup .sub ) {
3556+ if ! isNew && ae .lterm == 0 && (! catchingUp || sub != n .catchup .sub ) {
35573557 n .debug ("AppendEntry ignoring old entry from previous catchup" )
35583558 return nil
35593559 }
@@ -3569,9 +3569,8 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
35693569 n .debug ("Term higher than ours and we are not a follower: %v, stepping down to %q" , n .State (), ae .leader )
35703570 n .stepdownLocked (ae .leader )
35713571 }
3572- } else if lterm < n .term && sub != nil && (isNew || ae .lterm != 0 ) {
3572+ } else if lterm < n .term && (isNew || ae .lterm != 0 ) {
35733573 // Anything that's below our expected highest term needs to be rejected.
3574- // Unless we're replaying (sub=nil), in which case we'll always continue.
35753574 // For backward-compatibility we shouldn't reject if we're being caught up by an old server.
35763575 if isNew {
35773576 n .debug ("Rejected AppendEntry from a leader (%s) with term %d which is less than ours" , ae .leader , lterm )
@@ -3729,21 +3728,14 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
37293728CONTINUE:
37303729 // Save to our WAL if we have entries.
37313730 if ae .shouldStore () {
3732- // Only store if an original which will have sub != nil
3733- if sub != nil {
3734- if err := n .storeToWAL (ae ); err != nil {
3735- if err != ErrStoreClosed {
3736- n .warn ("Error storing entry to WAL: %v" , err )
3737- }
3738- return nil
3731+ if err := n .storeToWAL (ae ); err != nil {
3732+ if err != ErrStoreClosed {
3733+ n .warn ("Error storing entry to WAL: %v" , err )
37393734 }
3740- n .cachePendingEntry (ae )
3741- n .resetInitializing ()
3742- } else {
3743- // This is a replay on startup so just take the appendEntry version.
3744- n .pterm = ae .term
3745- n .pindex = ae .pindex + 1
3735+ return nil
37463736 }
3737+ n .cachePendingEntry (ae )
3738+ n .resetInitializing ()
37473739 }
37483740
37493741 // ae should no longer be used after this call as
@@ -3753,7 +3745,7 @@ CONTINUE:
37533745 // Only ever respond to new entries.
37543746 // Never respond to catchup messages, because providing quorum based on this is unsafe.
37553747 // The only way for the leader to receive "success" MUST be through this path.
3756- if sub != nil && isNew {
3748+ if isNew {
37573749 // Success. Send our response.
37583750 return newAppendEntryResponse (n .pterm , n .pindex , n .id , true )
37593751 }
0 commit comments