@@ -91,6 +91,7 @@ type RaftNode interface {
 type WAL interface {
 	Type() StorageType
 	StoreMsg(subj string, hdr, msg []byte, ttl int64) (uint64, int64, error)
+	MsgSize(msg []byte) uint64
 	LoadMsg(index uint64, sm *StoreMsg) (*StoreMsg, error)
 	RemoveMsg(index uint64) (bool, error)
 	Compact(index uint64) (uint64, error)
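The new `MsgSize` method lets the Raft layer ask the WAL for the stored size of an entry instead of branching on the storage type itself (see the `fileStoreMsgSize`/`memStoreMsgSize` switch removed from `storeToWAL` further down). A minimal sketch of what the two store implementations might look like, assuming those existing helpers; this is not part of the diff:

```go
// Sketch only: assumes the existing fileStoreMsgSize/memStoreMsgSize helpers
// and the fileStore/memStore receivers used elsewhere in the server package.
func (fs *fileStore) MsgSize(msg []byte) uint64 {
	// Raft WAL entries are stored with an empty subject and no headers.
	return fileStoreMsgSize(_EMPTY_, nil, msg)
}

func (ms *memStore) MsgSize(msg []byte) uint64 {
	return memStoreMsgSize(_EMPTY_, nil, msg)
}
```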
@@ -867,19 +868,22 @@ func (s *Server) transferRaftLeaders() bool {
 // Propose will propose a new entry to the group.
 // This should only be called on the leader.
 func (n *raft) Propose(data []byte) error {
-	n.Lock()
-	defer n.Unlock()
-	// Check state under lock, we might not be leader anymore.
-	if state := n.State(); state != Leader {
+	n.RLock()
+	state := n.State()
+	writeError := n.werr
+	prop := n.prop
+	n.RUnlock()
+
+	if state != Leader {
 		n.debug("Proposal ignored, not leader (state: %v)", state)
 		return errNotLeader
 	}
 
-	// Error if we had a previous write error.
-	if werr := n.werr; werr != nil {
-		return werr
+	if writeError != nil {
+		return writeError
 	}
-	n.prop.push(newProposedEntry(newEntry(EntryNormal, data), _EMPTY_))
+
+	prop.push(newProposedEntry(newEntry(EntryNormal, data), _EMPTY_))
 	return nil
 }
 
@@ -1014,10 +1018,9 @@ func (n *raft) AdjustBootClusterSize(csz int) error {
 // Must be the leader.
 func (n *raft) AdjustClusterSize(csz int) error {
 	n.Lock()
-	defer n.Unlock()
-
 	// Check state under lock, we might not be leader anymore.
 	if n.State() != Leader {
+		n.Unlock()
 		return errNotLeader
 	}
 	// Same floor as bootstrap.
@@ -1029,8 +1032,10 @@ func (n *raft) AdjustClusterSize(csz int) error {
 	// a quorum.
 	n.csz = csz
 	n.qn = n.csz/2 + 1
+	n.Unlock()
 
-	n.sendPeerState()
+	n.prop.push(newProposedEntry(
+		newEntry(EntryPeerState, encodePeerState(n.currentPeerState())), _EMPTY_))
 	return nil
 }
 
@@ -1213,10 +1218,8 @@ func (n *raft) encodeSnapshot(snap *snapshot) []byte {
 // Should only be used when the upper layers know this is most recent.
 // Used when restoring streams, moving a stream from R1 to R>1, etc.
 func (n *raft) SendSnapshot(data []byte) error {
-	n.Lock()
-	defer n.Unlock()
-	// Don't check if we're leader before sending and storing, this is used on scaleup.
-	n.sendAppendEntryLocked([]*Entry{{EntrySnapshot, data}}, false)
+	// TODO: Need to copy data?
+	n.prop.push(newProposedEntry(newEntry(EntrySnapshot, data), _EMPTY_))
 	return nil
 }
 
@@ -1714,6 +1717,8 @@ func (n *raft) StepDown(preferred ...string) error {
 	// Send the append entry directly rather than via the proposals queue,
 	// as we will switch to follower state immediately and will blow away
 	// the contents of the proposal queue in the process.
+	// Also, we won't store the entry in the Raft log, so it is OK to call
+	// into sendAppendEntry() directly from here.
 	if maybeLeader != noLeader {
 		n.debug("Selected %q for new leader, stepping down due to leadership transfer", maybeLeader)
 		ae := newEntry(EntryLeaderTransfer, []byte(maybeLeader))
@@ -1829,13 +1834,16 @@ func (n *raft) Peers() []*Peer {
 // Update and propose our known set of peers.
 func (n *raft) ProposeKnownPeers(knownPeers []string) {
 	n.Lock()
-	defer n.Unlock()
 	// If we are the leader update and send this update out.
 	if n.State() != Leader {
+		n.Unlock()
 		return
 	}
 	n.updateKnownPeersLocked(knownPeers)
-	n.sendPeerState()
+	n.Unlock()
+
+	n.prop.push(newProposedEntry(
+		newEntry(EntryPeerState, encodePeerState(n.currentPeerState())), _EMPTY_))
 }
 
 // Update our known set of peers.
@@ -2630,12 +2638,11 @@ func (n *raft) runAsLeader() {
 		n.unsubscribe(rpsub)
 		n.Unlock()
 	}()
-
-	// To send out our initial peer state.
-	n.sendPeerState()
 	n.Unlock()
 
 	var propBatch []*proposedEntry
+	n.sendAppendEntry(
+		[]*Entry{{EntryPeerState, encodePeerState(n.currentPeerState())}})
 
 	hb := time.NewTicker(hbInterval)
 	defer hb.Stop()
@@ -2739,6 +2746,14 @@ func (n *raft) composeBatch(allProposals []*proposedEntry) ([]*Entry, []*propose
 	end := 0
 	for end < len(allProposals) {
 		p := allProposals[end]
+		// If we have a snapshot do not batch with anything else.
+		if p.Type == EntrySnapshot {
+			if end == 0 {
+				sz = len(p.Data) + 1
+				end = 1
+			}
+			break
+		}
 		sz += len(p.Data) + 1
 		end++
 		if sz < maxBatch && end < maxEntries {
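A worked example of the new rule (illustrative, not part of the diff): given a proposal queue of [normal A, normal B, snapshot S, normal C], and assuming A and B fit under maxBatch/maxEntries, the loop above emits A and B as one append entry, then S alone in its own append entry, and only then continues with C. A snapshot at the head of the queue (end == 0) is taken by itself; a snapshot found later simply closes the current batch before it.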
@@ -3812,13 +3827,24 @@ CONTINUE:
 	if ae.shouldStore() {
 		// Only store if an original which will have sub != nil
 		if sub != nil {
-			if err := n.storeToWAL(ae); err != nil {
+			n.Unlock()
+			size, seq, err := n.storeToWAL(ae)
+			n.Lock()
+			if err != nil {
 				if err != ErrStoreClosed {
 					n.warn("Error storing entry to WAL: %v", err)
 				}
+				if err == errEntryStoreFailed {
+					n.resetWAL()
+					n.cancelCatchup()
+				}
 				n.Unlock()
 				return
 			}
+			n.bytes += size
+			n.pterm = ae.term
+			n.pindex = seq
+			n.active = time.Now()
 			n.cachePendingEntry(ae)
 			n.resetInitializing()
 		} else {
@@ -3979,50 +4005,54 @@ func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry {
 	return newAppendEntry(n.id, n.term, n.commit, n.pterm, n.pindex, entries)
 }
 
-// Determine if we should store an entry. This stops us from storing
-// heartbeat messages.
+// Determine if we should store an entry.
+// This stops us from storing heartbeat and leader transfer messages.
 func (ae *appendEntry) shouldStore() bool {
-	return ae != nil && len(ae.entries) > 0
+	if ae == nil {
+		return false
+	}
+	l := len(ae.entries)
+	if l == 0 {
+		return false
+	}
+	if l == 1 {
+		return ae.entries[0].Type != EntryLeaderTransfer
+	}
+	return true
+}
+
+func (ae *appendEntry) shouldCheckLeader() bool {
+	if ae != nil && len(ae.entries) == 1 &&
+		ae.entries[0].Type == EntrySnapshot {
+		return true
+	}
+	return false
 }
 
 // Store our append entry to our WAL.
-// lock should be held.
-func (n *raft) storeToWAL(ae *appendEntry) error {
+// Returns the number of bytes written and the sequence number
+// assigned to the message.
+func (n *raft) storeToWAL(ae *appendEntry) (uint64, uint64, error) {
 	if ae == nil {
-		return fmt.Errorf("raft: Missing append entry for storage")
+		return 0, 0, fmt.Errorf("raft: Missing append entry for storage")
 	}
+
 	if n.werr != nil {
-		return n.werr
+		return 0, 0, n.werr
 	}
 
 	seq, _, err := n.wal.StoreMsg(_EMPTY_, nil, ae.buf, 0)
 	if err != nil {
-		n.setWriteErrLocked(err)
-		return err
+		return 0, 0, err
 	}
-
 	// Sanity checking for now.
 	if index := ae.pindex + 1; index != seq {
 		n.warn("Wrong index, ae is %+v, index stored was %d, n.pindex is %d, will reset", ae, seq, n.pindex)
-		if n.State() == Leader {
-			n.stepdownLocked(n.selectNextLeader())
-		}
-		// Reset and cancel any catchup.
-		n.resetWAL()
-		n.cancelCatchup()
-		return errEntryStoreFailed
+		return 0, 0, errEntryStoreFailed
 	}
 
-	var sz uint64
-	if n.wtype == FileStorage {
-		sz = fileStoreMsgSize(_EMPTY_, nil, ae.buf)
-	} else {
-		sz = memStoreMsgSize(_EMPTY_, nil, ae.buf)
-	}
-	n.bytes += sz
-	n.pterm = ae.term
-	n.pindex = seq
-	return nil
+	sz := n.wal.MsgSize(ae.buf)
+	return sz, seq, nil
 }
 
 const (
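To make the new predicates concrete, here is a hypothetical test-style sketch (not part of the PR); it assumes the `appendEntry`/`Entry` definitions in `server/raft.go` and the `require_True`/`require_False` helpers used by the server tests:

```go
func TestShouldStoreSketch(t *testing.T) {
	// Heartbeats carry no entries and are never written to the WAL.
	hb := &appendEntry{}
	// A lone leadership-transfer entry is sent but not persisted.
	lt := &appendEntry{entries: []*Entry{{EntryLeaderTransfer, nil}}}
	// A lone snapshot is persisted, and is the only case that
	// re-checks leadership in sendAppendEntry().
	sn := &appendEntry{entries: []*Entry{{EntrySnapshot, nil}}}

	require_False(t, hb.shouldStore())
	require_False(t, lt.shouldStore())
	require_True(t, sn.shouldStore())
	require_True(t, sn.shouldCheckLeader())
	require_False(t, lt.shouldCheckLeader())
}
```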
@@ -4031,19 +4061,25 @@ const (
 	paeWarnModulo = 5_000
 )
 
+// sendAppendEntry builds an appendEntry and stores it to the WAL
+// before sending it to the followers.
+// It is expected that this method is called from Raft's main
+// goroutine, unless the appendEntry does not need to be stored
+// to the WAL (heartbeat or EntryLeaderTransfer).
 func (n *raft) sendAppendEntry(entries []*Entry) {
-	n.Lock()
-	defer n.Unlock()
-	n.sendAppendEntryLocked(entries, true)
-}
-func (n *raft) sendAppendEntryLocked(entries []*Entry, checkLeader bool) {
-	// Safeguard against sending an append entry right after a stepdown from a different goroutine.
-	// Specifically done while holding the lock to not race.
-	if checkLeader && n.State() != Leader {
+	// Safeguard against sending an append entry right after a stepdown
+	// from a different goroutine. Specifically done while holding the
+	// lock to not race.
+	n.RLock()
+	state := n.State()
+	ae := n.buildAppendEntry(entries)
+	n.RUnlock()
+
+	if ae.shouldCheckLeader() && state != Leader {
 		n.debug("Not sending append entry, not leader")
+		ae.returnToPool()
 		return
 	}
-	ae := n.buildAppendEntry(entries)
 
 	var err error
 	var scratch [1024]byte
@@ -4055,12 +4091,30 @@ func (n *raft) sendAppendEntryLocked(entries []*Entry, checkLeader bool) {
 	// If we have entries store this in our wal.
 	shouldStore := ae.shouldStore()
 	if shouldStore {
-		if err := n.storeToWAL(ae); err != nil {
+		size, seq, err := n.storeToWAL(ae)
+		n.Lock()
+		if err != nil {
+			n.setWriteErrLocked(err)
+			if err == errEntryStoreFailed {
+				if n.State() == Leader {
+					n.stepdownLocked(n.selectNextLeader())
+				}
+				// Are we sure we want this?
+				n.resetWAL()
+				n.cancelCatchup()
+			}
+			n.Unlock()
 			return
 		}
+
+		n.bytes += size
+		n.pterm = ae.term
+		n.pindex = seq
 		n.active = time.Now()
 		n.cachePendingEntry(ae)
+		n.Unlock()
 	}
+
 	n.sendRPC(n.asubj, n.areply, ae.buf)
 	if !shouldStore {
 		ae.returnToPool()
@@ -4149,23 +4203,15 @@ func (n *raft) peerNames() []string {
 
 func (n *raft) currentPeerState() *peerState {
 	n.RLock()
-	ps := n.currentPeerStateLocked()
-	n.RUnlock()
-	return ps
-}
-
-func (n *raft) currentPeerStateLocked() *peerState {
+	defer n.RUnlock()
 	return &peerState{n.peerNames(), n.csz, n.extSt}
 }
 
-// sendPeerState will send our current peer state to the cluster.
-// Lock should be held.
-func (n *raft) sendPeerState() {
-	n.sendAppendEntryLocked([]*Entry{{EntryPeerState, encodePeerState(n.currentPeerStateLocked())}}, true)
-}
-
 // Send a heartbeat.
 func (n *raft) sendHeartbeat() {
+	// OK to call sendAppendEntry() directly here.
+	// No need to push heartbeats onto the prop queue
+	// because we don't store them in the log.
 	n.sendAppendEntry(nil)
 }
 