diff --git a/server/errors.json b/server/errors.json index 410544bdaa2..7e133b37594 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1998,5 +1998,65 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSBatchPublishDisabledErr", + "code": 400, + "error_code": 10202, + "description": "batch publish is disabled", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSBatchPublishMissingSeqErr", + "code": 400, + "error_code": 10203, + "description": "batch publish sequence is missing", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSBatchPublishInvalidBatchIDErr", + "code": 400, + "error_code": 10204, + "description": "batch publish ID is invalid", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSBatchPublishUnknownBatchIDErr", + "code": 400, + "error_code": 10205, + "description": "batch publish ID unknown", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSBatchPublishInvalidGapModeErr", + "code": 400, + "error_code": 10206, + "description": "batch publish gap mode is invalid", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSMirrorWithBatchPublishErr", + "code": 400, + "error_code": 10207, + "description": "stream mirrors can not also use batch publishing", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] diff --git a/server/jetstream.go b/server/jetstream.go index 1565243a12d..87e35ab7db2 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1458,6 +1458,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro smv StoreMsg batchId string batchSeq uint64 + atomic bool commit bool commitEob bool batchStoreDir string @@ -1469,10 +1470,10 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro if err != nil || sm == nil { goto SKIP } - batchId = getBatchId(sm.hdr) + batchId, atomic = getBatchId(sm.hdr) batchSeq, ok = getBatchSequence(sm.hdr) commit = len(sliceHeader(JSBatchCommit, sm.hdr)) != 0 - if batchId == _EMPTY_ || !ok || commit { + if batchId == _EMPTY_ || !atomic || !ok || commit { goto SKIP } // We've observed a partial batch write. Write the remainder of the batch. diff --git a/server/jetstream_batching.go b/server/jetstream_batching.go index fc97f1f5453..463ea2e9eb0 100644 --- a/server/jetstream_batching.go +++ b/server/jetstream_batching.go @@ -21,6 +21,7 @@ import ( "math/big" "path/filepath" "slices" + "strconv" "strings" "sync" "sync/atomic" @@ -39,30 +40,37 @@ type batching struct { type batchGroup struct { lseq uint64 - store StreamStore timer *time.Timer + // Used for atomic batch publish. + store StreamStore // Where the batch is staged before committing. + // Used for fast batch publish. + pseq uint64 // Last persisted batch sequence. + sseq uint64 // Last persisted stream sequence. + ackMessages uint64 // Ack will be sent every N messages. + gapOk bool // Whether a gap is okay, if not the batch would be rejected. + reply string // If the batch is committed, this is the reply subject used for the PubAck. } +// newAtomicBatchGroup creates an atomic batch publish group. // Lock should be held. 
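+// Both publish flavors share batchGroup: atomic batches stage their messages
+// in the batch store and commit them in a single step, while fast batches
+// propose each message immediately and only track pseq/sseq/ackMessages for
+// flow control.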
-func (batches *batching) newBatchGroup(mset *stream, batchId string) (*batchGroup, error) {
+func (batches *batching) newAtomicBatchGroup(mset *stream, batchId string) (*batchGroup, error) {
 	store, err := newBatchStore(mset, batchId)
 	if err != nil {
 		return nil, err
 	}
 	b := &batchGroup{store: store}
-
-	// Create a timer to clean up after timeout.
-	timeout := streamMaxBatchTimeout
-	if maxBatchTimeout := mset.srv.getOpts().JetStreamLimits.MaxBatchTimeout; maxBatchTimeout > 0 {
-		timeout = maxBatchTimeout
-	}
-	b.timer = time.AfterFunc(timeout, func() {
-		b.cleanup(batchId, batches)
-		mset.sendStreamBatchAbandonedAdvisory(batchId, BatchTimeout)
-	})
+	b.setupCleanupTimer(mset, batchId, batches)
 	return b, nil
 }
 
+// newFastBatchGroup creates a fast batch publish group.
+// Lock should be held.
+func (batches *batching) newFastBatchGroup(mset *stream, batchId string, gapOk bool, ackMessages uint64) *batchGroup {
+	b := &batchGroup{gapOk: gapOk, ackMessages: ackMessages}
+	b.setupCleanupTimer(mset, batchId, batches)
+	return b
+}
+
 func getBatchStoreDir(mset *stream, batchId string) (string, string) {
 	mset.mu.RLock()
 	jsa, name := mset.jsa, mset.cfg.Name
@@ -105,10 +113,87 @@ func (b *batchGroup) readyForCommit() bool {
 	if !b.timer.Stop() {
 		return false
 	}
-	b.store.FlushAllPending()
+	if b.store != nil {
+		b.store.FlushAllPending()
+	}
 	return true
 }
 
+// fastBatchRegisterSequences registers the highest stored batch and stream sequences and returns
+// the batch group and, if the batch has already been committed, the reply subject for the PubAck.
+// Lock should be held.
+func (batches *batching) fastBatchRegisterSequences(batchId string, batchSeq, streamSeq uint64) (*batchGroup, string) {
+	if b, ok := batches.group[batchId]; ok {
+		b.sseq = streamSeq
+		b.pseq = batchSeq
+		// If the PubAck needs to be sent now as a result of a commit,
+		// return the reply and clean up the batch now.
+		if b.lseq == batchSeq && b.reply != _EMPTY_ {
+			b.cleanupLocked(batchId, batches)
+			return b, b.reply
+		}
+		return b, _EMPTY_
+	}
+	return nil, _EMPTY_
+}
+
+// fastBatchFlowControl sends a fast batch flow control message for the current highest sequence.
+// Lock should be held.
+func (b *batchGroup) fastBatchFlowControl(batchSeq uint64, mset *stream, reply string) {
+	if len(reply) == 0 {
+		return
+	}
+	response, _ := json.Marshal(&BatchFlowAck{AckMessages: b.ackMessages, CurrentSequence: batchSeq})
+	mset.outq.sendMsg(reply, response)
+}
+
+// fastBatchCommit ends the batch and commits the data up to that point. If all messages
+// have already been persisted, a PubAck is sent immediately. Otherwise, it will be sent
+// after the last message has been persisted.
+// Lock should be held.
+func (batches *batching) fastBatchCommit(b *batchGroup, batchId string, mset *stream, reply string) {
+	// If the whole batch has been persisted, we can respond with the PubAck now.
+	if b.lseq == b.pseq {
+		b.cleanupLocked(batchId, batches)
+		var buf [256]byte
+		pubAck := append(buf[:0], mset.pubAck...)
+		response := append(pubAck, strconv.FormatUint(b.sseq, 10)...)
+		response = append(response, fmt.Sprintf(",\"batch\":%q,\"count\":%d}", batchId, b.lseq)...)
+		if len(reply) > 0 {
+			mset.outq.sendMsg(reply, response)
+		}
+		return
+	}
+	// Otherwise, we need to wait and the PubAck will be sent when the last message is persisted.
+	// The batch will be cleaned up then, so stop the timer.
+	b.timer.Stop()
+	// And we need to store the reply for the PubAck later.
+	b.reply = reply
+}
+
+// setupCleanupTimer sets up a timer to clean up the batch after a timeout.
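+// The timeout defaults to streamMaxBatchTimeout and can be overridden via
+// JetStreamLimits.MaxBatchTimeout.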
+func (b *batchGroup) setupCleanupTimer(mset *stream, batchId string, batches *batching) {
+	// Create a timer to clean up after timeout.
+	timeout := streamMaxBatchTimeout
+	if maxBatchTimeout := mset.srv.getOpts().JetStreamLimits.MaxBatchTimeout; maxBatchTimeout > 0 {
+		timeout = maxBatchTimeout
+	}
+	b.timer = time.AfterFunc(timeout, func() {
+		b.cleanup(batchId, batches)
+		mset.sendStreamBatchAbandonedAdvisory(batchId, BatchTimeout)
+	})
+}
+
+// resetCleanupTimer resets the cleanup timer, extending the lifetime of the batch.
+// Returns whether the timer was reset before it had already expired.
+func (b *batchGroup) resetCleanupTimer(mset *stream) bool {
+	timeout := streamMaxBatchTimeout
+	if maxBatchTimeout := mset.srv.getOpts().JetStreamLimits.MaxBatchTimeout; maxBatchTimeout > 0 {
+		timeout = maxBatchTimeout
+	}
+	return b.timer.Reset(timeout)
+}
+
 // cleanup deletes underlying resources associated with the batch and unregisters it from the stream's batches.
 func (b *batchGroup) cleanup(batchId string, batches *batching) {
 	batches.mu.Lock()
@@ -120,7 +205,9 @@ func (b *batchGroup) cleanupLocked(batchId string, batches *batching) {
 	globalInflightBatches.Add(-1)
 	b.timer.Stop()
-	b.store.Delete(true)
+	if b.store != nil {
+		b.store.Delete(true)
+	}
 	delete(batches.group, batchId)
 }
 
@@ -128,7 +215,9 @@ func (b *batchGroup) stopLocked() {
 	globalInflightBatches.Add(-1)
 	b.timer.Stop()
-	b.store.Stop()
+	if b.store != nil {
+		b.store.Stop()
+	}
 }
 
 // batchStagedDiff stages all changes for consistency checks until commit.
@@ -651,3 +740,61 @@ func checkMsgHeadersPreClusteredProposal(
 	return hdr, msg, 0, nil, nil
 }
+
+// recalculateClusteredSeq initializes or updates mset.clseq, for example after a leader change.
+// This is reused for normal clustered publishing into a stream, and for atomic and fast batch publishing.
+// mset.clMu lock must be held.
+func recalculateClusteredSeq(mset *stream) (lseq uint64) {
+	// Need to unlock and re-acquire the locks in the proper order.
+	mset.clMu.Unlock()
+	// Locking order is stream -> batchMu -> clMu
+	mset.mu.RLock()
+	batch := mset.batchApply
+	var batchCount uint64
+	if batch != nil {
+		batch.mu.Lock()
+		batchCount = batch.count
+	}
+	mset.clMu.Lock()
+	// Re-capture
+	lseq = mset.lseq
+	mset.clseq = lseq + mset.clfs + batchCount
+	// Keep hold of the mset.clMu, but unlock the others.
+	if batch != nil {
+		batch.mu.Unlock()
+	}
+	mset.mu.RUnlock()
+	return lseq
+}
+
+// commitSingleMsg commits and proposes a single message to the node.
+// This is reused both for normal publishing into a stream, and for fast batch publishing.
+// mset.clMu lock must be held.
+func commitSingleMsg(
+	diff *batchStagedDiff, mset *stream, subject string, reply string, hdr []byte, msg []byte, name string,
+	jsa *jsAccount, mt *msgTrace, node RaftNode, replicas int, lseq uint64,
+) {
+	diff.commit(mset)
+	esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), false)
+	var mtKey uint64
+	if mt != nil {
+		mtKey = mset.clseq
+		if mset.mt == nil {
+			mset.mt = make(map[uint64]*msgTrace)
+		}
+		mset.mt[mtKey] = mt
+	}
+
+	// Do proposal.
+	_ = node.Propose(esm)
+	// The proposal can fail, but we always account for trying.
+	mset.clseq++
+	mset.trackReplicationTraffic(node, len(esm), replicas)
+
+	// Check to see if we are being overrun.
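+	// Lag here is the number of proposed-but-not-yet-applied messages: mset.clseq
+	// counts proposals, while lseq and mset.clfs account for applied messages and
+	// failed proposals respectively.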
+ // TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured. + if mset.clseq-(lseq+mset.clfs) > streamLagWarnThreshold { + lerr := fmt.Errorf("JetStream stream '%s > %s' has high message lag", jsa.acc().Name, name) + mset.srv.RateLimitWarnf("%s", lerr.Error()) + } +} diff --git a/server/jetstream_batching_test.go b/server/jetstream_batching_test.go index c4ab4145a9a..32bd52c66d5 100644 --- a/server/jetstream_batching_test.go +++ b/server/jetstream_batching_test.go @@ -575,10 +575,12 @@ func TestJetStreamAtomicBatchPublishSourceAndMirror(t *testing.T) { }) require_NoError(t, err) - _, err = js.AddStream(&nats.StreamConfig{ - Name: "S", - Sources: []*nats.StreamSource{{Name: "TEST"}}, - Replicas: replicas, + _, err = jsStreamCreate(t, nc, &StreamConfig{ + Name: "S", + Storage: FileStorage, + Sources: []*StreamSource{{Name: "TEST"}}, + Replicas: replicas, + AllowAtomicPublish: true, }) require_NoError(t, err) @@ -1551,7 +1553,7 @@ func TestJetStreamAtomicBatchPublishSingleServerRecovery(t *testing.T) { mset.batches = batches mset.mu.Unlock() batches.mu.Lock() - b, err := batches.newBatchGroup(mset, "uuid") + b, err := batches.newAtomicBatchGroup(mset, "uuid") if err != nil { batches.mu.Unlock() require_NoError(t, err) @@ -1633,7 +1635,7 @@ func TestJetStreamAtomicBatchPublishSingleServerRecoveryCommitEob(t *testing.T) mset.batches = batches mset.mu.Unlock() batches.mu.Lock() - b, err := batches.newBatchGroup(mset, "uuid") + b, err := batches.newAtomicBatchGroup(mset, "uuid") if err != nil { batches.mu.Unlock() require_NoError(t, err) @@ -2872,3 +2874,475 @@ func TestJetStreamAtomicBatchPublishCommitUnsupported(t *testing.T) { require_NoError(t, err) require_Len(t, len(sliceHeader(JSRequiredApiLevel, sm.Header)), 0) } + +func TestJetStreamFastBatchPublish(t *testing.T) { + test := func( + t *testing.T, + storage StorageType, + retention RetentionPolicy, + replicas int, + ) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + var batchFlowAck BatchFlowAck + var pubAck JSPubAckResponse + + cfg := &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.*"}, + Storage: storage, + Retention: retention, + Replicas: replicas, + } + + _, err := jsStreamCreate(t, nc, cfg) + require_NoError(t, err) + + reply := nats.NewInbox() + sub, err := nc.SubscribeSync(reply) + require_NoError(t, err) + defer sub.Drain() + + m := nats.NewMsg("foo.0") + m.Reply = reply + m.Data = []byte("foo.0") + m.Header.Set("Nats-Fast-Batch-Id", "uuid") + + // Publish with batch publish disabled. + require_NoError(t, nc.PublishMsg(m)) + rmsg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck)) + require_NotNil(t, pubAck.Error) + require_Error(t, pubAck.Error, NewJSBatchPublishDisabledError()) + + // Enable batch publish. + cfg.AllowBatchPublish = true + _, err = jsStreamUpdate(t, nc, cfg) + require_NoError(t, err) + + // Publish without batch sequence errors. + require_NoError(t, nc.PublishMsg(m)) + rmsg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + pubAck = JSPubAckResponse{} + require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck)) + require_Error(t, pubAck.Error, NewJSBatchPublishMissingSeqError()) + + // A batch ID must not exceed the maximum length. 
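+		// The server caps batch IDs at 64 characters (see the check in
+		// processJetStreamBatchMsg).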
+ longBatchId := strings.Repeat("A", 65) + m.Header.Set("Nats-Fast-Batch-Id", longBatchId) + require_NoError(t, nc.PublishMsg(m)) + rmsg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + pubAck = JSPubAckResponse{} + require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck)) + require_NotNil(t, pubAck.Error) + require_Error(t, pubAck.Error, NewJSBatchPublishInvalidBatchIDError()) + + // Publish a batch, misses start. + m.Header.Set("Nats-Fast-Batch-Id", "uuid") + m.Header.Set("Nats-Batch-Sequence", "2") + require_NoError(t, nc.PublishMsg(m)) + rmsg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + pubAck = JSPubAckResponse{} + require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck)) + require_Error(t, pubAck.Error, NewJSBatchPublishUnknownBatchIDError()) + + // Publish a "batch" which immediately commits. + m.Header.Set("Nats-Fast-Batch-Id", "uuid") + m.Header.Set("Nats-Batch-Sequence", "1") + m.Header.Set("Nats-Batch-Commit", "1") + require_NoError(t, nc.PublishMsg(m)) + rmsg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + batchFlowAck = BatchFlowAck{} + require_NoError(t, json.Unmarshal(rmsg.Data, &batchFlowAck)) + require_Equal(t, batchFlowAck.AckMessages, 10) + rmsg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + pubAck = JSPubAckResponse{} + require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck)) + require_Equal(t, pubAck.Sequence, 1) + require_Equal(t, pubAck.BatchId, "uuid") + require_Equal(t, pubAck.BatchSize, 1) + + // Publish a batch of N messages. + m.Header.Del("Nats-Batch-Commit") + for seq, batch := uint64(1), uint64(5); seq <= batch; seq++ { + m.Subject = fmt.Sprintf("foo.%d", seq) + m.Data = []byte(m.Subject) + m.Header.Set("Nats-Batch-Sequence", strconv.FormatUint(seq, 10)) + if seq == batch { + m.Header.Set("Nats-Batch-Commit", "1") + } + require_NoError(t, nc.PublishMsg(m)) + + // Can already pre-check receiving the first flow control message. + if seq == 1 { + rmsg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + batchFlowAck = BatchFlowAck{} + require_NoError(t, json.Unmarshal(rmsg.Data, &batchFlowAck)) + require_Equal(t, batchFlowAck.AckMessages, 10) + } + } + // Should receive the PubAck upon commit. + rmsg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + pubAck = JSPubAckResponse{} + require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck)) + require_Equal(t, pubAck.Sequence, 6) + require_Equal(t, pubAck.BatchId, "uuid") + require_Equal(t, pubAck.BatchSize, 5) + + // Validate stream contents. 
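+		// With interest retention there are no consumers here, so messages are
+		// removed as soon as they are persisted and there is nothing to read back.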
+ if retention != InterestPolicy { + for i := 0; i < 6; i++ { + rsm, err := js.GetMsg("TEST", uint64(i+1)) + require_NoError(t, err) + subj := fmt.Sprintf("foo.%d", i) + require_Equal(t, rsm.Subject, subj) + require_Equal(t, string(rsm.Data), subj) + } + } + } + + for _, storage := range []StorageType{FileStorage, MemoryStorage} { + for _, retention := range []RetentionPolicy{LimitsPolicy, InterestPolicy, WorkQueuePolicy} { + for _, replicas := range []int{1, 3} { + t.Run(fmt.Sprintf("%s/%s/R%d", storage, retention, replicas), func(t *testing.T) { + test(t, storage, retention, replicas) + }) + } + } + } +} + +func TestJetStreamFastBatchPublishGapDetection(t *testing.T) { + test := func( + t *testing.T, + storage StorageType, + retention RetentionPolicy, + replicas int, + gapMode string, + ) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc := clientConnectToServer(t, c.randomServer()) + defer nc.Close() + + var batchFlowAck BatchFlowAck + var pubAck JSPubAckResponse + + _, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: storage, + Retention: retention, + Replicas: replicas, + AllowBatchPublish: true, + }) + require_NoError(t, err) + + reply := nats.NewInbox() + sub, err := nc.SubscribeSync(reply) + require_NoError(t, err) + defer sub.Drain() + + m := nats.NewMsg("foo") + m.Reply = reply + m.Header.Set("Nats-Fast-Batch-Id", "uuid") + m.Header.Set("Nats-Batch-Sequence", "1") + if gapMode != _EMPTY_ { + m.Header.Set("Nats-Batch-Gap", gapMode) + } + require_NoError(t, nc.PublishMsg(m)) + m.Header.Del("Nats-Batch-Gap") + rmsg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + + if gapMode == "unknown" { + pubAck = JSPubAckResponse{} + require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck)) + require_NotNil(t, pubAck.Error) + require_Error(t, pubAck.Error, NewJSBatchPublishInvalidGapModeError()) + return + } + + batchFlowAck = BatchFlowAck{} + require_NoError(t, json.Unmarshal(rmsg.Data, &batchFlowAck)) + require_Equal(t, batchFlowAck.AckMessages, 10) + + // Now a message is missed and a gap should be detected. + m.Header.Set("Nats-Batch-Sequence", "3") + require_NoError(t, nc.PublishMsg(m)) + rmsg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + + // There will always be a flow control message with the missed sequences. + batchFlowAck = BatchFlowAck{} + require_NoError(t, json.Unmarshal(rmsg.Data, &batchFlowAck)) + require_Equal(t, batchFlowAck.CurrentSequence, 3) + require_Equal(t, batchFlowAck.LastSequence, 1) + // Should NOT contain ack information, as these could come in out-of-order. + require_Equal(t, batchFlowAck.AckMessages, 0) + + switch gapMode { + case _EMPTY_, JSFastBatchGapFail: + // By default, if a gap is detected, the batch is rejected. + // A PubAck is returned with the data that has been persisted up to that point. + rmsg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + pubAck = JSPubAckResponse{} + require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck)) + require_Equal(t, pubAck.Sequence, 1) + require_Equal(t, pubAck.BatchId, "uuid") + require_Equal(t, pubAck.BatchSize, 1) + case JSFastBatchGapOk: + // If a gap is ok, the batch will continue to function. + // An EOB commit should get us the PubAck for the third message. 
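+			// The EOB message itself is never stored; it only terminates the
+			// batch, so the ack reports stream seq 2 with a batch count of 3.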
+ m.Header.Set("Nats-Batch-Sequence", "4") + m.Header.Set("Nats-Batch-Commit", "eob") + require_NoError(t, nc.PublishMsg(m)) + rmsg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + pubAck = JSPubAckResponse{} + require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck)) + require_Equal(t, pubAck.Sequence, 2) + require_Equal(t, pubAck.BatchId, "uuid") + require_Equal(t, pubAck.BatchSize, 3) + default: + t.Fatalf("unexpected gap mode: %q", gapMode) + } + } + + for _, storage := range []StorageType{FileStorage, MemoryStorage} { + for _, retention := range []RetentionPolicy{LimitsPolicy, InterestPolicy, WorkQueuePolicy} { + for _, replicas := range []int{1, 3} { + for _, gapMode := range []string{_EMPTY_, "fail", "ok", "unknown"} { + t.Run(fmt.Sprintf("%s/%s/R%d/%s", storage, retention, replicas, gapMode), func(t *testing.T) { + test(t, storage, retention, replicas, gapMode) + }) + } + } + } + } +} + +func TestJetStreamFastBatchPublishFlowControl(t *testing.T) { + templ := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: { + max_mem_store: 2GB + max_file_store: 2GB + store_dir: '%s' + limits { + batch { + timeout: 750ms + } + } + } + + leaf { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + # For access to system account. + accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } +` + + test := func( + t *testing.T, + storage StorageType, + replicas int, + ) { + c := createJetStreamClusterWithTemplate(t, templ, "R3S", 3) + defer c.shutdown() + + nc := clientConnectToServer(t, c.randomServer()) + defer nc.Close() + + var batchFlowAck BatchFlowAck + var pubAck JSPubAckResponse + + _, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: storage, + Replicas: replicas, + AllowBatchPublish: true, + }) + require_NoError(t, err) + + reply := nats.NewInbox() + sub, err := nc.SubscribeSync(reply) + require_NoError(t, err) + defer sub.Drain() + + m := nats.NewMsg("foo") + m.Reply = reply + m.Header.Set("Nats-Fast-Batch-Id", "uuid") + m.Header.Set("Nats-Flow", "2") + + lseq := uint64(5) + for seq := uint64(1); seq <= lseq; seq++ { + m.Header.Set("Nats-Batch-Sequence", strconv.FormatUint(seq, 10)) + if seq == lseq { + m.Header.Set("Nats-Batch-Commit", "1") + } + require_NoError(t, nc.PublishMsg(m)) + + if seq == 1 || seq%2 == 0 { + rmsg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + batchFlowAck = BatchFlowAck{} + require_NoError(t, json.Unmarshal(rmsg.Data, &batchFlowAck)) + if seq > 1 { + require_Equal(t, batchFlowAck.CurrentSequence, seq) + } + require_Equal(t, batchFlowAck.AckMessages, 2) + } else if seq == lseq { + rmsg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + pubAck = JSPubAckResponse{} + require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck)) + require_Equal(t, pubAck.Sequence, 5) + require_Equal(t, pubAck.BatchId, "uuid") + require_Equal(t, pubAck.BatchSize, 5) + } + + // Sleep between messages such that we'll go over the batch timeout. + // New messages being received should receive the timer. 
+ time.Sleep(250 * time.Millisecond) + } + } + + for _, storage := range []StorageType{FileStorage, MemoryStorage} { + for _, replicas := range []int{1, 3} { + t.Run(fmt.Sprintf("%s/R%d", storage, replicas), func(t *testing.T) { + test(t, storage, replicas) + }) + } + } +} + +func TestJetStreamFastBatchPublishSourceAndMirror(t *testing.T) { + test := func(t *testing.T, replicas int) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: FileStorage, + AllowBatchPublish: true, + Replicas: replicas, + }) + require_NoError(t, err) + + for seq := uint64(1); seq <= 3; seq++ { + m := nats.NewMsg("foo") + m.Header.Set("Nats-Fast-Batch-Id", "uuid") + m.Header.Set("Nats-Batch-Sequence", strconv.FormatUint(seq, 10)) + if seq == 1 { + m.Header.Set("Nats-Flow", "10") + m.Header.Set("Nats-Batch-Gap", "fail") + } + commit := seq == 3 + if !commit { + require_NoError(t, nc.PublishMsg(m)) + continue + } + m.Header.Set("Nats-Batch-Commit", "1") + + rmsg, err := nc.RequestMsg(m, time.Second) + require_NoError(t, err) + var pubAck JSPubAckResponse + require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck)) + require_Equal(t, pubAck.Sequence, 3) + require_Equal(t, pubAck.BatchId, "uuid") + require_Equal(t, pubAck.BatchSize, 3) + } + + require_NoError(t, js.DeleteMsg("TEST", 2)) + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + return checkState(t, c, globalAccountName, "TEST") + }) + + // Mirror can source batched messages but can't do fast batching itself. + _, err = jsStreamCreate(t, nc, &StreamConfig{ + Name: "M-no-batch", + Storage: FileStorage, + Mirror: &StreamSource{Name: "TEST"}, + Replicas: replicas, + AllowBatchPublish: true, + }) + require_Error(t, err, NewJSMirrorWithBatchPublishError()) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "M", + Mirror: &nats.StreamSource{Name: "TEST"}, + Replicas: replicas, + }) + require_NoError(t, err) + + _, err = jsStreamCreate(t, nc, &StreamConfig{ + Name: "S", + Storage: FileStorage, + Sources: []*StreamSource{{Name: "TEST"}}, + Replicas: replicas, + AllowBatchPublish: true, + }) + require_NoError(t, err) + + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + for _, name := range []string{"M", "S"} { + if si, err := js.StreamInfo(name); err != nil { + return err + } else if si.State.Msgs != 2 { + return fmt.Errorf("expected 2 messages for stream %q, got %d", name, si.State.Msgs) + } + } + return nil + }) + + // Ensure the batching headers were removed when ingested into the source/mirror. 
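+		// Mirrored messages arrive with all headers stripped, while sourced
+		// messages keep only the generated Nats-Stream-Source header.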
+ rsm, err := js.GetMsg("M", 1) + require_NoError(t, err) + require_Len(t, len(rsm.Header), 0) + + rsm, err = js.GetMsg("M", 3) + require_NoError(t, err) + require_Len(t, len(rsm.Header), 0) + + rsm, err = js.GetMsg("S", 1) + require_NoError(t, err) + require_Len(t, len(rsm.Header), 1) + require_Equal(t, rsm.Header.Get(JSStreamSource), "TEST 1 > > foo") + + rsm, err = js.GetMsg("S", 2) + require_NoError(t, err) + require_Len(t, len(rsm.Header), 1) + require_Equal(t, rsm.Header.Get(JSStreamSource), "TEST 3 > > foo") + } + + t.Run("R1", func(t *testing.T) { test(t, 1) }) + t.Run("R3", func(t *testing.T) { test(t, 3) }) +} diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 00ba6c5fc03..17d0e58f14b 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -8857,25 +8857,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ // Check if we need to set initial value here mset.clMu.Lock() if mset.clseq == 0 || mset.clseq < lseq+mset.clfs { - // Need to unlock and re-acquire the locks in the proper order. - mset.clMu.Unlock() - // Locking order is stream -> batchMu -> clMu - mset.mu.RLock() - batch := mset.batchApply - var batchCount uint64 - if batch != nil { - batch.mu.Lock() - batchCount = batch.count - } - mset.clMu.Lock() - // Re-capture - lseq = mset.lseq - mset.clseq = lseq + mset.clfs + batchCount - // Keep hold of the mset.clMu, but unlock the others. - if batch != nil { - batch.mu.Unlock() - } - mset.mu.RUnlock() + lseq = recalculateClusteredSeq(mset) } var ( @@ -8903,48 +8885,9 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ return err } - diff.commit(mset) - esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), sourced) - var mtKey uint64 - if mt != nil { - mtKey = mset.clseq - if mset.mt == nil { - mset.mt = make(map[uint64]*msgTrace) - } - mset.mt[mtKey] = mt - } - - // Do proposal. - _ = node.Propose(esm) - // The proposal can fail, but we always account for trying. - mset.clseq++ - mset.trackReplicationTraffic(node, len(esm), r) - - // Check to see if we are being overrun. - // TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured. - if mset.clseq-(lseq+mset.clfs) > streamLagWarnThreshold { - lerr := fmt.Errorf("JetStream stream '%s > %s' has high message lag", jsa.acc().Name, name) - s.RateLimitWarnf("%s", lerr.Error()) - } + commitSingleMsg(diff, mset, subject, reply, hdr, msg, name, jsa, mt, node, r, lseq) mset.clMu.Unlock() - - if err != nil { - if mt != nil { - mset.getAndDeleteMsgTrace(mtKey) - } - if canRespond { - var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.cfg.Name}} - resp.Error = &ApiError{Code: 503, Description: err.Error()} - response, _ = json.Marshal(resp) - // If we errored out respond here. 
- outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0)) - } - if isOutOfSpaceErr(err) { - s.handleOutOfSpace(mset) - } - } - - return err + return nil } func (mset *stream) getAndDeleteMsgTrace(lseq uint64) *msgTrace { diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index d244ebd7ac2..f911bbb7e28 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -35,6 +35,21 @@ const ( // JSBadRequestErr bad request JSBadRequestErr ErrorIdentifier = 10003 + // JSBatchPublishDisabledErr batch publish is disabled + JSBatchPublishDisabledErr ErrorIdentifier = 10202 + + // JSBatchPublishInvalidBatchIDErr batch publish ID is invalid + JSBatchPublishInvalidBatchIDErr ErrorIdentifier = 10204 + + // JSBatchPublishInvalidGapModeErr batch publish gap mode is invalid + JSBatchPublishInvalidGapModeErr ErrorIdentifier = 10206 + + // JSBatchPublishMissingSeqErr batch publish sequence is missing + JSBatchPublishMissingSeqErr ErrorIdentifier = 10203 + + // JSBatchPublishUnknownBatchIDErr batch publish ID unknown + JSBatchPublishUnknownBatchIDErr ErrorIdentifier = 10205 + // JSClusterIncompleteErr incomplete results JSClusterIncompleteErr ErrorIdentifier = 10004 @@ -350,6 +365,9 @@ const ( // JSMirrorWithAtomicPublishErr stream mirrors can not also use atomic publishing JSMirrorWithAtomicPublishErr ErrorIdentifier = 10198 + // JSMirrorWithBatchPublishErr stream mirrors can not also use batch publishing + JSMirrorWithBatchPublishErr ErrorIdentifier = 10207 + // JSMirrorWithCountersErr stream mirrors can not also calculate counters JSMirrorWithCountersErr ErrorIdentifier = 10173 @@ -618,6 +636,11 @@ var ( JSAtomicPublishTooLargeBatchErrF: {Code: 400, ErrCode: 10199, Description: "atomic publish batch is too large: {size}"}, JSAtomicPublishUnsupportedHeaderBatchErr: {Code: 400, ErrCode: 10177, Description: "atomic publish unsupported header used: {header}"}, JSBadRequestErr: {Code: 400, ErrCode: 10003, Description: "bad request"}, + JSBatchPublishDisabledErr: {Code: 400, ErrCode: 10202, Description: "batch publish is disabled"}, + JSBatchPublishInvalidBatchIDErr: {Code: 400, ErrCode: 10204, Description: "batch publish ID is invalid"}, + JSBatchPublishInvalidGapModeErr: {Code: 400, ErrCode: 10206, Description: "batch publish gap mode is invalid"}, + JSBatchPublishMissingSeqErr: {Code: 400, ErrCode: 10203, Description: "batch publish sequence is missing"}, + JSBatchPublishUnknownBatchIDErr: {Code: 400, ErrCode: 10205, Description: "batch publish ID unknown"}, JSClusterIncompleteErr: {Code: 503, ErrCode: 10004, Description: "incomplete results"}, JSClusterNoPeersErrF: {Code: 400, ErrCode: 10005, Description: "{err}"}, JSClusterNotActiveErr: {Code: 500, ErrCode: 10006, Description: "JetStream not in clustered mode"}, @@ -723,6 +746,7 @@ var ( JSMirrorMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10150, Description: "mirror with multiple subject transforms cannot also have a single subject filter"}, JSMirrorOverlappingSubjectFilters: {Code: 400, ErrCode: 10152, Description: "mirror subject filters can not overlap"}, JSMirrorWithAtomicPublishErr: {Code: 400, ErrCode: 10198, Description: "stream mirrors can not also use atomic publishing"}, + JSMirrorWithBatchPublishErr: {Code: 400, ErrCode: 10207, Description: "stream mirrors can not also use batch publishing"}, JSMirrorWithCountersErr: {Code: 400, ErrCode: 10173, Description: "stream mirrors can not also calculate counters"}, JSMirrorWithFirstSeqErr: {Code: 400, ErrCode: 
10143, Description: "stream mirrors can not have first sequence configured"}, JSMirrorWithMsgSchedulesErr: {Code: 400, ErrCode: 10186, Description: "stream mirrors can not also schedule messages"}, @@ -945,6 +969,56 @@ func NewJSBadRequestError(opts ...ErrorOption) *ApiError { return ApiErrors[JSBadRequestErr] } +// NewJSBatchPublishDisabledError creates a new JSBatchPublishDisabledErr error: "batch publish is disabled" +func NewJSBatchPublishDisabledError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSBatchPublishDisabledErr] +} + +// NewJSBatchPublishInvalidBatchIDError creates a new JSBatchPublishInvalidBatchIDErr error: "batch publish ID is invalid" +func NewJSBatchPublishInvalidBatchIDError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSBatchPublishInvalidBatchIDErr] +} + +// NewJSBatchPublishInvalidGapModeError creates a new JSBatchPublishInvalidGapModeErr error: "batch publish gap mode is invalid" +func NewJSBatchPublishInvalidGapModeError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSBatchPublishInvalidGapModeErr] +} + +// NewJSBatchPublishMissingSeqError creates a new JSBatchPublishMissingSeqErr error: "batch publish sequence is missing" +func NewJSBatchPublishMissingSeqError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSBatchPublishMissingSeqErr] +} + +// NewJSBatchPublishUnknownBatchIDError creates a new JSBatchPublishUnknownBatchIDErr error: "batch publish ID unknown" +func NewJSBatchPublishUnknownBatchIDError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSBatchPublishUnknownBatchIDErr] +} + // NewJSClusterIncompleteError creates a new JSClusterIncompleteErr error: "incomplete results" func NewJSClusterIncompleteError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) @@ -2091,6 +2165,16 @@ func NewJSMirrorWithAtomicPublishError(opts ...ErrorOption) *ApiError { return ApiErrors[JSMirrorWithAtomicPublishErr] } +// NewJSMirrorWithBatchPublishError creates a new JSMirrorWithBatchPublishErr error: "stream mirrors can not also use batch publishing" +func NewJSMirrorWithBatchPublishError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSMirrorWithBatchPublishErr] +} + // NewJSMirrorWithCountersError creates a new JSMirrorWithCountersErr error: "stream mirrors can not also calculate counters" func NewJSMirrorWithCountersError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go index 8d6e9e7cab0..0aed2c198ad 100644 --- a/server/jetstream_versioning.go +++ b/server/jetstream_versioning.go @@ -17,7 +17,7 @@ import "strconv" const ( // JSApiLevel is the maximum supported JetStream API level for this server. - JSApiLevel int = 2 + JSApiLevel int = 3 JSRequiredLevelMetadataKey = "_nats.req.level" JSServerVersionMetadataKey = "_nats.ver" @@ -82,6 +82,11 @@ func setStaticStreamMetadata(cfg *StreamConfig) { requires(2) } + // Fast batch publishing was added in v2.14 and requires API level 3. 
+	if cfg.AllowBatchPublish {
+		requires(3)
+	}
+
 	cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel)
 }
 
diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go
index c98a1cc6ec3..2bca4ee0151 100644
--- a/server/jetstream_versioning_test.go
+++ b/server/jetstream_versioning_test.go
@@ -106,6 +106,11 @@ func TestJetStreamSetStaticStreamMetadata(t *testing.T) {
 			cfg:              &StreamConfig{PersistMode: AsyncPersistMode},
 			expectedMetadata: metadataAtLevel("2"),
 		},
+		{
+			desc:             "AllowBatchPublish",
+			cfg:              &StreamConfig{AllowBatchPublish: true},
+			expectedMetadata: metadataAtLevel("3"),
+		},
 	} {
 		t.Run(test.desc, func(t *testing.T) {
 			setStaticStreamMetadata(test.cfg)
diff --git a/server/stream.go b/server/stream.go
index c4934230992..7d1dcc43645 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -122,6 +122,9 @@ type StreamConfig struct {
 	// PersistMode allows to opt-in to different persistence mode settings.
 	PersistMode PersistModeType `json:"persist_mode,omitempty"`
 
+	// AllowBatchPublish allows fast batch publishing into the stream.
+	AllowBatchPublish bool `json:"allow_batched,omitempty"`
+
 	// Metadata is additional metadata for the Stream.
 	Metadata map[string]string `json:"metadata,omitempty"`
 }
@@ -274,6 +277,18 @@ type CounterValue struct {
 // e.g. {"stream":{"subject":"123"}}
 type CounterSources map[string]map[string]string
 
+// BatchFlowAck is used for flow control when fast batch publishing into a stream.
+type BatchFlowAck struct {
+	// LastSequence is the previous highest sequence seen; it is set when a gap is detected.
+	LastSequence uint64 `json:"last_seq,omitempty"`
+	// CurrentSequence is the sequence of the message that triggered the ack.
+	CurrentSequence uint64 `json:"seq,omitempty"`
+	// AckMessages indicates the active per-message frequency of Flow Acks.
+	AckMessages uint64 `json:"messages,omitempty"`
+	//// AckBytes indicates the active per-bytes frequency of Flow Acks in unit of bytes
+	//AckBytes int64 `json:"bytes,omitempty"`
+}
+
 // StreamInfo shows config and current state for this stream.
 type StreamInfo struct {
 	Config StreamConfig `json:"config"`
@@ -560,6 +575,9 @@ const (
 	JSBatchId       = "Nats-Batch-Id"
 	JSBatchSeq      = "Nats-Batch-Sequence"
 	JSBatchCommit   = "Nats-Batch-Commit"
+	JSFastBatchId   = "Nats-Fast-Batch-Id"
+	JSBatchGap      = "Nats-Batch-Gap"
+	JSFlow          = "Nats-Flow"
 	JSSchedulePattern = "Nats-Schedule"
 	JSScheduleTTL   = "Nats-Schedule-TTL"
 	JSScheduleTarget = "Nats-Schedule-Target"
@@ -578,6 +596,12 @@ const (
 	JSScheduleNextPurge = "purge" // If it's a non-repeating/delayed message, the schedule is purged.
 )
 
+// Header values for fast batch publish.
+const (
+	JSFastBatchGapFail = "fail"
+	JSFastBatchGapOk   = "ok"
+)
+
 // Headers for republished messages and direct get responses.
 const (
 	JSStream = "Nats-Stream"
@@ -1727,6 +1751,9 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo
 		if cfg.AllowAtomicPublish {
 			return StreamConfig{}, NewJSMirrorWithAtomicPublishError()
 		}
+		if cfg.AllowBatchPublish {
+			return StreamConfig{}, NewJSMirrorWithBatchPublishError()
+		}
 		if cfg.AllowMsgSchedules {
 			return StreamConfig{}, NewJSMirrorWithMsgSchedulesError()
 		}
@@ -2477,10 +2504,12 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool,
 		}
 	}
 
-	// If atomic publish is disabled, delete any in-progress batches.
+	// If batch publish is disabled, delete any in-progress batches.
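+	// Only atomic batches keep apply state; fast batches live solely in
+	// mset.batches, so there is no extra state to clear for them.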
+ if !cfg.AllowAtomicPublish || !cfg.AllowBatchPublish { mset.deleteInflightBatches(false) - mset.deleteBatchApplyState() + if !cfg.AllowAtomicPublish { + mset.deleteBatchApplyState() + } } // Now update config and store's version of our config. @@ -3335,6 +3364,10 @@ func (mset *stream) setupMirrorConsumer() error { hdr = removeHeaderIfPrefixPresent(hdr, "Nats-Expected-") // Remove any Nats-Batch- headers, batching is not supported when mirroring. hdr = removeHeaderIfPrefixPresent(hdr, "Nats-Batch-") + // Remove any Nats-Fast-Batch- headers, batching is not supported when mirroring. + hdr = removeHeaderIfPrefixPresent(hdr, "Nats-Fast-Batch-") + // Remove any Nats-Flow headers, batching is not supported when mirroring. + hdr = removeHeaderIfPresent(hdr, "Nats-Flow") } mset.queueInbound(msgs, subject, reply, hdr, msg, nil, nil) mirror.last.Store(time.Now().UnixNano()) @@ -3917,6 +3950,10 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { hdr = removeHeaderIfPrefixPresent(hdr, "Nats-Expected-") // Remove any Nats-Batch- headers, batching is not supported when sourcing. hdr = removeHeaderIfPrefixPresent(hdr, "Nats-Batch-") + // Remove any Nats-Fast-Batch- headers, batching is not supported when sourcing. + hdr = removeHeaderIfPrefixPresent(hdr, "Nats-Fast-Batch-") + // Remove any Nats-Flow headers, batching is not supported when sourcing. + hdr = removeHeaderIfPresent(hdr, "Nats-Flow") } // Hold onto the origin reply which has all the metadata. hdr = genHeader(hdr, JSStreamSource, si.genSourceHeader(m.subj, m.rply)) @@ -4848,11 +4885,18 @@ func getMessageScheduler(hdr []byte) string { } // Fast lookup of batch ID. -func getBatchId(hdr []byte) string { +// Returns whether the batch should be handled as atomic (true) or fast (false). +func getBatchId(hdr []byte) (string, bool) { if len(hdr) == 0 { - return _EMPTY_ + return _EMPTY_, false + } + if atomicBatchId := sliceHeader(JSBatchId, hdr); atomicBatchId != nil { + return string(atomicBatchId), true + } + if fastBatchId := sliceHeader(JSFastBatchId, hdr); fastBatchId != nil { + return string(fastBatchId), false } - return string(getHeader(JSBatchId, hdr)) + return _EMPTY_, false } // Fast lookup of batch sequence. @@ -5387,11 +5431,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, var resp = &JSPubAckResponse{} - var batchId string - var batchSeq uint64 + var ( + batchId string + batchSeq uint64 + batchAtomic bool + ) if len(hdr) > 0 { // Populate batch details. - if batchId = getBatchId(hdr); batchId != _EMPTY_ { + if batchId, batchAtomic = getBatchId(hdr); batchId != _EMPTY_ { batchSeq, _ = getBatchSequence(hdr) // Disable consistency checking if this was already done // earlier as part of the batch consistency check. @@ -5941,6 +5988,22 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if msgId != _EMPTY_ { mset.storeMsgId(&ddentry{msgId, mset.lseq, ts}) } + // If using fast batch publish, we occasionally send flow control messages. + // And, we need to ensure a PubAck is sent if the commit happens through EOB. + if batchId != _EMPTY_ && !batchAtomic && mset.batches != nil { + batches := mset.batches + batches.mu.Lock() + b, commitReply := batches.fastBatchRegisterSequences(batchId, batchSeq, mset.lseq) + if commitReply != _EMPTY_ { + reply = commitReply + canRespond = doAck && len(reply) > 0 && isLeader + } else if canRespond && b != nil { + // If not committing, we need to send a flow control message instead. 
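+				// A reply is only attached every ackMessages messages on the fast
+				// publish path, so this throttles itself.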
+ canRespond = false + b.fastBatchFlowControl(batchSeq, mset, reply) + } + batches.mu.Unlock() + } if canRespond { response = append(pubAck, strconv.FormatUint(mset.lseq, 10)...) if batchId != _EMPTY_ { @@ -6127,6 +6190,23 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, outq.send(newJSPubMsg(tsubj, _EMPTY_, _EMPTY_, hdr, rpMsg, nil, seq)) } + // If using fast batch publish, we occasionally send flow control messages. + // And, we need to ensure a PubAck is sent if the commit happens through EOB. + if batchId != _EMPTY_ && !batchAtomic && mset.batches != nil { + batches := mset.batches + batches.mu.Lock() + b, commitReply := batches.fastBatchRegisterSequences(batchId, batchSeq, mset.lseq) + if commitReply != _EMPTY_ { + reply = commitReply + canRespond = doAck && len(reply) > 0 && isLeader + } else if canRespond && b != nil { + // If not committing, we need to send a flow control message instead. + canRespond = false + b.fastBatchFlowControl(batchSeq, mset, reply) + } + batches.mu.Unlock() + } + // Send response here. if canRespond { response = append(pubAck, strconv.FormatUint(seq, 10)...) @@ -6154,19 +6234,16 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, return nil } -// processJetStreamBatchMsg processes a JetStream message that's part of an atomic batch publish. +// processJetStreamBatchMsg processes a JetStream message that's part of an atomic or fast batch publish. // Handles constraints around the batch, storing messages, doing consistency checks, and performing the commit. -func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr, msg []byte, mt *msgTrace) (retErr error) { - // For possible error response. - var response []byte - +func (mset *stream) processJetStreamBatchMsg(batchId string, atomic bool, subject, reply string, hdr, msg []byte, mt *msgTrace) (retErr error) { mset.mu.RLock() canRespond := !mset.cfg.NoAck && len(reply) > 0 name, stype := mset.cfg.Name, mset.cfg.Storage discard, discardNewPer, maxMsgs, maxMsgsPer, maxBytes := mset.cfg.Discard, mset.cfg.DiscardNewPer, mset.cfg.MaxMsgs, mset.cfg.MaxMsgsPer, mset.cfg.MaxBytes s, js, jsa, st, r, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node maxMsgSize, lseq := int(mset.cfg.MaxMsgSize), mset.lseq - isLeader, isClustered, isSealed, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, allowAtomicPublish := mset.isLeader(), mset.isClustered(), mset.cfg.Sealed, mset.cfg.AllowRollup, mset.cfg.DenyPurge, mset.cfg.AllowMsgTTL, mset.cfg.AllowMsgCounter, mset.cfg.AllowMsgSchedules, mset.cfg.AllowAtomicPublish + isLeader, isClustered, isSealed, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, allowAtomicPublish, allowBatchPublish := mset.isLeader(), mset.isClustered(), mset.cfg.Sealed, mset.cfg.AllowRollup, mset.cfg.DenyPurge, mset.cfg.AllowMsgTTL, mset.cfg.AllowMsgCounter, mset.cfg.AllowMsgSchedules, mset.cfg.AllowAtomicPublish, mset.cfg.AllowBatchPublish mset.mu.RUnlock() // If message tracing (with message delivery), we will need to send the @@ -6186,26 +6263,27 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr return NewJSClusterNotLeaderError() } + respondError := func(apiErr *ApiError) error { + if canRespond { + buf, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: apiErr}) + outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, buf, nil, 0)) + } + return apiErr + } 
+ // Bail here if sealed. if isSealed { - var resp = JSPubAckResponse{PubAck: &PubAck{Stream: mset.name()}, Error: NewJSStreamSealedError()} - b, _ := json.Marshal(resp) - mset.outq.sendMsg(reply, b) - return NewJSStreamSealedError() + return respondError(NewJSStreamSealedError()) } // Check here pre-emptively if we have exceeded this server limits. if js.limitsExceeded(stype) { s.resourcesExceededError(stype) - if canRespond { - b, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: NewJSInsufficientResourcesError()}) - outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, b, nil, 0)) - } // Stepdown regardless. if node := mset.raftNode(); node != nil { node.StepDown() } - return NewJSInsufficientResourcesError() + return respondError(NewJSInsufficientResourcesError()) } // Check here pre-emptively if we have exceeded our account limits. @@ -6214,13 +6292,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr err = NewJSAccountResourcesExceededError() } s.RateLimitWarnf("JetStream account limits exceeded for '%s': %s", jsa.acc().GetName(), err.Error()) - if canRespond { - var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}} - resp.Error = err - response, _ = json.Marshal(resp) - outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0)) - } - return err + return respondError(err) } // Check msgSize if we have a limit set there. Again this works if it goes through but better to be pre-emptive. @@ -6228,42 +6300,32 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr if maxMsgSize >= 0 && (len(hdr) > maxMsgSize || len(msg) > maxMsgSize-len(hdr)) { err := fmt.Errorf("JetStream message size exceeds limits for '%s > %s'", jsa.acc().Name, mset.cfg.Name) s.RateLimitWarnf("%s", err.Error()) - if canRespond { - var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}} - resp.Error = NewJSStreamMessageExceedsMaximumError() - response, _ = json.Marshal(resp) - outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0)) - } + _ = respondError(NewJSStreamMessageExceedsMaximumError()) return err } - if !allowAtomicPublish { - err := NewJSAtomicPublishDisabledError() - if canRespond { - b, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: err}) - outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, b, nil, 0)) - } - return err + if atomic && !allowAtomicPublish { + return respondError(NewJSAtomicPublishDisabledError()) + } else if !atomic && !allowBatchPublish { + return respondError(NewJSBatchPublishDisabledError()) } // Batch ID is too long. 
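+	// The same cap applies to atomic and fast batch IDs; only the error
+	// returned differs between the two flavors.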
if len(batchId) > 64 { - err := NewJSAtomicPublishInvalidBatchIDError() - if canRespond { - b, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: err}) - outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, b, nil, 0)) + err := NewJSBatchPublishInvalidBatchIDError() + if atomic { + err = NewJSAtomicPublishInvalidBatchIDError() } - return err + return respondError(err) } batchSeq, exists := getBatchSequence(hdr) if !exists { - err := NewJSAtomicPublishMissingSeqError() - if canRespond { - b, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: err}) - outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, b, nil, 0)) + err := NewJSBatchPublishMissingSeqError() + if atomic { + err = NewJSAtomicPublishMissingSeqError() } - return err + return respondError(err) } mset.mu.Lock() @@ -6277,11 +6339,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr respondIncompleteBatch := func() error { err := NewJSAtomicPublishIncompleteBatchError() - if canRespond { - buf, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: err}) - outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, buf, nil, 0)) - } - return err + return respondError(err) } // Get batch. @@ -6290,6 +6348,10 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr if !ok { if batchSeq != 1 { batches.mu.Unlock() + if !atomic { + err := NewJSBatchPublishUnknownBatchIDError() + return respondError(err) + } maxBatchSize := streamMaxBatchSize opts := s.getOpts() if opts.JetStreamLimits.MaxBatchSize > 0 { @@ -6297,11 +6359,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr } if batchSeq > uint64(maxBatchSize) { err := NewJSAtomicPublishTooLargeBatchError(maxBatchSize) - if canRespond { - buf, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: err}) - outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, buf, nil, 0)) - } - return err + return respondError(err) } return respondIncompleteBatch() } @@ -6331,10 +6389,28 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr } var err error - if b, err = batches.newBatchGroup(mset, batchId); err != nil { - globalInflightBatches.Add(-1) - batches.mu.Unlock() - return respondIncompleteBatch() + if atomic { + b, err = batches.newAtomicBatchGroup(mset, batchId) + if err != nil { + globalInflightBatches.Add(-1) + batches.mu.Unlock() + return respondIncompleteBatch() + } + } else { + var gapOk bool + if gapMode := bytesToString(sliceHeader(JSBatchGap, hdr)); gapMode == JSFastBatchGapOk { + gapOk = true + } else if gapMode != _EMPTY_ && gapMode != JSFastBatchGapFail { + globalInflightBatches.Add(-1) + batches.mu.Unlock() + return respondError(NewJSBatchPublishInvalidGapModeError()) + } + ackMessages := uint64(10) // TODO(mvv): just some default for now + if flow := sliceHeader(JSFlow, hdr); flow != nil { + // TODO(mvv): error handling? 
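+				// Non-numeric or non-positive values keep the default above.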
+				if n := parseInt64(flow); n > 0 {
+					ackMessages = uint64(n)
+				}
+			}
+			b = batches.newFastBatchGroup(mset, batchId, gapOk, ackMessages)
 		}
 		batches.group[batchId] = b
 	}
@@ -6347,11 +6423,7 @@
 			b.cleanupLocked(batchId, batches)
 			batches.mu.Unlock()
 			err := NewJSAtomicPublishInvalidBatchCommitError()
-			if canRespond {
-				b, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: err})
-				outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, b, nil, 0))
-			}
-			return err
+			return respondError(err)
 		}
 		commit = true
 	}
@@ -6362,22 +6434,95 @@
 			b.cleanupLocked(batchId, batches)
 			batches.mu.Unlock()
 			err := NewJSRequiredApiLevelError()
-			if canRespond {
-				b, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: err})
-				outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, b, nil, 0))
-			}
-			return err
+			return respondError(err)
 		}
 		hdr = removeHeaderIfPresent(hdr, JSRequiredApiLevel)
 	}
 
+	// Fast publishing resets the cleanup timer.
+	// If cleanup has already happened, we can't continue.
+	cleanup := !atomic && !b.resetCleanupTimer(mset)
+
 	// Detect gaps.
 	b.lseq++
-	if b.lseq != batchSeq {
-		b.cleanupLocked(batchId, batches)
+	if b.lseq != batchSeq || cleanup {
+		reject := true
+		// If a gap is detected, we always report it.
+		if !atomic {
+			buf, _ := json.Marshal(&BatchFlowAck{LastSequence: b.lseq - 1, CurrentSequence: batchSeq})
+			outq.sendMsg(reply, buf)
+			// If the gap is okay, we can continue without rejecting.
+			if b.gapOk && !cleanup {
+				reject = false
+				b.lseq = batchSeq
+			}
+		}
+		if reject {
+			// Revert, since we incremented for the gap check.
+			b.lseq--
+			if !atomic {
+				batches.fastBatchCommit(b, batchId, mset, reply)
+				batches.mu.Unlock()
+				return nil
+			}
+			b.cleanupLocked(batchId, batches)
+			batches.mu.Unlock()
+			mset.sendStreamBatchAbandonedAdvisory(batchId, BatchIncomplete)
+			return respondIncompleteBatch()
+		}
+	}
+
+	// If not atomic, we care about going fast. Proposals happen immediately.
+	if !atomic {
+		if commit {
+			if commitEob {
+				// Revert, since we incremented for the gap check.
+				b.lseq--
+			}
+			// We'll try to immediately send a PubAck if we can.
+			// This is only possible if EOB is used and the last message was already persisted.
+			// Otherwise, this sets up the commit reply for the last message we're about to propose.
+			batches.fastBatchCommit(b, batchId, mset, reply)
+			if commitEob {
+				batches.mu.Unlock()
+				return nil
+			}
+		}
 		batches.mu.Unlock()
-		mset.sendStreamBatchAbandonedAdvisory(batchId, BatchIncomplete)
-		return respondIncompleteBatch()
+
+		// The first message in the batch responds with the settings used for flow control.
+		if batchSeq == 1 && canRespond {
+			buf, _ := json.Marshal(&BatchFlowAck{AckMessages: b.ackMessages})
+			outq.sendMsg(reply, buf)
+		}
+
+		// Proceed with proposing this message.
+
+		// We only use mset.clseq for clustering and in case we run ahead of actual commits.
+ // Check if we need to set initial value here + mset.clMu.Lock() + if mset.clseq == 0 || mset.clseq < lseq+mset.clfs { + lseq = recalculateClusteredSeq(mset) + } + + var err error + diff := &batchStagedDiff{} + if hdr, msg, _, _, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, hdr, msg, false, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil { + mset.clMu.Unlock() + // FIXME(mvv): errors need to be handled for fast batch publish, return both a success PubAck and error? + return err + } + // We only reply if we're committing, or if we've reached the ack threshold. + if !commit && batchSeq%b.ackMessages != 0 { + reply = _EMPTY_ + } + if !isClustered { + mset.clMu.Unlock() + return mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0, mt, false, true) + } + commitSingleMsg(diff, mset, subject, reply, hdr, msg, name, jsa, mt, node, r, lseq) + mset.clMu.Unlock() + return nil } // Confirm the batch doesn't exceed the allowed size. @@ -6390,11 +6535,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr batches.mu.Unlock() mset.sendStreamBatchAbandonedAdvisory(batchId, BatchLarge) err := NewJSAtomicPublishTooLargeBatchError(maxSize) - if canRespond { - buf, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: err}) - outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, buf, nil, 0)) - } - return err + return respondError(err) } // Persist, but optimize if we're committing because we already know last. @@ -6440,25 +6581,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr // We only use mset.clseq for clustering and in case we run ahead of actual commits. // Check if we need to set initial value here if isClustered && (mset.clseq == 0 || mset.clseq < lseq+mset.clfs) { - // Need to unlock and re-acquire the locks in the proper order. - mset.clMu.Unlock() - // Locking order is stream -> batchMu -> clMu - mset.mu.RLock() - batch := mset.batchApply - var batchCount uint64 - if batch != nil { - batch.mu.Lock() - batchCount = batch.count - } - mset.clMu.Lock() - // Re-capture - lseq = mset.lseq - mset.clseq = lseq + mset.clfs + batchCount - // Keep hold of the mset.clMu, but unlock the others. 
- if batch != nil { - batch.mu.Unlock() - } - mset.mu.RUnlock() + lseq = recalculateClusteredSeq(mset) } rollback := func(seq uint64) { @@ -6475,10 +6598,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr rollback(seq) b.cleanupLocked(batchId, batches) batches.mu.Unlock() - if canRespond { - buf, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: apiErr}) - outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, buf, nil, 0)) - } + _ = respondError(apiErr) return apiErr } @@ -6521,10 +6641,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr rollback(seq) b.cleanupLocked(batchId, batches) batches.mu.Unlock() - if canRespond { - buf, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: apiErr}) - outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, buf, nil, 0)) - } + _ = respondError(apiErr) return err } @@ -6573,7 +6690,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr bhdr = genHeader(bhdr, JSBatchCommit, "1") } } - mset.processJetStreamMsg(bsubj, _reply, bhdr, bmsg, 0, 0, mt, false, false) + _ = mset.processJetStreamMsg(bsubj, _reply, bhdr, bmsg, 0, 0, mt, false, false) } mset.mu.Unlock() } else { @@ -6900,8 +7017,8 @@ func (mset *stream) internalLoop() { ims := msgs.pop() for _, im := range ims { // If we are clustered we need to propose this message to the underlying raft group. - if batchId := getBatchId(im.hdr); batchId != _EMPTY_ { - mset.processJetStreamBatchMsg(batchId, im.subj, im.rply, im.hdr, im.msg, im.mt) + if batchId, atomic := getBatchId(im.hdr); batchId != _EMPTY_ { + mset.processJetStreamBatchMsg(batchId, atomic, im.subj, im.rply, im.hdr, im.msg, im.mt) } else if isClustered { mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, im.msg, im.mt, false) } else {
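
For reference, a minimal client-side sketch of the fast-batch flow added by this patch (an illustration, not part of the diff). It assumes a stream "TEST" on subject "foo" already created with AllowBatchPublish enabled, and uses only plain nats.go publishes plus the header names introduced in stream.go above.

package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	// All acks (flow control and the final PubAck) arrive on a single inbox.
	reply := nats.NewInbox()
	sub, err := nc.SubscribeSync(reply)
	if err != nil {
		panic(err)
	}

	const total = uint64(100)
	for seq := uint64(1); seq <= total; seq++ {
		m := nats.NewMsg("foo")
		m.Reply = reply
		m.Data = []byte(fmt.Sprintf("msg-%d", seq))
		m.Header.Set("Nats-Fast-Batch-Id", "my-batch")
		m.Header.Set("Nats-Batch-Sequence", strconv.FormatUint(seq, 10))
		if seq == 1 {
			// The first message carries the batch options: request a flow
			// ack every 10 messages and reject the batch on gaps.
			m.Header.Set("Nats-Flow", "10")
			m.Header.Set("Nats-Batch-Gap", "fail")
		}
		if seq == total {
			// The final message commits the whole batch.
			m.Header.Set("Nats-Batch-Commit", "1")
		}
		if err := nc.PublishMsg(m); err != nil {
			panic(err)
		}
	}

	// Drain BatchFlowAck messages until the PubAck for the commit arrives.
	// A PubAck carries the stream name; flow acks do not.
	for {
		rmsg, err := sub.NextMsg(2 * time.Second)
		if err != nil {
			panic(err)
		}
		var ack struct {
			Stream string `json:"stream"`
			Seq    uint64 `json:"seq"`
			Count  uint64 `json:"count"`
		}
		if err := json.Unmarshal(rmsg.Data, &ack); err != nil {
			panic(err)
		}
		if ack.Stream != "" {
			fmt.Printf("committed at stream seq %d, %d messages\n", ack.Seq, ack.Count)
			return
		}
		fmt.Printf("flow ack: %s\n", rmsg.Data)
	}
}

Note that the reported count reflects the highest batch sequence seen; with Nats-Batch-Gap: ok it can exceed the number of messages actually stored when gaps were tolerated.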