5 changes: 4 additions & 1 deletion server/jetstream.go
@@ -1520,7 +1520,10 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
if commitEob && seq == state.LastSeq {
hdr = genHeader(hdr, JSBatchCommit, "1")
}
mset.processJetStreamMsg(sm.subj, _EMPTY_, hdr, sm.msg, 0, 0, nil, false, true)
var hdrIdx *jsHdrIndex
hdr, hdrIdx = indexJsHdr(hdr)
mset.processJetStreamMsg(sm.subj, _EMPTY_, hdr, hdrIdx, sm.msg, 0, 0, nil, false, true)
hdrIdx.returnToPool()
}
store.Delete(true)
SKIP:
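Every call site this PR touches follows the same lifecycle: build the header index once with indexJsHdr, pass both the (possibly updated) header and the index to processJetStreamMsg, then return the index to its pool exactly once when processing is done. Below is a minimal, self-contained Go sketch of that lifecycle; the names pooledIdx, indexHdr, and processMsg are hypothetical stand-ins rather than the server's actual jsHdrIndex implementation, and the nil-index-on-empty-header and nil-safe returnToPool behaviors are assumptions inferred from the nil checks visible in this diff.

package main

import (
	"fmt"
	"sync"
)

// pooledIdx is a hypothetical stand-in for jsHdrIndex: a reusable, pooled view
// over one message's header bytes.
type pooledIdx struct {
	hdr []byte
}

var idxPool = sync.Pool{New: func() any { return &pooledIdx{} }}

// indexHdr mirrors the shape of indexJsHdr(hdr): it returns the header (which
// the real helper may normalize) together with a pooled index, or a nil index
// when there is no header to index.
func indexHdr(hdr []byte) ([]byte, *pooledIdx) {
	if len(hdr) == 0 {
		return hdr, nil
	}
	idx := idxPool.Get().(*pooledIdx)
	idx.hdr = hdr
	return hdr, idx
}

// returnToPool hands the index back for reuse. It is nil-safe, and the index
// must not be used again after it has been returned.
func (idx *pooledIdx) returnToPool() {
	if idx == nil {
		return
	}
	idx.hdr = nil
	idxPool.Put(idx)
}

// processMsg stands in for processJetStreamMsg, which now receives the header
// and its index side by side.
func processMsg(subject string, hdr []byte, idx *pooledIdx) {
	fmt.Println(subject, len(hdr), idx != nil)
}

func main() {
	hdr := []byte("NATS/1.0\r\nNats-Batch-Id: uuid\r\n\r\n")
	hdr, idx := indexHdr(hdr)
	processMsg("foo", hdr, idx)
	idx.returnToPool() // return exactly once, only after the message has been processed
}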
45 changes: 24 additions & 21 deletions server/jetstream_batching.go
@@ -238,7 +238,7 @@ func (batch *batchApply) rejectBatchState(mset *stream) {
// mset.mu lock must NOT be held or used.
// mset.clMu lock must be held.
func checkMsgHeadersPreClusteredProposal(
diff *batchStagedDiff, mset *stream, subject string, hdr []byte, msg []byte, sourced bool, name string,
diff *batchStagedDiff, mset *stream, subject string, hdr []byte, hdrIdx *jsHdrIndex, msg []byte, sourced bool, name string,
jsa *jsAccount, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules bool,
discard DiscardPolicy, discardNewPer bool, maxMsgSize int, maxMsgs int64, maxMsgsPer int64, maxBytes int64,
) ([]byte, []byte, uint64, *ApiError, error) {
@@ -252,10 +252,13 @@ func checkMsgHeadersPreClusteredProposal(
err := fmt.Errorf("JetStream header size exceeds limits for '%s > %s'", jsa.acc().Name, mset.cfg.Name)
return hdr, msg, 0, NewJSStreamHeaderExceedsMaximumError(), err
}
}

if hdrIdx != nil {
// Counter increments.
// Only supported on counter streams, and payload must be empty (if not coming from a source).
var ok bool
if incr, ok = getMessageIncr(hdr); !ok {
if incr, ok = hdrIdx.getMessageIncr(); !ok {
apiErr := NewJSMessageIncrInvalidError()
return hdr, msg, 0, apiErr, apiErr
} else if incr != nil && !sourced {
@@ -269,14 +272,14 @@ func checkMsgHeadersPreClusteredProposal(
} else {
// Check for incompatible headers.
var doErr bool
if getRollup(hdr) != _EMPTY_ ||
getExpectedStream(hdr) != _EMPTY_ ||
getExpectedLastMsgId(hdr) != _EMPTY_ ||
getExpectedLastSeqPerSubjectForSubject(hdr) != _EMPTY_ {
if hdrIdx.getRollup() != _EMPTY_ ||
hdrIdx.getExpectedStream() != _EMPTY_ ||
hdrIdx.getExpectedLastMsgId() != _EMPTY_ ||
hdrIdx.getExpectedLastSeqPerSubjectForSubject() != _EMPTY_ {
doErr = true
} else if _, ok = getExpectedLastSeq(hdr); ok {
} else if _, ok = hdrIdx.getExpectedLastSeq(); ok {
doErr = true
} else if _, ok = getExpectedLastSeqPerSubject(hdr); ok {
} else if _, ok = hdrIdx.getExpectedLastSeqPerSubject(); ok {
doErr = true
}

@@ -287,11 +290,11 @@ func checkMsgHeadersPreClusteredProposal(
}
}
// Expected stream name can also be pre-checked.
if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
if sname := hdrIdx.getExpectedStream(); sname != _EMPTY_ && sname != name {
return hdr, msg, 0, NewJSStreamNotMatchError(), errStreamMismatch
}
// TTL'd messages are rejected entirely if TTLs are not enabled on the stream, or if the TTL is invalid.
if ttl, err := getMessageTTL(hdr); !sourced && (ttl != 0 || err != nil) {
if ttl, err := hdrIdx.getMessageTTL(); !sourced && (ttl != 0 || err != nil) {
if !allowTTL {
return hdr, msg, 0, NewJSMessageTTLDisabledError(), errMsgTTLDisabled
} else if err != nil {
@@ -300,7 +303,7 @@ func checkMsgHeadersPreClusteredProposal(
}
// Check for MsgIds here at the cluster level to avoid excessive CLFS accounting.
// Will help during restarts.
if msgId := getMsgId(hdr); msgId != _EMPTY_ {
if msgId := hdrIdx.getMsgId(); msgId != _EMPTY_ {
// Dedupe if staged.
if _, ok = diff.msgIds[msgId]; ok {
return hdr, msg, 0, NewJSAtomicPublishContainsDuplicateMessageError(), errMsgIdDuplicate
@@ -439,9 +442,9 @@ func checkMsgHeadersPreClusteredProposal(
}
}

if len(hdr) > 0 {
if hdrIdx != nil {
// Expected last sequence.
if seq, exists := getExpectedLastSeq(hdr); exists && seq != mset.clseq-mset.clfs {
if seq, exists := hdrIdx.getExpectedLastSeq(); exists && seq != mset.clseq-mset.clfs {
mlseq := mset.clseq - mset.clfs
err := fmt.Errorf("last sequence mismatch: %d vs %d", seq, mlseq)
return hdr, msg, 0, NewJSStreamWrongLastSequenceError(mlseq), err
@@ -452,10 +455,10 @@ func checkMsgHeadersPreClusteredProposal(
}

// Expected last sequence per subject.
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists {
if seq, exists := hdrIdx.getExpectedLastSeqPerSubject(); exists {
// Allow override of the subject used for the check.
seqSubj := subject
if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ {
if optSubj := hdrIdx.getExpectedLastSeqPerSubjectForSubject(); optSubj != _EMPTY_ {
seqSubj = optSubj
}

@@ -509,13 +512,13 @@ func checkMsgHeadersPreClusteredProposal(
diff.expectedPerSubject[seqSubj] = e
}
}
} else if getExpectedLastSeqPerSubjectForSubject(hdr) != _EMPTY_ {
} else if hdrIdx.getExpectedLastSeqPerSubjectForSubject() != _EMPTY_ {
apiErr := NewJSStreamExpectedLastSeqPerSubjectInvalidError()
return hdr, msg, 0, apiErr, apiErr
}

// Message scheduling.
if schedule, ok := getMessageSchedule(hdr); !ok {
if schedule, ok := hdrIdx.getMessageSchedule(); !ok {
apiErr := NewJSMessageSchedulesPatternInvalidError()
if !allowMsgSchedules {
apiErr = NewJSMessageSchedulesDisabledError()
@@ -525,12 +528,12 @@ func checkMsgHeadersPreClusteredProposal(
if !allowMsgSchedules {
apiErr := NewJSMessageSchedulesDisabledError()
return hdr, msg, 0, apiErr, apiErr
} else if scheduleTtl, ok := getMessageScheduleTTL(hdr); !ok {
} else if scheduleTtl, ok := hdrIdx.getMessageScheduleTTL(); !ok {
apiErr := NewJSMessageSchedulesTTLInvalidError()
return hdr, msg, 0, apiErr, apiErr
} else if scheduleTtl != _EMPTY_ && !allowTTL {
return hdr, msg, 0, NewJSMessageTTLDisabledError(), errMsgTTLDisabled
} else if scheduleTarget := getMessageScheduleTarget(hdr); scheduleTarget == _EMPTY_ ||
} else if scheduleTarget := hdrIdx.getMessageScheduleTarget(); scheduleTarget == _EMPTY_ ||
!IsValidPublishSubject(scheduleTarget) || SubjectsCollide(scheduleTarget, subject) {
apiErr := NewJSMessageSchedulesTargetInvalidError()
return hdr, msg, 0, apiErr, apiErr
@@ -547,7 +550,7 @@ func checkMsgHeadersPreClusteredProposal(

// Add a rollup sub header if it doesn't already exist.
// Otherwise, it must exist already as a rollup on the subject.
if rollup := getRollup(hdr); rollup == _EMPTY_ {
if rollup := hdrIdx.getRollup(); rollup == _EMPTY_ {
hdr = genHeader(hdr, JSMsgRollup, JSMsgRollupSubject)
} else if rollup != JSMsgRollupSubject {
apiErr := NewJSMessageSchedulesRollupInvalidError()
@@ -557,7 +560,7 @@ func checkMsgHeadersPreClusteredProposal(
}

// Check for any rollups.
if rollup := getRollup(hdr); rollup != _EMPTY_ {
if rollup := hdrIdx.getRollup(); rollup != _EMPTY_ {
if !allowRollup || denyPurge {
err := errors.New("rollup not permitted")
return hdr, msg, 0, NewJSStreamRollupFailedError(err), err
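Most of the change in checkMsgHeadersPreClusteredProposal above replaces the per-header helpers (getRollup(hdr), getExpectedStream(hdr), getMsgId(hdr), and so on) with getters on the prebuilt index, and the old len(hdr) > 0 guard with hdrIdx != nil, presumably so the header block is scanned once per message rather than once per check. A self-contained sketch of that idea follows; the parsing is deliberately simplified and the hdrIdx type here is an illustrative stand-in, not the actual jsHdrIndex.

package main

import (
	"bytes"
	"fmt"
)

// A NATS-style header block used for illustration.
var hdr = []byte("NATS/1.0\r\nNats-Expected-Stream: TEST\r\nNats-Rollup: sub\r\nNats-Msg-Id: abc\r\n\r\n")

// scanFor re-scans the whole header for a single key, the way a standalone
// getX(hdr) helper has to.
func scanFor(hdr []byte, key string) string {
	for _, line := range bytes.Split(hdr, []byte("\r\n")) {
		if k, v, ok := bytes.Cut(line, []byte(": ")); ok && string(k) == key {
			return string(v)
		}
	}
	return ""
}

// hdrIdx is a hypothetical parse-once index: one pass over the header, after
// which every getter is a plain field read, mirroring hdrIdx.getRollup(),
// hdrIdx.getExpectedStream(), and hdrIdx.getMsgId() in the diff above.
type hdrIdx struct {
	expectedStream, rollup, msgId string
}

func indexHdr(hdr []byte) *hdrIdx {
	idx := &hdrIdx{}
	for _, line := range bytes.Split(hdr, []byte("\r\n")) {
		if k, v, ok := bytes.Cut(line, []byte(": ")); ok {
			switch string(k) {
			case "Nats-Expected-Stream":
				idx.expectedStream = string(v)
			case "Nats-Rollup":
				idx.rollup = string(v)
			case "Nats-Msg-Id":
				idx.msgId = string(v)
			}
		}
	}
	return idx
}

func (idx *hdrIdx) getExpectedStream() string { return idx.expectedStream }
func (idx *hdrIdx) getRollup() string         { return idx.rollup }
func (idx *hdrIdx) getMsgId() string          { return idx.msgId }

func main() {
	// Old style: three independent scans of the header block.
	fmt.Println(scanFor(hdr, "Nats-Expected-Stream"), scanFor(hdr, "Nats-Rollup"), scanFor(hdr, "Nats-Msg-Id"))

	// New style: one scan, then cheap lookups for every remaining check.
	idx := indexHdr(hdr)
	fmt.Println(idx.getExpectedStream(), idx.getRollup(), idx.getMsgId())
}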
12 changes: 9 additions & 3 deletions server/jetstream_batching_test.go
@@ -1388,7 +1388,9 @@ func TestJetStreamAtomicBatchPublishStageAndCommit(t *testing.T) {
hdr = genHeader(hdr, key, value)
}
}
_, _, _, _, err = checkMsgHeadersPreClusteredProposal(diff, mset, m.subject, hdr, m.msg, false, "TEST", nil, test.allowRollup, test.denyPurge, test.allowTTL, test.allowMsgCounter, test.allowMsgSchedules, discard, discardNewPer, -1, maxMsgs, maxMsgsPer, maxBytes)
_, hdrIdx := indexJsHdr(hdr)
_, _, _, _, err = checkMsgHeadersPreClusteredProposal(diff, mset, m.subject, hdr, hdrIdx, m.msg, false, "TEST", nil, test.allowRollup, test.denyPurge, test.allowTTL, test.allowMsgCounter, test.allowMsgSchedules, discard, discardNewPer, -1, maxMsgs, maxMsgsPer, maxBytes)
hdrIdx.returnToPool()
if m.err != nil {
require_Error(t, err, m.err)
} else if err != nil {
@@ -1582,7 +1584,9 @@ func TestJetStreamAtomicBatchPublishSingleServerRecovery(t *testing.T) {
require_True(t, commitReady)

// Simulate the first message of the batch is committed.
err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, nil, 0, 0, nil, false, true)
_, hdrIdx := indexJsHdr(hdr1)
err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, hdrIdx, nil, 0, 0, nil, false, true)
hdrIdx.returnToPool()
require_NoError(t, err)

// Simulate a hard kill, upon recovery the rest of the batch should be applied.
@@ -1672,7 +1676,9 @@ func TestJetStreamAtomicBatchPublishSingleServerRecoveryCommitEob(t *testing.T)
require_True(t, commitReady)

// Simulate the first message of the batch is committed.
err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, nil, 0, 0, nil, false, true)
_, hdrIdx := indexJsHdr(hdr1)
err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, hdrIdx, nil, 0, 0, nil, false, true)
hdrIdx.returnToPool()
require_NoError(t, err)

// Simulate a hard kill, upon recovery the rest of the batch should be applied.
15 changes: 10 additions & 5 deletions server/jetstream_cluster.go
@@ -3665,7 +3665,10 @@ func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isR
mt = mset.getAndDeleteMsgTrace(lseq)
}
// Process the actual message here.
err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt, sourced, needLock)
var hdrIdx *jsHdrIndex
hdr, hdrIdx = indexJsHdr(hdr)
err = mset.processJetStreamMsg(subject, reply, hdr, hdrIdx, msg, lseq, ts, mt, sourced, needLock)
hdrIdx.returnToPool()

// If we have inflight make sure to clear after processing.
// TODO(dlc) - technically check on inflight != nil could cause datarace.
@@ -3732,7 +3735,9 @@ func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isR
if state.Msgs == 0 {
mset.store.Compact(lseq + 1)
// Retry
err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt, sourced, needLock)
hdr, hdrIdx = indexJsHdr(hdr)
err = mset.processJetStreamMsg(subject, reply, hdr, hdrIdx, msg, lseq, ts, mt, sourced, needLock)
hdrIdx.returnToPool()
}
// FIXME(dlc) - We could just run a catchup with a request defining the span between what we expected
// and what we got.
@@ -8776,7 +8781,7 @@ func (mset *stream) stateSnapshotLocked() []byte {
const streamLagWarnThreshold = 10_000

// processClusteredInboundMsg will propose the inbound message to the underlying raft group.
func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg []byte, mt *msgTrace, sourced bool) (retErr error) {
func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr []byte, hdrIdx *jsHdrIndex, msg []byte, mt *msgTrace, sourced bool) (retErr error) {
// For possible error response.
var response []byte

@@ -8794,7 +8799,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// We also invoke this in clustering mode for message tracing when not
// performing message delivery.
if node == nil || mt.traceOnly() {
return mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0, mt, sourced, true)
return mset.processJetStreamMsg(subject, reply, hdr, hdrIdx, msg, 0, 0, mt, sourced, true)
}

// If message tracing (with message delivery), we will need to send the
@@ -8898,7 +8903,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
err error
)
diff := &batchStagedDiff{}
if hdr, msg, dseq, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, hdr, msg, sourced, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil {
if hdr, msg, dseq, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, hdr, hdrIdx, msg, sourced, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil {
mset.clMu.Unlock()
if err == errMsgIdDuplicate && dseq > 0 {
var buf [256]byte
2 changes: 1 addition & 1 deletion server/jetstream_cluster_4_test.go
@@ -4379,7 +4379,7 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) {
validateStreamState(snap)

// Simulate a message being stored, but not calling Applied yet.
err = mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 1, time.Now().UnixNano(), nil, false, true)
err = mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, nil, 1, time.Now().UnixNano(), nil, false, true)
require_NoError(t, err)

// Simulate the stream being stopped before we're able to call Applied.
34 changes: 32 additions & 2 deletions server/jetstream_test.go
@@ -22273,7 +22273,7 @@ func TestJetStreamDirectGetBatchParallelWriteDeadlock(t *testing.T) {
mset, err := s.globalAccount().lookupStream("TEST")
require_NoError(t, err)
for range 2 {
require_NoError(t, mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 0, 0, nil, false, true))
require_NoError(t, mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, nil, 0, 0, nil, false, true))
}

// We'll lock the message blocks such that we can't read, but NumPending should still function.
@@ -22300,7 +22300,7 @@ func TestJetStreamDirectGetBatchParallelWriteDeadlock(t *testing.T) {
read.Wait()
<-time.After(100 * time.Millisecond)
wg.Done()
mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 0, 0, nil, false, true)
mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, nil, 0, 0, nil, false, true)
}()
go func() {
// Run some time after we've entered processJetStreamMsg above.
@@ -22316,3 +22316,33 @@ func TestJetStreamDirectGetBatchParallelWriteDeadlock(t *testing.T) {
return nil
})
}

func TestJetStreamHdrIndexUpdateHdr(t *testing.T) {
updateKey := "Nats-Update-Header"
for _, test := range []struct {
title string
updateHdr func(hdr []byte)
}{
{title: "SetHeader", updateHdr: func(hdr []byte) { setHeader(updateKey, "s", hdr) }},
{title: "GenHeader", updateHdr: func(hdr []byte) { genHeader(hdr, updateKey, "s") }},
{title: "RemoveHeaderIfPresent", updateHdr: func(hdr []byte) { removeHeaderIfPresent(hdr, updateKey) }},
{title: "RemoveHeaderIfPrefixPresent", updateHdr: func(hdr []byte) { removeHeaderIfPrefixPresent(hdr, updateKey) }},
} {
t.Run(test.title, func(t *testing.T) {
hdr := genHeader(nil, "Nats-Batch-Id", "uuid")
hdr = genHeader(hdr, updateKey, "long_value")
hdr = genHeader(hdr, "Nats-Batch-Sequence", "seq")

var idx *jsHdrIndex
hdr, idx = indexJsHdr(hdr)
defer idx.returnToPool()
require_NotNil(t, idx)
require_Equal(t, string(idx.batchId), "uuid")
require_Equal(t, string(idx.batchSeq), "seq")

test.updateHdr(hdr)
require_Equal(t, string(idx.batchId), "uuid")
require_Equal(t, string(idx.batchSeq), "seq")
})
}
}