diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 42c2b1b2ef2..36f4e0bf408 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -198,6 +198,10 @@ const ( JSApiStreamRemovePeer = "$JS.API.STREAM.PEER.REMOVE.*" JSApiStreamRemovePeerT = "$JS.API.STREAM.PEER.REMOVE.%s" + // JSApiStreamLeaderQuiesce is the endpoint to have stream leader quiesce. + // Will return JSON response. + JSApiStreamLeaderQuiesce = "$JS.API.STREAM.LEADER.QUIESCE.*" + // JSApiStreamLeaderStepDown is the endpoint to have stream leader stepdown. // Will return JSON response. JSApiStreamLeaderStepDown = "$JS.API.STREAM.LEADER.STEPDOWN.*" @@ -612,6 +616,14 @@ type JSApiStreamRemovePeerResponse struct { const JSApiStreamRemovePeerResponseType = "io.nats.jetstream.api.v1.stream_remove_peer_response" +// JSApiStreamLeaderQuiesceResponse is the response to a leader quiesce request. +type JSApiStreamLeaderQuiesceResponse struct { + ApiResponse + Success bool `json:"success,omitempty"` +} + +const JSApiStreamLeaderQuiesceResponseType = "io.nats.jetstream.api.v1.stream_leader_quiesce_response" + // JSApiStreamLeaderStepDownResponse is the response to a leader stepdown request. type JSApiStreamLeaderStepDownResponse struct { ApiResponse @@ -1006,6 +1018,7 @@ func (s *Server) setJetStreamExportSubs() error { {JSApiStreamSnapshot, s.jsStreamSnapshotRequest}, {JSApiStreamRestore, s.jsStreamRestoreRequest}, {JSApiStreamRemovePeer, s.jsStreamRemovePeerRequest}, + {JSApiStreamLeaderQuiesce, s.jsStreamLeaderQuiesceRequest}, {JSApiStreamLeaderStepDown, s.jsStreamLeaderStepDownRequest}, {JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest}, {JSApiMsgDelete, s.jsMsgDeleteRequest}, @@ -2267,6 +2280,93 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } +// Request a stream leader to quiesce. 
+func (s *Server) jsStreamLeaderQuiesceRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { + if c == nil || !s.JetStreamEnabled() { + return + } + + ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg) + if err != nil { + s.Warnf(badAPIRequestT, msg) + return + } + + // Have extra token for this one. + name := tokenAt(subject, 6) + + var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderQuiesceResponseType}} + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + if js.isLeaderless() { + resp.Error = NewJSClusterNotAvailError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + js.mu.RLock() + isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name) + js.mu.RUnlock() + + if isLeader && sa == nil { + resp.Error = NewJSStreamNotFoundError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } else if sa == nil { + return + } + + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { + resp.Error = NewJSNotEnabledForAccountError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } + return + } + + // Check to see if we are a member of the group and if the group has no leader. + if js.isGroupLeaderless(sa.Group) { + resp.Error = NewJSClusterNotAvailError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + // We have the stream assigned and a leader, so only the stream leader should answer. 
+ if !acc.JetStreamIsStreamLeader(name) { + return + } + + mset, err := acc.lookupStream(name) + if err != nil || mset == nil { + resp.Error = NewJSStreamNotFoundError(Unless(err)) + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + node := mset.raftNode() + if node == nil { + resp.Success = true + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + return + } + + err = node.Quiesce() + if err != nil { + resp.Error = NewJSRaftGeneralError(err, Unless(err)) + } else { + resp.Success = true + } + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) +} + // Request to have a stream leader stepdown. func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { if c == nil || !s.JetStreamEnabled() { diff --git a/server/raft.go b/server/raft.go index e8e8d313f54..abb72c98f9c 100644 --- a/server/raft.go +++ b/server/raft.go @@ -82,6 +82,7 @@ type RaftNode interface { Delete() RecreateInternalSubs() error IsSystemAccount() bool + Quiesce() error } type WAL interface { @@ -225,6 +226,9 @@ type raft struct { observer bool // The node is observing, i.e. not able to become leader initializing bool // The node is new, and "empty log" checks can be temporarily relaxed. scaleUp bool // The node is part of a scale up, puts us in observer mode until the log contains data. 
+ + quiesce chan bool // Channel to notify leader loop to quiesce + quiesced bool // The node is quiesced } type proposedEntry struct { @@ -260,6 +264,7 @@ const ( lostQuorumCheckIntervalDefault = hbIntervalDefault * 10 // 10 seconds observerModeIntervalDefault = 48 * time.Hour peerRemoveTimeoutDefault = 5 * time.Minute + quiesceIntervalDefault = 15 * time.Minute ) var ( @@ -272,6 +277,7 @@ var ( lostQuorumCheck = lostQuorumCheckIntervalDefault observerModeInterval = observerModeIntervalDefault peerRemoveTimeout = peerRemoveTimeoutDefault + quiesceInterval = quiesceIntervalDefault ) type RaftConfig struct { @@ -426,6 +432,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel leadc: make(chan bool, 32), observer: cfg.Observer, extSt: ps.domainExt, + quiesce: make(chan bool), } // Setup our internal subscriptions for proposals, votes and append entries. @@ -1601,6 +1608,44 @@ func (n *raft) selectNextLeader() string { return nextLeader } +func (n *raft) Quiesce() error { + if n.State() != Leader { + return errNotLeader + } + n.quiesce <- true + return nil +} + +// Return true if the node can be quiesced +func (n *raft) mayQuiesce() bool { + n.RLock() + defer n.RUnlock() + // TODO this test should be strengthened: + // must check that followers are up-to-date + return !n.quiesced && n.State() == Leader && n.hasQuorumLocked() +} + +func (n *raft) doQuiesce() bool { + if n.mayQuiesce() { + n.sendQuiesce() + n.setQuiesced(true) + return true + } + return false +} + +func (n *raft) isQuiesced() bool { + n.RLock() + defer n.RUnlock() + return n.quiesced +} + +func (n *raft) setQuiesced(quiesced bool) { + n.Lock() + defer n.Unlock() + n.quiesced = quiesced +} + // StepDown will have a leader stepdown and optionally do a leader transfer. 
func (n *raft) StepDown(preferred ...string) error { if n.State() != Leader { @@ -2140,8 +2185,13 @@ func (n *raft) runAsFollower() { select { case <-n.entry.ch: + wasQuiesced := n.isQuiesced() // New append entries have arrived over the network. n.processAppendEntries() + if !wasQuiesced && n.isQuiesced() { + // Avoid unquiescing immediately + continue + } case <-n.s.quitCh: // The server is shutting down. return @@ -2188,6 +2238,11 @@ func (n *raft) runAsFollower() { n.processVoteRequest(voteReq) } } + + if n.isQuiesced() { + n.setQuiesced(false) + n.debug("Follower unquiesced") + } } } @@ -2308,6 +2363,7 @@ const ( EntryRemovePeer EntryLeaderTransfer EntrySnapshot + EntryQuiesce ) func (t EntryType) String() string { @@ -2326,6 +2382,8 @@ func (t EntryType) String() string { return "LeaderTransfer" case EntrySnapshot: return "Snapshot" + case EntryQuiesce: + return "Quiesce" } return fmt.Sprintf("Unknown [%d]", uint8(t)) } @@ -2585,10 +2643,15 @@ func (n *raft) runAsLeader() { n.sendPeerState() hb := time.NewTicker(hbInterval) - defer hb.Stop() - lq := time.NewTicker(lostQuorumCheck) - defer lq.Stop() + qu := time.NewTicker(quiesceInterval) + + stopTicking := func() { + hb.Stop() + lq.Stop() + qu.Stop() + } + defer stopTicking() for n.State() == Leader { select { @@ -2602,6 +2665,12 @@ func (n *raft) runAsLeader() { n.processAppendEntryResponse(ar) } n.resp.recycle(&ars) + // TODO follower could avoid sending a response + // for EntryQuiesce + if n.isQuiesced() { + // Avoid unquiescing immediately + continue + } case <-n.prop.ch: const maxBatch = 256 * 1024 const maxEntries = 512 @@ -2664,15 +2733,31 @@ func (n *raft) runAsLeader() { } case <-n.entry.ch: n.processAppendEntries() + case <-qu.C: + if time.Since(n.active) > quiesceInterval && n.doQuiesce() { + stopTicking() + continue + } + case <-n.quiesce: + if n.doQuiesce() { + stopTicking() + continue + } + } + + // Any interaction unquiesces the leader + if n.isQuiesced() { + hb.Reset(hbInterval) + 
lq.Reset(lostQuorumInterval) + qu.Reset(quiesceInterval) + n.setQuiesced(false) + n.debug("Leader unquiesced") } } } -// Quorum reports the quorum status. Will be called on former leaders. -func (n *raft) Quorum() bool { - n.RLock() - defer n.RUnlock() - +// Return true if leader believes it still has a quorum. +func (n *raft) hasQuorumLocked() bool { nc := 0 for id, peer := range n.peers { if id == n.id || time.Since(peer.ts) < lostQuorumInterval { @@ -2684,6 +2769,13 @@ func (n *raft) Quorum() bool { return false } +// Quorum reports the quorum status. Will be called on former leaders. +func (n *raft) Quorum() bool { + n.RLock() + defer n.RUnlock() + return n.hasQuorumLocked() +} + func (n *raft) lostQuorum() bool { n.RLock() defer n.RUnlock() @@ -2698,15 +2790,7 @@ func (n *raft) lostQuorumLocked() bool { return false } - nc := 0 - for id, peer := range n.peers { - if id == n.id || time.Since(peer.ts) < lostQuorumInterval { - if nc++; nc >= n.qn { - return false - } - } - } - return true + return !n.hasQuorumLocked() } // Check for being not active in terms of sending entries. @@ -3719,6 +3803,11 @@ CONTINUE: // Check to see if we have any related entries to process here. for _, e := range ae.entries { switch e.Type { + case EntryQuiesce: + if isNew && n.State() == Follower { + n.elect.Stop() + n.quiesced = true + } case EntryLeaderTransfer: // Only process these if they are new, so no replays or catchups. if isNew { @@ -3870,6 +3959,11 @@ func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry { // Determine if we should store an entry. This stops us from storing // heartbeat messages. 
func (ae *appendEntry) shouldStore() bool { + if len(ae.entries) == 1 { + if e := ae.entries[0]; e.Type == EntryQuiesce { + return false + } + } return ae != nil && len(ae.entries) > 0 } @@ -4033,6 +4127,11 @@ func (n *raft) sendHeartbeat() { n.sendAppendEntry(nil) } +// Tell the cluster to quiesce the current term +func (n *raft) sendQuiesce() { + n.sendAppendEntry([]*Entry{{EntryQuiesce, nil}}) +} + type voteRequest struct { term uint64 lastTerm uint64 diff --git a/server/raft_test.go b/server/raft_test.go index 8fdf93b7944..f13359d0e81 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -3532,3 +3532,36 @@ func TestNRGChainOfBlocksStopAndCatchUp(t *testing.T) { } } } + +// TestNRGQuiesceSimple verifies that a Raft group can correctly quiesce. +func TestNRGQuiesceSimple(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, _ := jsClientConnect(t, c.leader(), nats.UserInfo("admin", "s3cr3t!")) + defer nc.Close() + + rg := c.createMemRaftGroup("Test", 3, newStateAdder) + rg.waitOnLeader() + + leader := rg.leader() + follower := rg.nonLeader() + + err := leader.node().Quiesce() + require_NoError(t, err) + + // TODO this should be avoided + time.Sleep(time.Second) + + leader_raft := leader.node().(*raft) + require_True(t, leader_raft.isQuiesced()) + + follower_raft := follower.node().(*raft) + require_True(t, follower_raft.isQuiesced()) + + leader.(*stateAdder).proposeDelta(1) + rg.waitOnTotal(t, 1) + + require_False(t, leader_raft.isQuiesced()) + require_False(t, follower_raft.isQuiesced()) +}