From a7d979c66be5afcbaccf3d49878faad23609cc87 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 13 Oct 2025 15:23:26 +0200 Subject: [PATCH] (2.14) [IMPROVED] JetStream header indexing Signed-off-by: Maurice van Veen --- server/jetstream.go | 5 +- server/jetstream_batching.go | 45 ++-- server/jetstream_batching_test.go | 12 +- server/jetstream_cluster.go | 15 +- server/jetstream_cluster_4_test.go | 2 +- server/jetstream_test.go | 34 ++- server/stream.go | 404 +++++++++++++++++++++++------ 7 files changed, 398 insertions(+), 119 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index 2b02fd99f8c..f0660a737dd 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1520,7 +1520,10 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro if commitEob && seq == state.LastSeq { hdr = genHeader(hdr, JSBatchCommit, "1") } - mset.processJetStreamMsg(sm.subj, _EMPTY_, hdr, sm.msg, 0, 0, nil, false, true) + var hdrIdx *jsHdrIndex + hdr, hdrIdx = indexJsHdr(hdr) + mset.processJetStreamMsg(sm.subj, _EMPTY_, hdr, hdrIdx, sm.msg, 0, 0, nil, false, true) + hdrIdx.returnToPool() } store.Delete(true) SKIP: diff --git a/server/jetstream_batching.go b/server/jetstream_batching.go index fc97f1f5453..35091f0f718 100644 --- a/server/jetstream_batching.go +++ b/server/jetstream_batching.go @@ -238,7 +238,7 @@ func (batch *batchApply) rejectBatchState(mset *stream) { // mset.mu lock must NOT be held or used. // mset.clMu lock must be held. func checkMsgHeadersPreClusteredProposal( - diff *batchStagedDiff, mset *stream, subject string, hdr []byte, msg []byte, sourced bool, name string, + diff *batchStagedDiff, mset *stream, subject string, hdr []byte, hdrIdx *jsHdrIndex, msg []byte, sourced bool, name string, jsa *jsAccount, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules bool, discard DiscardPolicy, discardNewPer bool, maxMsgSize int, maxMsgs int64, maxMsgsPer int64, maxBytes int64, ) ([]byte, []byte, uint64, *ApiError, error) { @@ -252,10 +252,13 @@ func checkMsgHeadersPreClusteredProposal( err := fmt.Errorf("JetStream header size exceeds limits for '%s > %s'", jsa.acc().Name, mset.cfg.Name) return hdr, msg, 0, NewJSStreamHeaderExceedsMaximumError(), err } + } + + if hdrIdx != nil { // Counter increments. // Only supported on counter streams, and payload must be empty (if not coming from a source). var ok bool - if incr, ok = getMessageIncr(hdr); !ok { + if incr, ok = hdrIdx.getMessageIncr(); !ok { apiErr := NewJSMessageIncrInvalidError() return hdr, msg, 0, apiErr, apiErr } else if incr != nil && !sourced { @@ -269,14 +272,14 @@ func checkMsgHeadersPreClusteredProposal( } else { // Check for incompatible headers. var doErr bool - if getRollup(hdr) != _EMPTY_ || - getExpectedStream(hdr) != _EMPTY_ || - getExpectedLastMsgId(hdr) != _EMPTY_ || - getExpectedLastSeqPerSubjectForSubject(hdr) != _EMPTY_ { + if hdrIdx.getRollup() != _EMPTY_ || + hdrIdx.getExpectedStream() != _EMPTY_ || + hdrIdx.getExpectedLastMsgId() != _EMPTY_ || + hdrIdx.getExpectedLastSeqPerSubjectForSubject() != _EMPTY_ { doErr = true - } else if _, ok = getExpectedLastSeq(hdr); ok { + } else if _, ok = hdrIdx.getExpectedLastSeq(); ok { doErr = true - } else if _, ok = getExpectedLastSeqPerSubject(hdr); ok { + } else if _, ok = hdrIdx.getExpectedLastSeqPerSubject(); ok { doErr = true } @@ -287,11 +290,11 @@ func checkMsgHeadersPreClusteredProposal( } } // Expected stream name can also be pre-checked. 
- if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name { + if sname := hdrIdx.getExpectedStream(); sname != _EMPTY_ && sname != name { return hdr, msg, 0, NewJSStreamNotMatchError(), errStreamMismatch } // TTL'd messages are rejected entirely if TTLs are not enabled on the stream, or if the TTL is invalid. - if ttl, err := getMessageTTL(hdr); !sourced && (ttl != 0 || err != nil) { + if ttl, err := hdrIdx.getMessageTTL(); !sourced && (ttl != 0 || err != nil) { if !allowTTL { return hdr, msg, 0, NewJSMessageTTLDisabledError(), errMsgTTLDisabled } else if err != nil { @@ -300,7 +303,7 @@ func checkMsgHeadersPreClusteredProposal( } // Check for MsgIds here at the cluster level to avoid excessive CLFS accounting. // Will help during restarts. - if msgId := getMsgId(hdr); msgId != _EMPTY_ { + if msgId := hdrIdx.getMsgId(); msgId != _EMPTY_ { // Dedupe if staged. if _, ok = diff.msgIds[msgId]; ok { return hdr, msg, 0, NewJSAtomicPublishContainsDuplicateMessageError(), errMsgIdDuplicate @@ -439,9 +442,9 @@ func checkMsgHeadersPreClusteredProposal( } } - if len(hdr) > 0 { + if hdrIdx != nil { // Expected last sequence. - if seq, exists := getExpectedLastSeq(hdr); exists && seq != mset.clseq-mset.clfs { + if seq, exists := hdrIdx.getExpectedLastSeq(); exists && seq != mset.clseq-mset.clfs { mlseq := mset.clseq - mset.clfs err := fmt.Errorf("last sequence mismatch: %d vs %d", seq, mlseq) return hdr, msg, 0, NewJSStreamWrongLastSequenceError(mlseq), err @@ -452,10 +455,10 @@ func checkMsgHeadersPreClusteredProposal( } // Expected last sequence per subject. - if seq, exists := getExpectedLastSeqPerSubject(hdr); exists { + if seq, exists := hdrIdx.getExpectedLastSeqPerSubject(); exists { // Allow override of the subject used for the check. seqSubj := subject - if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ { + if optSubj := hdrIdx.getExpectedLastSeqPerSubjectForSubject(); optSubj != _EMPTY_ { seqSubj = optSubj } @@ -509,13 +512,13 @@ func checkMsgHeadersPreClusteredProposal( diff.expectedPerSubject[seqSubj] = e } } - } else if getExpectedLastSeqPerSubjectForSubject(hdr) != _EMPTY_ { + } else if hdrIdx.getExpectedLastSeqPerSubjectForSubject() != _EMPTY_ { apiErr := NewJSStreamExpectedLastSeqPerSubjectInvalidError() return hdr, msg, 0, apiErr, apiErr } // Message scheduling. 
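+	// Note: the schedule lookup reports !ok only when a schedule pattern is
+	// present but invalid; an absent header yields a zero time with ok=true.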
- if schedule, ok := getMessageSchedule(hdr); !ok { + if schedule, ok := hdrIdx.getMessageSchedule(); !ok { apiErr := NewJSMessageSchedulesPatternInvalidError() if !allowMsgSchedules { apiErr = NewJSMessageSchedulesDisabledError() @@ -525,12 +528,12 @@ func checkMsgHeadersPreClusteredProposal( if !allowMsgSchedules { apiErr := NewJSMessageSchedulesDisabledError() return hdr, msg, 0, apiErr, apiErr - } else if scheduleTtl, ok := getMessageScheduleTTL(hdr); !ok { + } else if scheduleTtl, ok := hdrIdx.getMessageScheduleTTL(); !ok { apiErr := NewJSMessageSchedulesTTLInvalidError() return hdr, msg, 0, apiErr, apiErr } else if scheduleTtl != _EMPTY_ && !allowTTL { return hdr, msg, 0, NewJSMessageTTLDisabledError(), errMsgTTLDisabled - } else if scheduleTarget := getMessageScheduleTarget(hdr); scheduleTarget == _EMPTY_ || + } else if scheduleTarget := hdrIdx.getMessageScheduleTarget(); scheduleTarget == _EMPTY_ || !IsValidPublishSubject(scheduleTarget) || SubjectsCollide(scheduleTarget, subject) { apiErr := NewJSMessageSchedulesTargetInvalidError() return hdr, msg, 0, apiErr, apiErr @@ -547,7 +550,7 @@ func checkMsgHeadersPreClusteredProposal( // Add a rollup sub header if it doesn't already exist. // Otherwise, it must exist already as a rollup on the subject. - if rollup := getRollup(hdr); rollup == _EMPTY_ { + if rollup := hdrIdx.getRollup(); rollup == _EMPTY_ { hdr = genHeader(hdr, JSMsgRollup, JSMsgRollupSubject) } else if rollup != JSMsgRollupSubject { apiErr := NewJSMessageSchedulesRollupInvalidError() @@ -557,7 +560,7 @@ func checkMsgHeadersPreClusteredProposal( } // Check for any rollups. - if rollup := getRollup(hdr); rollup != _EMPTY_ { + if rollup := hdrIdx.getRollup(); rollup != _EMPTY_ { if !allowRollup || denyPurge { err := errors.New("rollup not permitted") return hdr, msg, 0, NewJSStreamRollupFailedError(err), err diff --git a/server/jetstream_batching_test.go b/server/jetstream_batching_test.go index 682f82362d8..265da741784 100644 --- a/server/jetstream_batching_test.go +++ b/server/jetstream_batching_test.go @@ -1388,7 +1388,9 @@ func TestJetStreamAtomicBatchPublishStageAndCommit(t *testing.T) { hdr = genHeader(hdr, key, value) } } - _, _, _, _, err = checkMsgHeadersPreClusteredProposal(diff, mset, m.subject, hdr, m.msg, false, "TEST", nil, test.allowRollup, test.denyPurge, test.allowTTL, test.allowMsgCounter, test.allowMsgSchedules, discard, discardNewPer, -1, maxMsgs, maxMsgsPer, maxBytes) + _, hdrIdx := indexJsHdr(hdr) + _, _, _, _, err = checkMsgHeadersPreClusteredProposal(diff, mset, m.subject, hdr, hdrIdx, m.msg, false, "TEST", nil, test.allowRollup, test.denyPurge, test.allowTTL, test.allowMsgCounter, test.allowMsgSchedules, discard, discardNewPer, -1, maxMsgs, maxMsgsPer, maxBytes) + hdrIdx.returnToPool() if m.err != nil { require_Error(t, err, m.err) } else if err != nil { @@ -1582,7 +1584,9 @@ func TestJetStreamAtomicBatchPublishSingleServerRecovery(t *testing.T) { require_True(t, commitReady) // Simulate the first message of the batch is committed. - err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, nil, 0, 0, nil, false, true) + _, hdrIdx := indexJsHdr(hdr1) + err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, hdrIdx, nil, 0, 0, nil, false, true) + hdrIdx.returnToPool() require_NoError(t, err) // Simulate a hard kill, upon recovery the rest of the batch should be applied. @@ -1672,7 +1676,9 @@ func TestJetStreamAtomicBatchPublishSingleServerRecoveryCommitEob(t *testing.T) require_True(t, commitReady) // Simulate the first message of the batch is committed. 
- err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, nil, 0, 0, nil, false, true) + _, hdrIdx := indexJsHdr(hdr1) + err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, hdrIdx, nil, 0, 0, nil, false, true) + hdrIdx.returnToPool() require_NoError(t, err) // Simulate a hard kill, upon recovery the rest of the batch should be applied. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index ab4a2b1e798..d0eac14566f 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3665,7 +3665,10 @@ func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isR mt = mset.getAndDeleteMsgTrace(lseq) } // Process the actual message here. - err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt, sourced, needLock) + var hdrIdx *jsHdrIndex + hdr, hdrIdx = indexJsHdr(hdr) + err = mset.processJetStreamMsg(subject, reply, hdr, hdrIdx, msg, lseq, ts, mt, sourced, needLock) + hdrIdx.returnToPool() // If we have inflight make sure to clear after processing. // TODO(dlc) - technically check on inflight != nil could cause datarace. @@ -3732,7 +3735,9 @@ func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isR if state.Msgs == 0 { mset.store.Compact(lseq + 1) // Retry - err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt, sourced, needLock) + hdr, hdrIdx = indexJsHdr(hdr) + err = mset.processJetStreamMsg(subject, reply, hdr, hdrIdx, msg, lseq, ts, mt, sourced, needLock) + hdrIdx.returnToPool() } // FIXME(dlc) - We could just run a catchup with a request defining the span between what we expected // and what we got. @@ -8776,7 +8781,7 @@ func (mset *stream) stateSnapshotLocked() []byte { const streamLagWarnThreshold = 10_000 // processClusteredInboundMsg will propose the inbound message to the underlying raft group. -func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg []byte, mt *msgTrace, sourced bool) (retErr error) { +func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr []byte, hdrIdx *jsHdrIndex, msg []byte, mt *msgTrace, sourced bool) (retErr error) { // For possible error response. var response []byte @@ -8794,7 +8799,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ // We also invoke this in clustering mode for message tracing when not // performing message delivery. 
if node == nil || mt.traceOnly() { - return mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0, mt, sourced, true) + return mset.processJetStreamMsg(subject, reply, hdr, hdrIdx, msg, 0, 0, mt, sourced, true) } // If message tracing (with message delivery), we will need to send the @@ -8898,7 +8903,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ err error ) diff := &batchStagedDiff{} - if hdr, msg, dseq, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, hdr, msg, sourced, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil { + if hdr, msg, dseq, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, hdr, hdrIdx, msg, sourced, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil { mset.clMu.Unlock() if err == errMsgIdDuplicate && dseq > 0 { var buf [256]byte diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 516ae2459f5..533031f2317 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -4379,7 +4379,7 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) { validateStreamState(snap) // Simulate a message being stored, but not calling Applied yet. - err = mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 1, time.Now().UnixNano(), nil, false, true) + err = mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, nil, 1, time.Now().UnixNano(), nil, false, true) require_NoError(t, err) // Simulate the stream being stopped before we're able to call Applied. diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 020ad0eb589..46ec9a8dba8 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22273,7 +22273,7 @@ func TestJetStreamDirectGetBatchParallelWriteDeadlock(t *testing.T) { mset, err := s.globalAccount().lookupStream("TEST") require_NoError(t, err) for range 2 { - require_NoError(t, mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 0, 0, nil, false, true)) + require_NoError(t, mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, nil, 0, 0, nil, false, true)) } // We'll lock the message blocks such that we can't read, but NumPending should still function. @@ -22300,7 +22300,7 @@ func TestJetStreamDirectGetBatchParallelWriteDeadlock(t *testing.T) { read.Wait() <-time.After(100 * time.Millisecond) wg.Done() - mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 0, 0, nil, false, true) + mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, nil, 0, 0, nil, false, true) }() go func() { // Run some time after we've entered processJetStreamMsg above. 
@@ -22316,3 +22316,33 @@ func TestJetStreamDirectGetBatchParallelWriteDeadlock(t *testing.T) { return nil }) } + +func TestJetStreamHdrIndexUpdateHdr(t *testing.T) { + updateKey := "Nats-Update-Header" + for _, test := range []struct { + title string + updateHdr func(hdr []byte) + }{ + {title: "SetHeader", updateHdr: func(hdr []byte) { setHeader(updateKey, "s", hdr) }}, + {title: "GenHeader", updateHdr: func(hdr []byte) { genHeader(hdr, updateKey, "s") }}, + {title: "RemoveHeaderIfPresent", updateHdr: func(hdr []byte) { removeHeaderIfPresent(hdr, updateKey) }}, + {title: "RemoveHeaderIfPrefixPresent", updateHdr: func(hdr []byte) { removeHeaderIfPrefixPresent(hdr, updateKey) }}, + } { + t.Run(test.title, func(t *testing.T) { + hdr := genHeader(nil, "Nats-Batch-Id", "uuid") + hdr = genHeader(hdr, updateKey, "long_value") + hdr = genHeader(hdr, "Nats-Batch-Sequence", "seq") + + var idx *jsHdrIndex + hdr, idx = indexJsHdr(hdr) + defer idx.returnToPool() + require_NotNil(t, idx) + require_Equal(t, string(idx.batchId), "uuid") + require_Equal(t, string(idx.batchSeq), "seq") + + test.updateHdr(hdr) + require_Equal(t, string(idx.batchId), "uuid") + require_Equal(t, string(idx.batchSeq), "seq") + }) + } +} diff --git a/server/stream.go b/server/stream.go index e6c864a4fa3..2c07ab63f55 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2968,7 +2968,10 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool { err = node.Propose(encodeStreamMsg(m.subj, _EMPTY_, m.hdr, m.msg, sseq-1, ts, true)) } } else { - err = mset.processJetStreamMsg(m.subj, _EMPTY_, m.hdr, m.msg, sseq-1, ts, nil, true, true) + var hdrIdx *jsHdrIndex + m.hdr, hdrIdx = indexJsHdr(m.hdr) + err = mset.processJetStreamMsg(m.subj, _EMPTY_, m.hdr, hdrIdx, m.msg, sseq-1, ts, nil, true, true) + hdrIdx.returnToPool() } if err != nil { if strings.Contains(err.Error(), "no space left") { @@ -3936,13 +3939,18 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { } } - var err error + var ( + err error + hdrIdx *jsHdrIndex + ) // If we are clustered we need to propose this message to the underlying raft group. + hdr, hdrIdx = indexJsHdr(hdr) if node != nil { - err = mset.processClusteredInboundMsg(m.subj, _EMPTY_, hdr, msg, nil, true) + err = mset.processClusteredInboundMsg(m.subj, _EMPTY_, hdr, hdrIdx, msg, nil, true) } else { - err = mset.processJetStreamMsg(m.subj, _EMPTY_, hdr, msg, 0, 0, nil, true, true) + err = mset.processJetStreamMsg(m.subj, _EMPTY_, hdr, hdrIdx, msg, 0, 0, nil, true, true) } + hdrIdx.returnToPool() if err != nil { s := mset.srv @@ -4576,12 +4584,15 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error { } }) mset.store.RegisterProcessJetStreamMsg(func(im *inMsg) { + var hdrIdx *jsHdrIndex + im.hdr, hdrIdx = indexJsHdr(im.hdr) + defer hdrIdx.returnToPool() if mset.IsClustered() { if mset.IsLeader() { - mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, im.msg, im.mt, false) + mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, hdrIdx, im.msg, im.mt, false) } } else { - mset.processJetStreamMsg(im.subj, im.rply, im.hdr, im.msg, 0, 0, im.mt, false, true) + mset.processJetStreamMsg(im.subj, im.rply, im.hdr, hdrIdx, im.msg, 0, 0, im.mt, false, true) } }) mset.mu.Unlock() @@ -4707,54 +4718,68 @@ func (mset *stream) storeMsgIdLocked(dde *ddentry) { } } -// Fast lookup of msgId. +// Lookup of msgId without an index. func getMsgId(hdr []byte) string { return string(getHeader(JSMsgId, hdr)) } +// Fast lookup of msgId. 
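+// A nil index (no JetStream headers were indexed) yields the empty string,
+// matching the hdr-based lookup above.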
+func (hdrIdx *jsHdrIndex) getMsgId() string { + if hdrIdx == nil || len(hdrIdx.msgId) == 0 { + return _EMPTY_ + } + return string(hdrIdx.msgId) +} + // Fast lookup of expected last msgId. -func getExpectedLastMsgId(hdr []byte) string { - return string(getHeader(JSExpectedLastMsgId, hdr)) +func (hdrIdx *jsHdrIndex) getExpectedLastMsgId() string { + if hdrIdx == nil || len(hdrIdx.expLastMsgId) == 0 { + return _EMPTY_ + } + return string(hdrIdx.expLastMsgId) } // Fast lookup of expected stream. -func getExpectedStream(hdr []byte) string { - return string(getHeader(JSExpectedStream, hdr)) +func (hdrIdx *jsHdrIndex) getExpectedStream() string { + if hdrIdx == nil || len(hdrIdx.expStream) == 0 { + return _EMPTY_ + } + return string(hdrIdx.expStream) } // Fast lookup of expected last sequence. -func getExpectedLastSeq(hdr []byte) (uint64, bool) { - bseq := sliceHeader(JSExpectedLastSeq, hdr) - if len(bseq) == 0 { +func (hdrIdx *jsHdrIndex) getExpectedLastSeq() (uint64, bool) { + if hdrIdx == nil || len(hdrIdx.expLastSeq) == 0 { return 0, false } - return uint64(parseInt64(bseq)), true + return uint64(parseInt64(hdrIdx.expLastSeq)), true } // Fast lookup of rollups. -func getRollup(hdr []byte) string { - r := getHeader(JSMsgRollup, hdr) - if len(r) == 0 { +func (hdrIdx *jsHdrIndex) getRollup() string { + if hdrIdx == nil || len(hdrIdx.rollup) == 0 { return _EMPTY_ } - return strings.ToLower(string(r)) + return strings.ToLower(string(hdrIdx.rollup)) } // Fast lookup of expected stream sequence per subject. -func getExpectedLastSeqPerSubject(hdr []byte) (uint64, bool) { - bseq := sliceHeader(JSExpectedLastSubjSeq, hdr) - if len(bseq) == 0 { +func (hdrIdx *jsHdrIndex) getExpectedLastSeqPerSubject() (uint64, bool) { + if hdrIdx == nil || len(hdrIdx.expLastSubjSeq) == 0 { return 0, false } - return uint64(parseInt64(bseq)), true + return uint64(parseInt64(hdrIdx.expLastSubjSeq)), true } // Fast lookup of expected subject for the expected stream sequence per subject. -func getExpectedLastSeqPerSubjectForSubject(hdr []byte) string { - return bytesToString(sliceHeader(JSExpectedLastSubjSeqSubj, hdr)) +func (hdrIdx *jsHdrIndex) getExpectedLastSeqPerSubjectForSubject() string { + if hdrIdx == nil || len(hdrIdx.expLastSubjSeqSubj) == 0 { + return _EMPTY_ + } + return bytesToString(hdrIdx.expLastSubjSeqSubj) } -// Fast lookup of the message TTL from headers: +// Lookup of the message TTL from headers without an index: // - Positive return value: duration in seconds. // - Zero return value: no TTL or parse error. // - Negative return value: never expires. @@ -4766,6 +4791,17 @@ func getMessageTTL(hdr []byte) (int64, error) { return parseMessageTTL(bytesToString(ttl)) } +// Fast lookup of the message TTL from headers: +// - Positive return value: duration in seconds. +// - Zero return value: no TTL or parse error. +// - Negative return value: never expires. +func (hdrIdx *jsHdrIndex) getMessageTTL() (int64, error) { + if hdrIdx == nil || len(hdrIdx.ttl) == 0 { + return 0, nil + } + return parseMessageTTL(bytesToString(hdrIdx.ttl)) +} + // - Positive return value: duration in seconds. // - Zero return value: no TTL or parse error. // - Negative return value: never expires. @@ -4793,16 +4829,15 @@ func parseMessageTTL(ttl string) (int64, error) { // Fast lookup of the message Incr from headers. // Return includes the value or nil, and success. 
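+// A missing header returns (nil, true), letting callers distinguish "no
+// counter increment" from an invalid value.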
-func getMessageIncr(hdr []byte) (*big.Int, bool) { - incr := sliceHeader(JSMessageIncr, hdr) - if len(incr) == 0 { +func (hdrIdx *jsHdrIndex) getMessageIncr() (*big.Int, bool) { + if hdrIdx == nil || len(hdrIdx.incr) == 0 { return nil, true } var v big.Int - return v.SetString(bytesToString(incr), 10) + return v.SetString(bytesToString(hdrIdx.incr), 10) } -// Fast lookup of message schedule. +// Lookup of message schedule without an index. func getMessageSchedule(hdr []byte) (time.Time, bool) { if len(hdr) == 0 { return time.Time{}, true @@ -4818,7 +4853,23 @@ func getMessageSchedule(hdr []byte) (time.Time, bool) { return t, err == nil } -// Fast lookup of the message schedule TTL from headers. +// Fast lookup of message schedule. +func (hdrIdx *jsHdrIndex) getMessageSchedule() (time.Time, bool) { + if hdrIdx == nil || len(hdrIdx.schedPattern) == 0 { + return time.Time{}, true + } + val := bytesToString(hdrIdx.schedPattern) + if val == _EMPTY_ { + return time.Time{}, true + } + if !strings.HasPrefix(val, "@at ") { + return time.Time{}, false + } + t, err := time.Parse(time.RFC3339, val[4:]) + return t, err == nil +} + +// Lookup of the message schedule TTL from headers without an index. // The TTL is confirmed to be valid, but the raw TTL string is returned. func getMessageScheduleTTL(hdr []byte) (string, bool) { ttl := getHeader(JSScheduleTTL, hdr) @@ -4831,7 +4882,19 @@ func getMessageScheduleTTL(hdr []byte) (string, bool) { return string(ttl), true } -// Fast lookup of message schedule target. +// Fast lookup of the message schedule TTL from headers. +// The TTL is confirmed to be valid, but the raw TTL string is returned. +func (hdrIdx *jsHdrIndex) getMessageScheduleTTL() (string, bool) { + if hdrIdx == nil || len(hdrIdx.schedTtl) == 0 { + return _EMPTY_, true + } + if _, err := parseMessageTTL(bytesToString(hdrIdx.schedTtl)); err != nil { + return _EMPTY_, false + } + return string(hdrIdx.schedTtl), true +} + +// Lookup of message schedule target without an index. func getMessageScheduleTarget(hdr []byte) string { if len(hdr) == 0 { return _EMPTY_ @@ -4839,15 +4902,30 @@ func getMessageScheduleTarget(hdr []byte) string { return string(getHeader(JSScheduleTarget, hdr)) } +// Fast lookup of message schedule target. +func (hdrIdx *jsHdrIndex) getMessageScheduleTarget() string { + if hdrIdx == nil || len(hdrIdx.schedTarget) == 0 { + return _EMPTY_ + } + return string(hdrIdx.schedTarget) +} + // Fast lookup of message scheduler. -func getMessageScheduler(hdr []byte) string { - if len(hdr) == 0 { +func (hdrIdx *jsHdrIndex) getMessageScheduler() string { + if hdrIdx == nil || len(hdrIdx.scheduler) == 0 { return _EMPTY_ } - return string(getHeader(JSScheduler, hdr)) + return bytesToString(hdrIdx.scheduler) } -// Fast lookup of batch ID. +func (hdrIdx *jsHdrIndex) getMessageScheduleNext() string { + if hdrIdx == nil || len(hdrIdx.schedNext) == 0 { + return _EMPTY_ + } + return bytesToString(hdrIdx.schedNext) +} + +// Lookup of batch ID without an index. func getBatchId(hdr []byte) string { if len(hdr) == 0 { return _EMPTY_ @@ -4855,7 +4933,15 @@ func getBatchId(hdr []byte) string { return string(getHeader(JSBatchId, hdr)) } -// Fast lookup of batch sequence. +// Fast lookup of batch ID. +func (hdrIdx *jsHdrIndex) getBatchId() string { + if hdrIdx == nil || len(hdrIdx.batchId) == 0 { + return _EMPTY_ + } + return string(hdrIdx.batchId) +} + +// Lookup of batch sequence without an index. 
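+// Retained for code paths that work on raw headers and have not built a
+// jsHdrIndex.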
func getBatchSequence(hdr []byte) (uint64, bool) { bseq := sliceHeader(JSBatchSeq, hdr) if len(bseq) == 0 { @@ -4864,6 +4950,14 @@ func getBatchSequence(hdr []byte) (uint64, bool) { return uint64(parseInt64(bseq)), true } +// Fast lookup of batch sequence. +func (hdrIdx *jsHdrIndex) getBatchSequence() (uint64, bool) { + if hdrIdx == nil || len(hdrIdx.batchSeq) == 0 { + return 0, false + } + return uint64(parseInt64(hdrIdx.batchSeq)), true +} + // Signal if we are clustered. Will acquire rlock. func (mset *stream) IsClustered() bool { mset.mu.RLock() @@ -5325,7 +5419,7 @@ var ( ) // processJetStreamMsg is where we try to actually process the stream msg. -func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, mt *msgTrace, sourced bool, needLock bool) (retErr error) { +func (mset *stream) processJetStreamMsg(subject, reply string, hdr []byte, hdrIdx *jsHdrIndex, msg []byte, lseq uint64, ts int64, mt *msgTrace, sourced bool, needLock bool) (retErr error) { if mt != nil { // Only the leader/standalone will have mt!=nil. On exit, send the // message trace event. @@ -5389,10 +5483,10 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, var batchId string var batchSeq uint64 - if len(hdr) > 0 { + if hdrIdx != nil { // Populate batch details. - if batchId = getBatchId(hdr); batchId != _EMPTY_ { - batchSeq, _ = getBatchSequence(hdr) + if batchId = hdrIdx.getBatchId(); batchId != _EMPTY_ { + batchSeq, _ = hdrIdx.getBatchSequence() // Disable consistency checking if this was already done // earlier as part of the batch consistency check. canConsistencyCheck = traceOnly @@ -5448,18 +5542,12 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } } - // If we have received this message across an account we may have request information attached. - // For now remove. TODO(dlc) - Should this be opt-in or opt-out? - if len(hdr) > 0 { - hdr = removeHeaderIfPresent(hdr, ClientInfoHdr) - } - // Process additional msg headers if still present. var msgId string var incr *big.Int var rollupSub, rollupAll bool - if len(hdr) > 0 { + if hdrIdx != nil { // Certain checks have already been performed if in clustered mode, so only check if not. // Note, for cluster mode but with message tracing (without message delivery), we need // to do this check here since it was not done in processClusteredInboundMsg(). @@ -5467,7 +5555,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // Counter increments. // Only supported on counter streams, and payload must be empty (if not coming from a source). var ok bool - if incr, ok = getMessageIncr(hdr); !ok { + if incr, ok = hdrIdx.getMessageIncr(); !ok { apiErr := NewJSMessageIncrInvalidError() if canRespond { resp.PubAck = &PubAck{Stream: name} @@ -5499,14 +5587,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } else { // Check for incompatible headers. 
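+				// Counter increments cannot be combined with rollups or any
+				// expected-* headers, so reject the publish if both are present.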
var doErr bool - if getRollup(hdr) != _EMPTY_ || - getExpectedStream(hdr) != _EMPTY_ || - getExpectedLastMsgId(hdr) != _EMPTY_ || - getExpectedLastSeqPerSubjectForSubject(hdr) != _EMPTY_ { + if hdrIdx.getRollup() != _EMPTY_ || + hdrIdx.getExpectedStream() != _EMPTY_ || + hdrIdx.getExpectedLastMsgId() != _EMPTY_ || + hdrIdx.getExpectedLastSeqPerSubjectForSubject() != _EMPTY_ { doErr = true - } else if _, ok := getExpectedLastSeq(hdr); ok { + } else if _, ok := hdrIdx.getExpectedLastSeq(); ok { doErr = true - } else if _, ok := getExpectedLastSeqPerSubject(hdr); ok { + } else if _, ok := hdrIdx.getExpectedLastSeqPerSubject(); ok { doErr = true } @@ -5524,7 +5612,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } // Expected stream. - if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name { + if sname := hdrIdx.getExpectedStream(); sname != _EMPTY_ && sname != name { if canRespond { resp.PubAck = &PubAck{Stream: name} resp.Error = NewJSStreamNotMatchError() @@ -5537,7 +5625,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // TTL'd messages are rejected entirely if TTLs are not enabled on the stream. // Shouldn't happen in clustered mode since we should have already caught this // in processClusteredInboundMsg, but needed here for non-clustered etc. - if ttl, _ := getMessageTTL(hdr); !sourced && ttl != 0 && !mset.cfg.AllowMsgTTL { + if ttl, _ := hdrIdx.getMessageTTL(); !sourced && ttl != 0 && !mset.cfg.AllowMsgTTL { if canRespond { resp.PubAck = &PubAck{Stream: name} resp.Error = NewJSMessageTTLDisabledError() @@ -5548,10 +5636,10 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } // Expected last sequence per subject. - if seq, exists := getExpectedLastSeqPerSubject(hdr); exists { + if seq, exists := hdrIdx.getExpectedLastSeqPerSubject(); exists { // Allow override of the subject used for the check. seqSubj := subject - if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ { + if optSubj := hdrIdx.getExpectedLastSeqPerSubjectForSubject(); optSubj != _EMPTY_ { seqSubj = optSubj } @@ -5574,7 +5662,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } return fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, fseq) } - } else if getExpectedLastSeqPerSubjectForSubject(hdr) != _EMPTY_ { + } else if hdrIdx.getExpectedLastSeqPerSubjectForSubject() != _EMPTY_ { apiErr := NewJSStreamExpectedLastSeqPerSubjectInvalidError() if canRespond { resp.PubAck = &PubAck{Stream: name} @@ -5586,7 +5674,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } // Expected last sequence. - if seq, exists := getExpectedLastSeq(hdr); exists && seq != mset.lseq { + if seq, exists := hdrIdx.getExpectedLastSeq(); exists && seq != mset.lseq { mlseq := mset.lseq if canRespond { resp.PubAck = &PubAck{Stream: name} @@ -5598,7 +5686,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } // Message scheduling. 
- if schedule, ok := getMessageSchedule(hdr); !ok { + if schedule, ok := hdrIdx.getMessageSchedule(); !ok { apiErr := NewJSMessageSchedulesPatternInvalidError() if !allowMsgSchedules { apiErr = NewJSMessageSchedulesDisabledError() @@ -5620,7 +5708,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, outq.sendMsg(reply, b) } return apiErr - } else if scheduleTtl, ok := getMessageScheduleTTL(hdr); !ok { + } else if scheduleTtl, ok := hdrIdx.getMessageScheduleTTL(); !ok { apiErr := NewJSMessageSchedulesTTLInvalidError() if canRespond { resp.PubAck = &PubAck{Stream: name} @@ -5637,7 +5725,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, outq.sendMsg(reply, b) } return errMsgTTLDisabled - } else if scheduleTarget := getMessageScheduleTarget(hdr); scheduleTarget == _EMPTY_ || + } else if scheduleTarget := hdrIdx.getMessageScheduleTarget(); scheduleTarget == _EMPTY_ || !IsValidPublishSubject(scheduleTarget) || SubjectsCollide(scheduleTarget, subject) { apiErr := NewJSMessageSchedulesTargetInvalidError() if canRespond { @@ -5664,8 +5752,10 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // Add a rollup sub header if it doesn't already exist. // Otherwise, it must exist already as a rollup on the subject. - if rollup := getRollup(hdr); rollup == _EMPTY_ { + if rollup := hdrIdx.getRollup(); rollup == _EMPTY_ { hdr = genHeader(hdr, JSMsgRollup, JSMsgRollupSubject) + // We don't re-index the headers, so need to set this manually. + rollupSub = true } else if rollup != JSMsgRollupSubject { apiErr := NewJSMessageSchedulesRollupInvalidError() if canRespond { @@ -5682,7 +5772,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // Dedupe detection. This is done at the cluster level for dedupe detection above the // lower layers. But we still need to pull out the msgId. - if msgId = getMsgId(hdr); msgId != _EMPTY_ { + if msgId = hdrIdx.getMsgId(); msgId != _EMPTY_ { // Do real check only if not clustered or traceOnly flag is set. if canConsistencyCheck { var seq uint64 @@ -5704,7 +5794,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } // Expected last msgId. - if lmsgId := getExpectedLastMsgId(hdr); lmsgId != _EMPTY_ { + if lmsgId := hdrIdx.getExpectedLastMsgId(); lmsgId != _EMPTY_ { if lmsgId != mset.lmsgId { last := mset.lmsgId bumpCLFS() @@ -5718,7 +5808,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } } // Check for any rollups. - if rollup := getRollup(hdr); rollup != _EMPTY_ { + if rollup := hdrIdx.getRollup(); rollup != _EMPTY_ { if canConsistencyCheck && (!mset.cfg.AllowRollup || mset.cfg.DenyPurge) { err := errors.New("rollup not permitted") if canRespond { @@ -5999,7 +6089,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } // Find the message TTL if any. 
- ttl, err := getMessageTTL(hdr) + ttl, err := hdrIdx.getMessageTTL() if err != nil { if canRespond { resp.PubAck = &PubAck{Stream: name} @@ -6016,8 +6106,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if ttl > 0 && mset.cfg.SubjectDeleteMarkerTTL > 0 && mset.cfg.MaxMsgsPer != 1 { if minTtl := int64(mset.cfg.SubjectDeleteMarkerTTL.Seconds()); ttl < minTtl { ttl = minTtl - hdr = removeHeaderIfPresent(hdr, JSMessageTTL) - hdr = genHeader(hdr, JSMessageTTL, strconv.FormatInt(ttl, 10)) + hdr = setHeader(JSMessageTTL, strconv.FormatInt(ttl, 10), hdr) } } @@ -6090,9 +6179,9 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, mset.purgeLocked(&JSApiStreamPurgeRequest{Subject: subject, Keep: 1}, false) } else if rollupAll { mset.purgeLocked(&JSApiStreamPurgeRequest{Keep: 1}, false) - } else if scheduleNext := sliceHeader(JSScheduleNext, hdr); len(scheduleNext) > 0 && bytesToString(scheduleNext) == JSScheduleNextPurge { + } else if scheduleNext := hdrIdx.getMessageScheduleNext(); scheduleNext == JSScheduleNextPurge { // Purge the message schedule. - scheduler := getMessageScheduler(hdr) + scheduler := hdrIdx.getMessageScheduler() if scheduler != _EMPTY_ { mset.purgeLocked(&JSApiStreamPurgeRequest{Subject: scheduler}, false) } @@ -6156,7 +6245,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // processJetStreamBatchMsg processes a JetStream message that's part of an atomic batch publish. // Handles constraints around the batch, storing messages, doing consistency checks, and performing the commit. -func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr, msg []byte, mt *msgTrace) (retErr error) { +func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr []byte, hdrIdx *jsHdrIndex, msg []byte, mt *msgTrace) (retErr error) { // For possible error response. var response []byte @@ -6256,7 +6345,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr return err } - batchSeq, exists := getBatchSequence(hdr) + batchSeq, exists := hdrIdx.getBatchSequence() if !exists { err := NewJSAtomicPublishMissingSeqError() if canRespond { @@ -6340,7 +6429,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr } var commit, commitEob bool - if c := sliceHeader(JSBatchCommit, hdr); c != nil { + if c := hdrIdx.batchCommit; c != nil { commitEob = bytes.Equal(c, []byte("eob")) // Reject the batch if the commit is not recognized. if !commitEob && !bytes.Equal(c, []byte("1")) { @@ -6357,7 +6446,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr } // The required API level can have the batch be rejected. But the header is always removed. - if len(sliceHeader(JSRequiredApiLevel, hdr)) != 0 { + if hdrIdx.reqApiLevel { if errorOnRequiredApiLevel(hdr) { b.cleanupLocked(batchId, batches) batches.mu.Unlock() @@ -6512,12 +6601,15 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr return respondIncompleteBatch() } + bhdr, hdrIdx = indexJsHdr(bhdr) + // Reject unsupported headers. 
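+			// Nats-Expected-Last-Msg-Id is not supported within an atomic batch
+			// and causes the batch to be rejected.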
- if getExpectedLastMsgId(bhdr) != _EMPTY_ { + if hdrIdx.getExpectedLastMsgId() != _EMPTY_ { + hdrIdx.returnToPool() return errorOnUnsupported(seq, JSExpectedLastMsgId) } - - if bhdr, bmsg, _, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, bsubj, bhdr, bmsg, false, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil { + if bhdr, bmsg, _, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, bsubj, bhdr, hdrIdx, bmsg, false, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil { + hdrIdx.returnToPool() rollback(seq) b.cleanupLocked(batchId, batches) batches.mu.Unlock() @@ -6527,6 +6619,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr } return err } + hdrIdx.returnToPool() if isClustered { var _reply string @@ -6573,7 +6666,9 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr bhdr = genHeader(bhdr, JSBatchCommit, "1") } } - mset.processJetStreamMsg(bsubj, _reply, bhdr, bmsg, 0, 0, mt, false, false) + bhdr, hdrIdx = indexJsHdr(bhdr) + mset.processJetStreamMsg(bsubj, _reply, bhdr, hdrIdx, bmsg, 0, 0, mt, false, false) + hdrIdx.returnToPool() } mset.mu.Unlock() } else { @@ -6597,6 +6692,140 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr return nil } +type jsHdrIndex struct { + msgId []byte + expStream []byte + expLastSeq []byte + expLastSubjSeq []byte + expLastSubjSeqSubj []byte + expLastMsgId []byte + rollup []byte + ttl []byte + incr []byte + batchId []byte + batchSeq []byte + batchCommit []byte + schedPattern []byte + schedTtl []byte + schedTarget []byte + scheduler []byte + schedNext []byte + reqApiLevel bool +} + +var jsHdrIndexPool sync.Pool + +func getJsHdrIndexFromPool() *jsHdrIndex { + idx := jsHdrIndexPool.Get() + if idx != nil { + return idx.(*jsHdrIndex) + } + return new(jsHdrIndex) +} + +func (hdrIdx *jsHdrIndex) returnToPool() { + if hdrIdx == nil { + return + } + // Nil out all values. + *hdrIdx = jsHdrIndex{} + jsHdrIndexPool.Put(hdrIdx) +} + +// indexJsHdr indexes the JetStream headers in the message. +// The ClientInfoHdr (if present) is also removed, and the update header is returned. +func indexJsHdr(hdr []byte) ([]byte, *jsHdrIndex) { + hdrLen, ehdrLen := len(hdr), len(emptyHdrLine) + if hdrLen <= ehdrLen || !bytes.HasPrefix(hdr, []byte(hdrLine)) || hdr[hdrLen-2] != '\r' || hdr[hdrLen-1] != '\n' { + return hdr, nil + } + var idx *jsHdrIndex + offset := len(hdrLine) + // While contains more than just CRLF. + for offset+2 < hdrLen { + colon := bytes.IndexByte(hdr[offset:], ':') + if colon < 0 { + colon = 0 + } + end := bytes.IndexByte(hdr[offset+colon:], '\r') + if hdr[offset+colon+end+1] != '\n' { + // The line is incomplete, just skip over it. + offset += colon + end + 1 + continue + } + // If the line is missing a ':', just skip over the value. + if colon == 0 { + offset += end + LEN_CR_LF + continue + } + key := hdr[offset : offset+colon] + valueStart := offset + colon + 1 // Right after the ':'. + // Skip over whitespace before the value. + for valueStart < hdrLen && hdr[valueStart] == ' ' { + valueStart++ + } + valueEnd := offset + colon + end + + bkey := bytesToString(key) + + // If we have received this message across an account we may have request information attached. + // For now remove. 
TODO(dlc) - Should this be opt-in or opt-out? + if bkey == ClientInfoHdr { + hdr = append(hdr[:offset], hdr[valueEnd+LEN_CR_LF:]...) + hdrLen = len(hdr) + continue + } + // Move offset to the next line. + offset += colon + end + LEN_CR_LF + if idx == nil { + idx = getJsHdrIndexFromPool() + } + value := hdr[valueStart:valueEnd:valueEnd] + switch bkey { + case JSMsgId: + idx.msgId = value + case JSExpectedStream: + idx.expStream = value + case JSExpectedLastSeq: + idx.expLastSeq = value + case JSExpectedLastSubjSeq: + idx.expLastSubjSeq = value + case JSExpectedLastSubjSeqSubj: + idx.expLastSubjSeqSubj = value + case JSExpectedLastMsgId: + idx.expLastMsgId = value + case JSMsgRollup: + idx.rollup = value + case JSMessageTTL: + idx.ttl = value + case JSMessageIncr: + idx.incr = value + case JSBatchId: + idx.batchId = value + case JSBatchSeq: + idx.batchSeq = value + case JSBatchCommit: + idx.batchCommit = value + case JSSchedulePattern: + idx.schedPattern = value + case JSScheduleTTL: + idx.schedTtl = value + case JSScheduleTarget: + idx.schedTarget = value + case JSScheduler: + idx.scheduler = value + case JSScheduleNext: + idx.schedNext = value + case JSRequiredApiLevel: + idx.reqApiLevel = true + } + } + if hdrLen <= ehdrLen { + return nil, nil + } + return hdr, idx +} + // Used to signal inbound message to registered consumers. type cMsg struct { seq uint64 @@ -6898,15 +7127,18 @@ func (mset *stream) internalLoop() { // This can possibly change now so needs to be checked here. isClustered := mset.IsClustered() ims := msgs.pop() + var hdrIdx *jsHdrIndex for _, im := range ims { // If we are clustered we need to propose this message to the underlying raft group. - if batchId := getBatchId(im.hdr); batchId != _EMPTY_ { - mset.processJetStreamBatchMsg(batchId, im.subj, im.rply, im.hdr, im.msg, im.mt) + im.hdr, hdrIdx = indexJsHdr(im.hdr) + if batchId := hdrIdx.getBatchId(); batchId != _EMPTY_ { + mset.processJetStreamBatchMsg(batchId, im.subj, im.rply, im.hdr, hdrIdx, im.msg, im.mt) } else if isClustered { - mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, im.msg, im.mt, false) + mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, hdrIdx, im.msg, im.mt, false) } else { - mset.processJetStreamMsg(im.subj, im.rply, im.hdr, im.msg, 0, 0, im.mt, false, true) + mset.processJetStreamMsg(im.subj, im.rply, im.hdr, hdrIdx, im.msg, 0, 0, im.mt, false, true) } + hdrIdx.returnToPool() im.returnToPool() } msgs.recycle(&ims)