diff --git a/server/filestore.go b/server/filestore.go
index 572e4e33dd7..4e7dc557c65 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -8143,6 +8143,10 @@ func fileStoreMsgSizeEstimate(slen, maxPayload int) uint64 {
 	return uint64(emptyRecordLen + slen + 4 + maxPayload)
 }
 
+func (fs *fileStore) MsgSize(msg []byte) uint64 {
+	return fileStoreMsgSizeRaw(0, 0, len(msg))
+}
+
 // ResetState resets any state that's temporary. For example when changing leaders.
 func (fs *fileStore) ResetState() {
 	fs.mu.Lock()
diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go
index c75b697cee3..99fb0df1f09 100644
--- a/server/jetstream_cluster_1_test.go
+++ b/server/jetstream_cluster_1_test.go
@@ -6973,7 +6973,7 @@ func TestJetStreamClusterCatchupMustStallWhenBehindOnApplies(t *testing.T) {
 	n := mset.node.(*raft)
 	n.Lock()
 	ae := n.buildAppendEntry(nil)
-	err = n.storeToWAL(ae)
+	_, _, err = n.storeToWAL(ae)
 	n.Unlock()
 	index, commit, applied := n.Progress()
 	require_NoError(t, err)
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go
index 516ae2459f5..bc7520a1894 100644
--- a/server/jetstream_cluster_4_test.go
+++ b/server/jetstream_cluster_4_test.go
@@ -4269,7 +4269,7 @@ func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) {
 	rn.Lock()
 	entries := []*Entry{{EntryNormal, updateDeliveredBuffer()}, {EntryNormal, updateAcksBuffer()}}
 	ae := encode(t, rn.buildAppendEntry(entries))
-	err = rn.storeToWAL(ae)
+	_, _, err = rn.storeToWAL(ae)
 	minPindex := rn.pindex
 	rn.Unlock()
 	require_NoError(t, err)
diff --git a/server/memstore.go b/server/memstore.go
index f75146c6ab3..5cc9379b795 100644
--- a/server/memstore.go
+++ b/server/memstore.go
@@ -2033,6 +2033,10 @@ func memStoreMsgSize(subj string, hdr, msg []byte) uint64 {
 	return memStoreMsgSizeRaw(len(subj), len(hdr), len(msg))
 }
 
+func (ms *memStore) MsgSize(msg []byte) uint64 {
+	return memStoreMsgSizeRaw(0, 0, len(msg))
+}
+
 // ResetState resets any state that's temporary. For example when changing leaders.
 func (ms *memStore) ResetState() {
 	ms.mu.Lock()
diff --git a/server/raft.go b/server/raft.go
index 69f09f9c3ad..61cbd13c58e 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -90,6 +90,7 @@ type RaftNode interface {
 type WAL interface {
 	Type() StorageType
 	StoreMsg(subj string, hdr, msg []byte, ttl int64) (uint64, int64, error)
+	MsgSize(msg []byte) uint64
 	LoadMsg(index uint64, sm *StoreMsg) (*StoreMsg, error)
 	RemoveMsg(index uint64) (bool, error)
 	Compact(index uint64) (uint64, error)
@@ -866,19 +868,22 @@ func (s *Server) transferRaftLeaders() bool {
 // Propose will propose a new entry to the group.
 // This should only be called on the leader.
 func (n *raft) Propose(data []byte) error {
-	n.Lock()
-	defer n.Unlock()
-	// Check state under lock, we might not be leader anymore.
-	if state := n.State(); state != Leader {
+	n.RLock()
+	state := n.State()
+	writeError := n.werr
+	prop := n.prop
+	n.RUnlock()
+
+	if state != Leader {
 		n.debug("Proposal ignored, not leader (state: %v)", state)
 		return errNotLeader
 	}
-	// Error if we had a previous write error.
-	if werr := n.werr; werr != nil {
-		return werr
+	if writeError != nil {
+		return writeError
 	}
-	n.prop.push(newProposedEntry(newEntry(EntryNormal, data), _EMPTY_))
+
+	prop.push(newProposedEntry(newEntry(EntryNormal, data), _EMPTY_))
 	return nil
 }
 
@@ -1013,10 +1018,9 @@ func (n *raft) AdjustBootClusterSize(csz int) error {
 // Must be the leader.
 func (n *raft) AdjustClusterSize(csz int) error {
 	n.Lock()
-	defer n.Unlock()
-	// Check state under lock, we might not be leader anymore.
 	if n.State() != Leader {
+		n.Unlock()
 		return errNotLeader
 	}
 	// Same floor as bootstrap.
@@ -1028,8 +1032,10 @@ func (n *raft) AdjustClusterSize(csz int) error {
 	// a quorum.
 	n.csz = csz
 	n.qn = n.csz/2 + 1
+	n.Unlock()
 
-	n.sendPeerState()
+	n.prop.push(newProposedEntry(
+		newEntry(EntryPeerState, encodePeerState(n.currentPeerState())), _EMPTY_))
 	return nil
 }
 
@@ -1212,10 +1218,8 @@ func (n *raft) encodeSnapshot(snap *snapshot) []byte {
 // Should only be used when the upper layers know this is most recent.
 // Used when restoring streams, moving a stream from R1 to R>1, etc.
 func (n *raft) SendSnapshot(data []byte) error {
-	n.Lock()
-	defer n.Unlock()
-	// Don't check if we're leader before sending and storing, this is used on scaleup.
-	n.sendAppendEntryLocked([]*Entry{{EntrySnapshot, data}}, false)
+	// TODO: Need to copy data?
+	n.prop.push(newProposedEntry(newEntry(EntrySnapshot, data), _EMPTY_))
 	return nil
 }
 
@@ -1713,6 +1717,8 @@ func (n *raft) StepDown(preferred ...string) error {
 	// Send the append entry directly rather than via the proposals queue,
 	// as we will switch to follower state immediately and will blow away
 	// the contents of the proposal queue in the process.
+	// Also, we won't store the entry in the Raft log, so it is OK to call
+	// into sendAppendEntry() directly from here.
 	if maybeLeader != noLeader {
 		n.debug("Selected %q for new leader, stepping down due to leadership transfer", maybeLeader)
 		ae := newEntry(EntryLeaderTransfer, []byte(maybeLeader))
@@ -1828,13 +1834,16 @@ func (n *raft) Peers() []*Peer {
 // Update and propose our known set of peers.
 func (n *raft) ProposeKnownPeers(knownPeers []string) {
 	n.Lock()
-	defer n.Unlock()
 	// If we are the leader update and send this update out.
 	if n.State() != Leader {
+		n.Unlock()
 		return
 	}
 	n.updateKnownPeersLocked(knownPeers)
-	n.sendPeerState()
+	n.Unlock()
+
+	n.prop.push(newProposedEntry(
+		newEntry(EntryPeerState, encodePeerState(n.currentPeerState())), _EMPTY_))
 }
 
 // Update our known set of peers.
@@ -2629,11 +2638,12 @@ func (n *raft) runAsLeader() {
 		n.unsubscribe(rpsub)
 		n.Unlock()
 	}()
-
-	// To send out our initial peer state.
-	n.sendPeerState()
 	n.Unlock()
 
+	var propBatch []*proposedEntry
+	n.sendAppendEntry(
+		[]*Entry{{EntryPeerState, encodePeerState(n.currentPeerState())}})
+
 	hb := time.NewTicker(hbInterval)
 	defer hb.Stop()
 
@@ -2653,39 +2663,43 @@ func (n *raft) runAsLeader() {
 			}
 			n.resp.recycle(&ars)
 		case <-n.prop.ch:
-			const maxBatch = 256 * 1024
-			const maxEntries = 512
-			var entries []*Entry
-
-			es, sz := n.prop.pop(), 0
-			for _, b := range es {
-				if b.Type == EntryRemovePeer {
-					n.doRemovePeerAsLeader(string(b.Data))
+			// Drain the channel and combine with any leftovers.
+			newProposals := n.prop.pop()
+			propBatch = append(propBatch, newProposals...)
+
+			// Loop until all proposals are batched and sent.
+			for len(propBatch) > 0 {
+				batchEntries, newLeftovers, sz := n.composeBatch(propBatch)
+
+				// Send our batch if we have one.
+				if len(batchEntries) > 0 {
+					n.debug("Sending batch of %d entries (%d bytes, max batch size %d)", len(batchEntries), sz, n.maxBatchSize())
+					n.sendAppendEntry(batchEntries)
 				}
-				entries = append(entries, b.Entry)
-				// Increment size.
-				sz += len(b.Data) + 1
-				// If below thresholds go ahead and send.
-				if sz < maxBatch && len(entries) < maxEntries {
-					continue
+
+				// Only handle replies for proposals that were consumed.
+				numConsumed := len(propBatch) - len(newLeftovers)
+				consumedProposals := propBatch[:numConsumed]
+				for _, pe := range consumedProposals {
+					if pe.reply != _EMPTY_ {
+						n.sendReply(pe.reply, nil)
+					}
+					pe.returnToPool()
 				}
-				n.sendAppendEntry(entries)
-				// Reset our sz and entries.
-				// We need to re-create `entries` because there is a reference
-				// to it in the node's pae map.
-				sz, entries = 0, nil
-			}
-			if len(entries) > 0 {
-				n.sendAppendEntry(entries)
-			}
-			// Respond to any proposals waiting for a confirmation.
-			for _, pe := range es {
-				if pe.reply != _EMPTY_ {
-					n.sendReply(pe.reply, nil)
+
+				// The new leftovers become the batch for the next iteration.
+				propBatch = newLeftovers
+
+				// If we have leftovers and the proposal channel is empty,
+				// loop again to send them immediately. Otherwise, break
+				// to allow the select to pull more from the channel.
+				if len(propBatch) > 0 && n.prop.len() == 0 {
+					continue
 				}
-				pe.returnToPool()
+				break
 			}
-			n.prop.recycle(&es)
+			// Recycle the container for the new proposals that were popped.
+			n.prop.recycle(&newProposals)
 
 		case <-hb.C:
 			if n.notActive() {
@@ -2718,6 +2732,70 @@ func (n *raft) runAsLeader() {
 	}
 }
 
+// maxBatchSize returns the maximum number of bytes we can
+// safely send in a single message.
+func (n *raft) maxBatchSize() int {
+	maxPayload := MAX_PAYLOAD_SIZE
+	if n.s.info.MaxPayload > 0 {
+		maxPayload = int(n.s.info.MaxPayload)
+	}
+	if n.acc.mpay > 0 {
+		maxPayload = int(n.acc.mpay)
+	}
+	return maxPayload - MAX_CONTROL_LINE_SIZE
+}
+
+// composeBatch will compose a batch from a set of proposals.
+// It returns the entries to send, any leftover proposals for the
+// next batch, and the encoded size of the batch in bytes.
+func (n *raft) composeBatch(allProposals []*proposedEntry) ([]*Entry, []*proposedEntry, int) {
+	const maxEntries = math.MaxUint16
+	maxBatchSize := n.maxBatchSize()
+
+	if len(allProposals) == 0 {
+		return nil, nil, 0
+	}
+
+	end := 0
+	batchSize := int(appendEntryBaseLen)
+
+	for end < len(allProposals) {
+		p := allProposals[end]
+		msgSize := len(p.Data) + 1 + 4 // 1 byte for the type, 4 bytes for the size
+
+		// If we have a snapshot do not batch with anything else.
+		if p.Type == EntrySnapshot {
+			if end == 0 {
+				batchSize += msgSize
+				end = 1
+			}
+			break
+		}
+
+		// Always take at least one proposal so that an oversized
+		// entry still makes progress on its own.
+		if end == 0 || ((batchSize+msgSize) < maxBatchSize && end < maxEntries) {
+			batchSize += msgSize
+			end++
+			continue
+		}
+		break
+	}
+
+	// The batch to send is from the start up to `end`.
+	batchProposals := allProposals[:end]
+	// The new leftovers are from `end` to the end.
+	newLeftovers := allProposals[end:]
+
+	// Create the entries to be sent.
+	entries := make([]*Entry, len(batchProposals))
+	for i, p := range batchProposals {
+		if p.Type == EntryRemovePeer {
+			n.doRemovePeerAsLeader(string(p.Data))
+		}
+		entries[i] = p.Entry
+	}
+
+	return entries, newLeftovers, batchSize
+}
+
 // Quorum reports the quorum status. Will be called on former leaders.
 func (n *raft) Quorum() bool {
 	n.RLock()
@@ -3766,13 +3844,24 @@ CONTINUE:
 	if ae.shouldStore() {
 		// Only store if an original which will have sub != nil
 		if sub != nil {
-			if err := n.storeToWAL(ae); err != nil {
+			n.Unlock()
+			size, seq, err := n.storeToWAL(ae)
+			n.Lock()
+			if err != nil {
 				if err != ErrStoreClosed {
 					n.warn("Error storing entry to WAL: %v", err)
 				}
+				if err == errEntryStoreFailed {
+					n.resetWAL()
+					n.cancelCatchup()
+				}
 				n.Unlock()
 				return
 			}
+			n.bytes += size
+			n.pterm = ae.term
+			n.pindex = seq
+			n.active = time.Now()
 			n.cachePendingEntry(ae)
 			n.resetInitializing()
 		} else {
@@ -3933,50 +4022,54 @@ func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry {
 	return newAppendEntry(n.id, n.term, n.commit, n.pterm, n.pindex, entries)
 }
 
-// Determine if we should store an entry. This stops us from storing
-// heartbeat messages.
+// Determine if we should store an entry.
+// This stops us from storing heartbeat and leader transfer messages.
 func (ae *appendEntry) shouldStore() bool {
-	return ae != nil && len(ae.entries) > 0
+	if ae == nil {
+		return false
+	}
+	l := len(ae.entries)
+	if l == 0 {
+		return false
+	}
+	if l == 1 {
+		return ae.entries[0].Type != EntryLeaderTransfer
+	}
+	return true
+}
+
+// shouldCheckLeader reports whether leadership must be re-checked just
+// before sending. This only applies to snapshot-only append entries,
+// which are pushed to the proposal queue without a prior leader check.
+func (ae *appendEntry) shouldCheckLeader() bool {
+	return ae != nil && len(ae.entries) == 1 &&
+		ae.entries[0].Type == EntrySnapshot
 }
 
 // Store our append entry to our WAL.
-// lock should be held.
-func (n *raft) storeToWAL(ae *appendEntry) error {
+// Returns the number of bytes written and the sequence number
+// assigned to the message.
+func (n *raft) storeToWAL(ae *appendEntry) (uint64, uint64, error) {
 	if ae == nil {
-		return fmt.Errorf("raft: Missing append entry for storage")
+		return 0, 0, fmt.Errorf("raft: Missing append entry for storage")
 	}
+
 	if n.werr != nil {
-		return n.werr
+		return 0, 0, n.werr
 	}
 
 	seq, _, err := n.wal.StoreMsg(_EMPTY_, nil, ae.buf, 0)
 	if err != nil {
-		n.setWriteErrLocked(err)
-		return err
+		return 0, 0, err
 	}
 
-	// Sanity checking for now.
 	if index := ae.pindex + 1; index != seq {
 		n.warn("Wrong index, ae is %+v, index stored was %d, n.pindex is %d, will reset", ae, seq, n.pindex)
-		if n.State() == Leader {
-			n.stepdownLocked(n.selectNextLeader())
-		}
-		// Reset and cancel any catchup.
-		n.resetWAL()
-		n.cancelCatchup()
-		return errEntryStoreFailed
+		return 0, 0, errEntryStoreFailed
 	}
 
-	var sz uint64
-	if n.wtype == FileStorage {
-		sz = fileStoreMsgSize(_EMPTY_, nil, ae.buf)
-	} else {
-		sz = memStoreMsgSize(_EMPTY_, nil, ae.buf)
-	}
-	n.bytes += sz
-	n.pterm = ae.term
-	n.pindex = seq
-	return nil
+	sz := n.wal.MsgSize(ae.buf)
+	return sz, seq, nil
 }
 
 const (
@@ -3985,19 +4078,25 @@ const (
 	paeWarnModulo = 5_000
 )
 
+// sendAppendEntry builds an appendEntry and stores it to the WAL
+// before sending it to the followers.
+// This method is expected to be called from Raft's main goroutine,
+// unless the appendEntry does not need to be stored to the WAL
+// (heartbeats and EntryLeaderTransfer).
 func (n *raft) sendAppendEntry(entries []*Entry) {
-	n.Lock()
-	defer n.Unlock()
-	n.sendAppendEntryLocked(entries, true)
-}
-
-func (n *raft) sendAppendEntryLocked(entries []*Entry, checkLeader bool) {
-	// Safeguard against sending an append entry right after a stepdown from a different goroutine.
-	// Specifically done while holding the lock to not race.
-	if checkLeader && n.State() != Leader {
+	// Safeguard against sending an append entry right after a stepdown
+	// from a different goroutine. State and the new append entry are
+	// captured together under the read lock to not race.
+	n.RLock()
+	state := n.State()
+	ae := n.buildAppendEntry(entries)
+	n.RUnlock()
+
+	if ae.shouldCheckLeader() && state != Leader {
 		n.debug("Not sending append entry, not leader")
+		ae.returnToPool()
 		return
 	}
-	ae := n.buildAppendEntry(entries)
 
 	var err error
 	var scratch [1024]byte
@@ -4009,12 +4108,30 @@ func (n *raft) sendAppendEntryLocked(entries []*Entry, checkLeader bool) {
 	// If we have entries store this in our wal.
 	shouldStore := ae.shouldStore()
 	if shouldStore {
-		if err := n.storeToWAL(ae); err != nil {
+		size, seq, err := n.storeToWAL(ae)
+		n.Lock()
+		if err != nil {
+			n.setWriteErrLocked(err)
+			if err == errEntryStoreFailed {
+				if n.State() == Leader {
+					n.stepdownLocked(n.selectNextLeader())
+				}
+				// TODO: are we sure we want this?
+				n.resetWAL()
+				n.cancelCatchup()
+			}
+			n.Unlock()
 			return
 		}
+
+		n.bytes += size
+		n.pterm = ae.term
+		n.pindex = seq
 		n.active = time.Now()
 		n.cachePendingEntry(ae)
+		n.Unlock()
 	}
+
 	n.sendRPC(n.asubj, n.areply, ae.buf)
 	if !shouldStore {
 		ae.returnToPool()
@@ -4103,23 +4220,15 @@ func (n *raft) peerNames() []string {
 
 func (n *raft) currentPeerState() *peerState {
 	n.RLock()
-	ps := n.currentPeerStateLocked()
-	n.RUnlock()
-	return ps
-}
-
-func (n *raft) currentPeerStateLocked() *peerState {
+	defer n.RUnlock()
 	return &peerState{n.peerNames(), n.csz, n.extSt}
 }
 
-// sendPeerState will send our current peer state to the cluster.
-// Lock should be held.
-func (n *raft) sendPeerState() {
-	n.sendAppendEntryLocked([]*Entry{{EntryPeerState, encodePeerState(n.currentPeerStateLocked())}}, true)
-}
-
 // Send a heartbeat.
 func (n *raft) sendHeartbeat() {
+	// OK to call sendAppendEntry() directly here. No need to push
+	// heartbeats onto the prop queue because we don't store them
+	// in the log.
 	n.sendAppendEntry(nil)
 }
 
diff --git a/server/raft_test.go b/server/raft_test.go
index ab704357e5c..dd70c859c4e 100644
--- a/server/raft_test.go
+++ b/server/raft_test.go
@@ -1096,7 +1096,7 @@ func TestNRGWALEntryWithoutQuorumMustTruncate(t *testing.T) {
 	ae := n.buildAppendEntry(entries)
 	ae.buf, err = ae.encode(scratch[:])
 	require_NoError(t, err)
-	err = n.storeToWAL(ae)
+	_, _, err = n.storeToWAL(ae)
 	n.Unlock()
 	require_NoError(t, err)
 
@@ -1620,7 +1620,8 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) {
 	require_Equal(t, n.wal.State().Msgs, 0)
 
 	// Store a third message, it stays uncommitted.
-	require_NoError(t, n.storeToWAL(aeMsg3))
+	_, _, err = n.storeToWAL(aeMsg3)
+	require_NoError(t, err)
 	require_Equal(t, n.commit, 2)
 	require_Equal(t, n.wal.State().Msgs, 1)
 	entry, err = n.loadEntry(3)
@@ -3070,13 +3071,15 @@ func TestNRGSizeAndApplied(t *testing.T) {
 	require_Equal(t, bytes, 0)
 
 	// Store the first append entry.
-	require_NoError(t, n.storeToWAL(aeMsg1))
+	_, _, err := n.storeToWAL(aeMsg1)
+	require_NoError(t, err)
 	entries, bytes = n.Size()
 	require_Equal(t, entries, 1)
 	require_Equal(t, bytes, 105)
 
 	// Store the second append entry.
-	require_NoError(t, n.storeToWAL(aeMsg2))
+	_, _, err = n.storeToWAL(aeMsg2)
+	require_NoError(t, err)
 	entries, bytes = n.Size()
 	require_Equal(t, entries, 2)
 	require_Equal(t, bytes, 210)
diff --git a/server/store.go b/server/store.go
index 358d7ec2900..4d4bfae0d72 100644
--- a/server/store.go
+++ b/server/store.go
@@ -91,6 +91,7 @@ type ProcessJetStreamMsgHandler func(*inMsg)
 type StreamStore interface {
 	StoreMsg(subject string, hdr, msg []byte, ttl int64) (uint64, int64, error)
 	StoreRawMsg(subject string, hdr, msg []byte, seq uint64, ts int64, ttl int64) error
+	MsgSize(msg []byte) uint64
 	SkipMsg() uint64
 	SkipMsgs(seq uint64, num uint64) error
 	FlushAllPending()
diff --git a/server/stream.go b/server/stream.go
index 7dbc7d2b4e3..759182a20e3 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -888,6 +888,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
 	fsCfg.Compression = config.Compression
 	// Async flushing is only allowed if the stream has a sync log backing it.
 	fsCfg.AsyncFlush = !fsCfg.SyncAlways && config.Replicas > 1
+	// Async persist mode opts in to async flushing; sync always would
+	// also be disabled here if it was configured.
+	fsCfg.SyncAlways = false
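
Reviewer note (not part of the diff): the core of this change is the greedy batching rule in `composeBatch`. Below is a minimal standalone sketch of that rule for reference. The types `proposal` and `entryType`, the limits passed in `main`, and `baseLen` (standing in for `appendEntryBaseLen`) are all simplified assumptions, not the server's actual definitions.

```go
// Illustrates composeBatch's rule: take proposals greedily until the
// encoded size or entry count would exceed the limits, always take at
// least one, and never batch a snapshot with anything else.
package main

import "fmt"

type entryType int

const (
	entryNormal entryType = iota
	entrySnapshot
)

type proposal struct {
	typ  entryType
	data []byte
}

// composeBatch returns how many leading proposals fit in one batch and
// the resulting encoded size.
func composeBatch(props []proposal, maxBatchSize, maxEntries, baseLen int) (end, size int) {
	size = baseLen
	for end < len(props) {
		p := props[end]
		msgSize := len(p.data) + 1 + 4 // 1 byte for the type, 4 bytes for the size

		// A snapshot is only ever sent alone.
		if p.typ == entrySnapshot {
			if end == 0 {
				size += msgSize
				end = 1
			}
			return end, size
		}

		// Always take at least one entry so an oversized proposal still makes progress.
		if end == 0 || (size+msgSize < maxBatchSize && end < maxEntries) {
			size += msgSize
			end++
			continue
		}
		break
	}
	return end, size
}

func main() {
	props := []proposal{
		{entryNormal, make([]byte, 400)},
		{entryNormal, make([]byte, 400)},
		{entrySnapshot, make([]byte, 100)},
		{entryNormal, make([]byte, 400)},
	}
	for len(props) > 0 {
		end, size := composeBatch(props, 1024, 512, 24)
		fmt.Printf("send %d entries (%d bytes)\n", end, size)
		props = props[end:] // leftovers become the next batch
	}
}
```

Keeping snapshots unbatched means a large snapshot never delays or displaces normal proposals within a batch, and always consuming at least one proposal guarantees forward progress even when a single entry exceeds the maximum batch size.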