Skip to content
60 changes: 60 additions & 0 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1998,5 +1998,65 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSBatchPublishDisabledErr",
"code": 400,
"error_code": 10202,
"description": "batch publish is disabled",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSBatchPublishMissingSeqErr",
"code": 400,
"error_code": 10203,
"description": "batch publish sequence is missing",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSBatchPublishInvalidBatchIDErr",
"code": 400,
"error_code": 10204,
"description": "batch publish ID is invalid",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSBatchPublishUnknownBatchIDErr",
"code": 400,
"error_code": 10205,
"description": "batch publish ID unknown",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSBatchPublishInvalidGapModeErr",
"code": 400,
"error_code": 10206,
"description": "batch publish gap mode is invalid",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSMirrorWithBatchPublishErr",
"code": 400,
"error_code": 10207,
"description": "stream mirrors can not also use batch publishing",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
5 changes: 3 additions & 2 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1458,6 +1458,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
smv StoreMsg
batchId string
batchSeq uint64
atomic bool
commit bool
commitEob bool
batchStoreDir string
Expand All @@ -1469,10 +1470,10 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
if err != nil || sm == nil {
goto SKIP
}
batchId = getBatchId(sm.hdr)
batchId, atomic = getBatchId(sm.hdr)
batchSeq, ok = getBatchSequence(sm.hdr)
commit = len(sliceHeader(JSBatchCommit, sm.hdr)) != 0
if batchId == _EMPTY_ || !ok || commit {
if batchId == _EMPTY_ || !atomic || !ok || commit {
goto SKIP
}
// We've observed a partial batch write. Write the remainder of the batch.
Expand Down
177 changes: 162 additions & 15 deletions server/jetstream_batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"math/big"
"path/filepath"
"slices"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand All @@ -39,30 +40,37 @@ type batching struct {

// batchGroup tracks the in-flight state of a single batch publish,
// for both atomic (staged-then-committed) and fast (streamed) batches.
type batchGroup struct {
	lseq  uint64      // Last batch sequence of the batch (set on commit — NOTE(review): confirm against callers).
	timer *time.Timer // Cleanup timer; fires if the batch is abandoned before commit.
	// Used for atomic batch publish.
	store StreamStore // Where the batch is staged before committing. Nil for fast batches.
	// Used for fast batch publish.
	pseq        uint64 // Last persisted batch sequence.
	sseq        uint64 // Last persisted stream sequence.
	ackMessages uint64 // Ack will be sent every N messages.
	gapOk       bool   // Whether a gap is okay, if not the batch would be rejected.
	reply       string // If the batch is committed, this is the reply subject used for the PubAck.
}

// newAtomicBatchGroup creates an atomic batch publish group.
// Lock should be held.
func (batches *batching) newBatchGroup(mset *stream, batchId string) (*batchGroup, error) {
func (batches *batching) newAtomicBatchGroup(mset *stream, batchId string) (*batchGroup, error) {
store, err := newBatchStore(mset, batchId)
if err != nil {
return nil, err
}
b := &batchGroup{store: store}

// Create a timer to clean up after timeout.
timeout := streamMaxBatchTimeout
if maxBatchTimeout := mset.srv.getOpts().JetStreamLimits.MaxBatchTimeout; maxBatchTimeout > 0 {
timeout = maxBatchTimeout
}
b.timer = time.AfterFunc(timeout, func() {
b.cleanup(batchId, batches)
mset.sendStreamBatchAbandonedAdvisory(batchId, BatchTimeout)
})
b.setupCleanupTimer(mset, batchId, batches)
return b, nil
}

// newFastBatchGroup creates a fast batch publish group. Unlike atomic batches,
// no staging store is created; messages flow straight through.
// Lock should be held.
func (batches *batching) newFastBatchGroup(mset *stream, batchId string, gapOk bool, ackMessages uint64) *batchGroup {
	bg := &batchGroup{
		gapOk:       gapOk,
		ackMessages: ackMessages,
	}
	bg.setupCleanupTimer(mset, batchId, batches)
	return bg
}

func getBatchStoreDir(mset *stream, batchId string) (string, string) {
mset.mu.RLock()
jsa, name := mset.jsa, mset.cfg.Name
Expand Down Expand Up @@ -105,10 +113,87 @@ func (b *batchGroup) readyForCommit() bool {
if !b.timer.Stop() {
return false
}
b.store.FlushAllPending()
if b.store != nil {
b.store.FlushAllPending()
}
return true
}

// fastBatchRegisterSequences registers the highest stored batch and stream sequence and returns
// whether a PubAck should be sent if the batch has been committed.
// Returns the batch group (nil if unknown) and, when the commit is now fully
// persisted, the reply subject to send the PubAck on.
// Lock should be held.
func (batches *batching) fastBatchRegisterSequences(batchId string, batchSeq, streamSeq uint64) (*batchGroup, string) {
	b, ok := batches.group[batchId]
	if !ok {
		return nil, _EMPTY_
	}
	b.sseq = streamSeq
	b.pseq = batchSeq
	// If the PubAck needs to be sent now as a result of a commit,
	// return the reply and clean up the batch immediately.
	if b.lseq == batchSeq && b.reply != _EMPTY_ {
		b.cleanupLocked(batchId, batches)
		return b, b.reply
	}
	return b, _EMPTY_
}

// fastBatchFlowControl sends a fast batch flow control message for the current highest sequence.
// Lock should be held.
func (b *batchGroup) fastBatchFlowControl(batchSeq uint64, mset *stream, reply string) {
	// Without a reply subject there is nowhere to send flow control.
	if reply == _EMPTY_ {
		return
	}
	ack := BatchFlowAck{AckMessages: b.ackMessages, CurrentSequence: batchSeq}
	payload, _ := json.Marshal(&ack)
	mset.outq.sendMsg(reply, payload)
}

// fastBatchCommit ends the batch and commits the data up to that point. If all messages
// have already been persisted, a PubAck is sent immediately. Otherwise, it will be sent
// after the last message has been persisted.
// Lock should be held.
func (batches *batching) fastBatchCommit(b *batchGroup, batchId string, mset *stream, reply string) {
	// Not everything has been persisted yet: the PubAck goes out when the last
	// message lands. The batch is cleaned up then, so stop the timer now and
	// stash the reply subject for later.
	if b.lseq != b.pseq {
		b.timer.Stop()
		b.reply = reply
		return
	}
	// The whole batch has been persisted; respond with the PubAck right away.
	b.cleanupLocked(batchId, batches)
	if len(reply) == 0 {
		return
	}
	var scratch [256]byte
	resp := append(scratch[:0], mset.pubAck...)
	resp = strconv.AppendUint(resp, b.sseq, 10)
	resp = append(resp, fmt.Sprintf(",\"batch\":%q,\"count\":%d}", batchId, b.lseq)...)
	mset.outq.sendMsg(reply, resp)
}

// setupCleanupTimer sets up a timer to clean up the batch after a timeout.
// On expiry the batch is removed and an abandonment advisory is published.
func (b *batchGroup) setupCleanupTimer(mset *stream, batchId string, batches *batching) {
	// Honor the configured limit when set, otherwise use the default timeout.
	timeout := streamMaxBatchTimeout
	if limit := mset.srv.getOpts().JetStreamLimits.MaxBatchTimeout; limit > 0 {
		timeout = limit
	}
	b.timer = time.AfterFunc(timeout, func() {
		b.cleanup(batchId, batches)
		mset.sendStreamBatchAbandonedAdvisory(batchId, BatchTimeout)
	})
}

// resetCleanupTimer resets the cleanup timer, allowing to extend the lifetime of the batch.
// Returns whether the timer was reset without it having expired before.
func (b *batchGroup) resetCleanupTimer(mset *stream) bool {
	// Same timeout selection as setupCleanupTimer: configured limit wins when positive.
	timeout := streamMaxBatchTimeout
	if limit := mset.srv.getOpts().JetStreamLimits.MaxBatchTimeout; limit > 0 {
		timeout = limit
	}
	return b.timer.Reset(timeout)
}

// cleanup deletes underlying resources associated with the batch and unregisters it from the stream's batches.
func (b *batchGroup) cleanup(batchId string, batches *batching) {
batches.mu.Lock()
Expand All @@ -120,15 +205,19 @@ func (b *batchGroup) cleanup(batchId string, batches *batching) {
// cleanupLocked deletes underlying resources associated with the batch and
// unregisters it from the stream's batches.
// Lock should be held.
func (b *batchGroup) cleanupLocked(batchId string, batches *batching) {
	globalInflightBatches.Add(-1)
	b.timer.Stop()
	// Fast batches have no staging store; guard before deleting it
	// (an unguarded Delete would panic on a nil store).
	if b.store != nil {
		b.store.Delete(true)
	}
	delete(batches.group, batchId)
}

// stopLocked stops the batch's timer and staging store without deleting stored data.
// Lock should be held.
func (b *batchGroup) stopLocked() {
	globalInflightBatches.Add(-1)
	b.timer.Stop()
	// Fast batches have no staging store; guard before stopping it
	// (an unguarded Stop would panic on a nil store).
	if b.store != nil {
		b.store.Stop()
	}
}

// batchStagedDiff stages all changes for consistency checks until commit.
Expand Down Expand Up @@ -651,3 +740,61 @@ func checkMsgHeadersPreClusteredProposal(

return hdr, msg, 0, nil, nil
}

// recalculateClusteredSeq initializes or updates mset.clseq, for example after a leader change.
// This is reused for normal clustered publishing into a stream, and for atomic and fast batch publishing.
// mset.clMu lock must be held.
// Returns the re-captured mset.lseq. mset.clMu is held again on return; the
// other locks taken here are released before returning.
func recalculateClusteredSeq(mset *stream) (lseq uint64) {
	// Need to unlock and re-acquire the locks in the proper order.
	mset.clMu.Unlock()
	// Locking order is stream -> batchMu -> clMu
	mset.mu.RLock()
	batch := mset.batchApply
	var batchCount uint64
	if batch != nil {
		// Read the batch count under batch.mu; the lock stays held until
		// after clseq is recomputed so the count cannot change underneath us.
		batch.mu.Lock()
		batchCount = batch.count
	}
	mset.clMu.Lock()
	// Re-capture
	lseq = mset.lseq
	// clseq = last stream sequence + clfs offset + messages still being
	// applied from a batch. (NOTE(review): clfs presumably accounts for
	// failed/skipped proposals — confirm.)
	mset.clseq = lseq + mset.clfs + batchCount
	// Keep hold of the mset.clMu, but unlock the others.
	if batch != nil {
		batch.mu.Unlock()
	}
	mset.mu.RUnlock()
	return lseq
}

// commitSingleMsg commits and proposes a single message to the node.
// This is reused both for normal publishing into a stream, and for fast batch publishing.
// mset.clMu lock must be held.
func commitSingleMsg(
	diff *batchStagedDiff, mset *stream, subject string, reply string, hdr []byte, msg []byte, name string,
	jsa *jsAccount, mt *msgTrace, node RaftNode, replicas int, lseq uint64,
) {
	// Apply the staged consistency-check state before proposing.
	diff.commit(mset)
	// Encode the message at the current proposed clustered sequence.
	esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), false)
	// If message tracing is active, remember the trace keyed by the proposed
	// clustered sequence so it can be matched up when the entry is applied.
	var mtKey uint64
	if mt != nil {
		mtKey = mset.clseq
		if mset.mt == nil {
			mset.mt = make(map[uint64]*msgTrace)
		}
		mset.mt[mtKey] = mt
	}

	// Do proposal.
	_ = node.Propose(esm)
	// The proposal can fail, but we always account for trying.
	mset.clseq++
	mset.trackReplicationTraffic(node, len(esm), replicas)

	// Check to see if we are being overrun.
	// TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured.
	if mset.clseq-(lseq+mset.clfs) > streamLagWarnThreshold {
		lerr := fmt.Errorf("JetStream stream '%s > %s' has high message lag", jsa.acc().Name, name)
		mset.srv.RateLimitWarnf("%s", lerr.Error())
	}
}
Loading