diff --git a/server/consumer.go b/server/consumer.go index cdf46e5256..778b0e89ce 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -337,6 +337,8 @@ const ( AckAll // AckExplicit requires ack or nack for all messages. AckExplicit + // AckFlowControl functions like AckAll, but acks based on responses to flow control. + AckFlowControl ) func (a AckPolicy) String() string { @@ -345,6 +347,8 @@ func (a AckPolicy) String() string { return "none" case AckAll: return "all" + case AckFlowControl: + return "flow_control" default: return "explicit" } @@ -652,7 +656,7 @@ func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig, config.InactiveThreshold = streamCfg.ConsumerLimits.InactiveThreshold } // Set proper default for max ack pending if we are ack explicit and none has been set. - if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 { + if config.MaxAckPending == 0 && (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll || config.AckPolicy == AckFlowControl) { ackPending := JsDefaultMaxAckPending if lim.MaxAckPending > 0 && lim.MaxAckPending < ackPending { ackPending = lim.MaxAckPending @@ -674,6 +678,11 @@ func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig, if config.PriorityPolicy == PriorityPinnedClient && config.PinnedTTL == 0 { config.PinnedTTL = JsDefaultPinnedTTL } + + // Set default values for flow control policy. + if config.AckPolicy == AckFlowControl && !pedantic { + config.FlowControl = true + } return nil } @@ -723,6 +732,25 @@ func checkConsumerCfg( return NewJSConsumerAckWaitNegativeError() } + // Ack Flow Control policy requires push-based flow-controlled consumer. + if config.AckPolicy == AckFlowControl { + if config.DeliverSubject == _EMPTY_ { + return NewJSConsumerAckFCRequiresPushError() + } + if !config.FlowControl { + return NewJSConsumerAckFCRequiresFCError() + } + if config.MaxAckPending <= 0 { + return NewJSConsumerAckFCRequiresMaxAckPendingError() + } + if config.AckWait != 0 || len(config.BackOff) > 0 { + return NewJSConsumerAckFCRequiresNoAckWaitError() + } + if config.MaxDeliver > 0 { + return NewJSConsumerAckFCRequiresNoMaxDeliverError() + } + } + // Check if we have a BackOff defined that MaxDeliver is within range etc. if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver != -1 && lbo > config.MaxDeliver { return NewJSConsumerMaxDeliverBackoffError() @@ -1075,7 +1103,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri // Check on stream type conflicts with WorkQueues. if cfg.Retention == WorkQueuePolicy && !config.Direct { // Force explicit acks here. - if config.AckPolicy != AckExplicit { + if config.AckPolicy != AckExplicit && config.AckPolicy != AckFlowControl { mset.mu.Unlock() return nil, NewJSConsumerWQRequiresExplicitAckError() } @@ -1535,7 +1563,7 @@ func (o *consumer) setLeader(isLeader bool) { } var err error - if o.cfg.AckPolicy != AckNone { + if o.cfg.AckPolicy != AckNone && o.cfg.AckPolicy != AckFlowControl { if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil { o.mu.Unlock() o.deleteWithoutAdvisory() @@ -3371,7 +3399,9 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b } // Check if this ack is above the current pointer to our next to deliver. - if sseq >= o.sseq { + // Ignore if it's a flow-controlled consumer, its state could end up further ahead + // since its state is not replicated before delivery. + if sseq >= o.sseq && !o.cfg.FlowControl { // Let's make sure this is valid. // This is only received on the consumer leader, so should never be higher // than the last stream sequence. But could happen if we've just become @@ -3429,7 +3459,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b } delete(o.rdc, sseq) o.removeFromRedeliverQueue(sseq) - case AckAll: + case AckAll, AckFlowControl: // no-op if dseq <= o.adflr || sseq <= o.asflr { o.mu.Unlock() @@ -3589,7 +3619,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { } switch o.cfg.AckPolicy { - case AckNone, AckAll: + case AckNone, AckAll, AckFlowControl: needAck = sseq > asflr case AckExplicit: if sseq > asflr { @@ -5181,7 +5211,15 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { if o.isActive() { o.mu.RLock() o.sendIdleHeartbeat(odsubj) + flowControl := o.cfg.AckPolicy == AckFlowControl && len(o.pending) > 0 o.mu.RUnlock() + + // Send flow control on EOS if it's used for acknowledgements. + if flowControl { + o.mu.Lock() + o.sendFlowControl() + o.mu.Unlock() + } } // Reset our idle heartbeat timer. hb.Reset(hbd) @@ -5349,7 +5387,7 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, // Update delivered first. o.updateDelivered(dseq, seq, dc, ts) - if ap == AckExplicit || ap == AckAll { + if ap == AckExplicit || ap == AckAll || ap == AckFlowControl { o.trackPending(seq, dseq) } else if ap == AckNone { o.adflr = dseq @@ -5401,6 +5439,10 @@ func (o *consumer) needFlowControl(sz int) bool { if o.fcid == _EMPTY_ && o.pbytes > o.maxpb/2 { return true } + // Or, when acking based on flow control, we need to send it if we've hit the max pending limit earlier. + if o.fcid == _EMPTY_ && o.cfg.AckPolicy == AckFlowControl && o.maxp > 0 && len(o.pending) >= o.maxp { + return true + } // If we have an existing outstanding FC, check to see if we need to expand the o.fcsz if o.fcid != _EMPTY_ && (o.pbytes-o.fcsz) >= o.maxpb { o.fcsz += sz @@ -5408,12 +5450,12 @@ func (o *consumer) needFlowControl(sz int) bool { return false } -func (o *consumer) processFlowControl(_ *subscription, c *client, _ *Account, subj, _ string, _ []byte) { +func (o *consumer) processFlowControl(_ *subscription, c *client, _ *Account, subj, _ string, rmsg []byte) { o.mu.Lock() - defer o.mu.Unlock() // Ignore if not the latest we have sent out. if subj != o.fcid { + o.mu.Unlock() return } @@ -5433,6 +5475,21 @@ func (o *consumer) processFlowControl(_ *subscription, c *client, _ *Account, su o.fcid, o.fcsz = _EMPTY_, 0 o.signalNewMessages() + o.mu.Unlock() + + hdr, _ := c.msgParts(rmsg) + if len(hdr) > 0 { + ldseq := parseInt64(sliceHeader(JSLastConsumerSeq, hdr)) + lsseq := parseInt64(sliceHeader(JSLastStreamSeq, hdr)) + if lsseq > 0 { + // Delivered sequence is allowed to be zero as a response + // to flow control without any deliveries. + if ldseq <= 0 { + ldseq = 0 + } + o.processAckMsg(uint64(lsseq), uint64(ldseq), 1, _EMPTY_, false) + } + } } // Lock should be held. @@ -5615,8 +5672,9 @@ func (o *consumer) checkPending() { defer o.mu.Unlock() mset := o.mset + ttl := int64(o.cfg.AckWait) // On stop, mset and timer will be nil. - if o.closed || mset == nil || o.ptmr == nil { + if o.closed || mset == nil || o.ptmr == nil || ttl == 0 { o.stopAndClearPtmr() return } @@ -5627,7 +5685,6 @@ func (o *consumer) checkPending() { fseq := state.FirstSeq now := time.Now().UnixNano() - ttl := int64(o.cfg.AckWait) next := int64(o.ackWait(0)) // However, if there is backoff, initializes with the largest backoff. // It will be adjusted as needed. @@ -6638,6 +6695,12 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error { } func (o *consumer) resetPtmr(delay time.Duration) { + // A delay of zero means it should be stopped. + if delay == 0 { + o.stopAndClearPtmr() + return + } + if o.ptmr == nil { o.ptmr = time.AfterFunc(delay, o.checkPending) } else { @@ -6647,6 +6710,10 @@ func (o *consumer) resetPtmr(delay time.Duration) { } func (o *consumer) stopAndClearPtmr() { + // If the end time is unset, short-circuit since the timer will already be stopped. + if o.ptmrEnd.IsZero() { + return + } stopAndClearTimer(&o.ptmr) o.ptmrEnd = time.Time{} } diff --git a/server/errors.json b/server/errors.json index c0e2818980..f77d049179 100644 --- a/server/errors.json +++ b/server/errors.json @@ -2008,5 +2008,75 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSMirrorConsumerRequiresAckFCErr", + "code": 400, + "error_code": 10203, + "description": "stream mirror consumer requires flow control ack policy", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSSourceConsumerRequiresAckFCErr", + "code": 400, + "error_code": 10204, + "description": "stream source consumer requires flow control ack policy", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSConsumerAckFCRequiresPushErr", + "code": 400, + "error_code": 10205, + "description": "flow control ack policy requires a push based consumer", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSConsumerAckFCRequiresFCErr", + "code": 400, + "error_code": 10206, + "description": "flow control ack policy requires flow control", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSConsumerAckFCRequiresMaxAckPendingErr", + "code": 400, + "error_code": 10207, + "description": "flow control ack policy requires max ack pending", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSConsumerAckFCRequiresNoAckWaitErr", + "code": 400, + "error_code": 10208, + "description": "flow control ack policy requires unset ack wait", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSConsumerAckFCRequiresNoMaxDeliverErr", + "code": 400, + "error_code": 10209, + "description": "flow control ack policy requires unset max deliver", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] diff --git a/server/filestore.go b/server/filestore.go index 34f2ea3813..6b0e88855a 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -11319,7 +11319,7 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error { } // Check for AckAll here. - if o.cfg.AckPolicy == AckAll { + if o.cfg.AckPolicy == AckAll || o.cfg.AckPolicy == AckFlowControl { sgap := sseq - o.state.AckFloor.Stream o.state.AckFloor.Consumer = dseq o.state.AckFloor.Stream = sseq diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 14fb243a36..5e18bd908f 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -765,6 +765,7 @@ type JSApiConsumerResetRequest struct { Seq uint64 `json:"seq"` } +// JSApiConsumerResetResponse is a superset of JSApiConsumerCreateResponse, but including an explicit ResetSeq. type JSApiConsumerResetResponse struct { ApiResponse *ConsumerInfo diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index e601a42bb9..58455a6f14 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3653,7 +3653,7 @@ func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isR if needLock { mset.mu.RLock() } - mset.sendFlowControlReply(reply) + mset.sendFlowControlReply(reply, hdr) if needLock { mset.mu.RUnlock() } @@ -5900,7 +5900,7 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) error { o.lat = time.Now() var sagap uint64 - if o.cfg.AckPolicy == AckAll { + if o.cfg.AckPolicy == AckAll || o.cfg.AckPolicy == AckFlowControl { // Always use the store state, as o.asflr is skipped ahead already. // Capture before updating store. state, err := o.store.BorrowState() @@ -8408,7 +8408,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec // Check if we are work queue policy. // We will do pre-checks here to avoid thrashing meta layer. if sa.Config.Retention == WorkQueuePolicy && !cfg.Direct { - if cfg.AckPolicy != AckExplicit { + if cfg.AckPolicy != AckExplicit && cfg.AckPolicy != AckFlowControl { resp.Error = NewJSConsumerWQRequiresExplicitAckError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 912f041fe3..d8a18e1277 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -6438,3 +6438,151 @@ func TestJetStreamClusterAccountFileStoreLimits(t *testing.T) { }) } } + +func TestJetStreamClusterDurableStreamMirror(t *testing.T) { + test := func(t *testing.T, replicas int, retention RetentionPolicy) { + var s *Server + if replicas == 1 { + s = RunBasicJetStreamServer(t) + defer s.Shutdown() + } else { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + s = c.randomServer() + } + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "O", + Subjects: []string{"foo"}, + Storage: FileStorage, + Replicas: replicas, + Retention: retention, + }) + require_NoError(t, err) + + _, err = jsConsumerCreate(t, nc, "O", ConsumerConfig{ + Durable: "C", + DeliverSubject: "deliver-subject", + Replicas: replicas, + AckPolicy: AckFlowControl, + Heartbeat: time.Second, + }, false) + require_NoError(t, err) + + pubAck, err := js.Publish("foo", nil) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 1) + + _, err = jsStreamCreate(t, nc, &StreamConfig{ + Name: "M", + Mirror: &StreamSource{ + Name: "O", + ConsumerName: "C", + ConsumerDeliverSubject: "deliver-subject", + }, + Storage: FileStorage, + Replicas: replicas, + }) + require_NoError(t, err) + + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + si, err := js.StreamInfo("M") + if err != nil { + return err + } + if si.Mirror == nil { + return errors.New("no mirror") + } + if si.Mirror.Error != nil { + return si.Mirror.Error + } + _, err = js.GetMsg("M", 1) + return err + }) + } + + for _, replicas := range []int{1, 3} { + for _, retention := range []RetentionPolicy{LimitsPolicy, InterestPolicy, WorkQueuePolicy} { + t.Run(fmt.Sprintf("R%d/%s", replicas, retention), func(t *testing.T) { + test(t, replicas, retention) + }) + } + } +} + +func TestJetStreamClusterDurableStreamSource(t *testing.T) { + test := func(t *testing.T, replicas int, retention RetentionPolicy) { + var s *Server + if replicas == 1 { + s = RunBasicJetStreamServer(t) + defer s.Shutdown() + } else { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + s = c.randomServer() + } + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "O", + Subjects: []string{"foo"}, + Storage: FileStorage, + Replicas: replicas, + Retention: retention, + }) + require_NoError(t, err) + + _, err = jsConsumerCreate(t, nc, "O", ConsumerConfig{ + Durable: "C", + DeliverSubject: "deliver-subject", + Replicas: replicas, + AckPolicy: AckFlowControl, + Heartbeat: time.Second, + }, false) + require_NoError(t, err) + + pubAck, err := js.Publish("foo", nil) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 1) + + _, err = jsStreamCreate(t, nc, &StreamConfig{ + Name: "S", + Sources: []*StreamSource{{ + Name: "O", + ConsumerName: "C", + ConsumerDeliverSubject: "deliver-subject", + }}, + Storage: FileStorage, + Replicas: replicas, + }) + require_NoError(t, err) + + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + si, err := js.StreamInfo("S") + if err != nil { + return err + } + if len(si.Sources) != 1 { + return errors.New("no source") + } + if si.Sources[0].Error != nil { + return si.Sources[0].Error + } + _, err = js.GetMsg("S", 1) + return err + }) + } + + for _, replicas := range []int{1, 3} { + for _, retention := range []RetentionPolicy{LimitsPolicy, InterestPolicy, WorkQueuePolicy} { + t.Run(fmt.Sprintf("R%d/%s", replicas, retention), func(t *testing.T) { + test(t, replicas, retention) + }) + } + } +} diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 329cda195c..7d63981c84 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -10779,3 +10779,91 @@ func TestJetStreamConsumerResetToSequenceConstraintOnStartTime(t *testing.T) { require_Equal(t, string(msgs[0].Data), "msg3") require_NoError(t, msgs[0].AckSync()) } + +func TestJetStreamConsumerAckFlowControlBasics(t *testing.T) { + test := func(replicas int) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc := clientConnectToServer(t, c.randomServer()) + defer nc.Close() + + _, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: replicas, + Storage: FileStorage, + Retention: LimitsPolicy, + }) + require_NoError(t, err) + + checkConfig := func(ccfg *ConsumerConfig) { + require_Equal(t, ccfg.AckPolicy, AckFlowControl) + require_True(t, ccfg.FlowControl) + require_Equal(t, ccfg.Heartbeat, time.Second) + require_Equal(t, ccfg.AckWait, 0) + require_Equal(t, ccfg.MaxAckPending, JsDefaultMaxAckPending) + require_Equal(t, ccfg.MaxDeliver, -1) + } + + // Only the deliver subject, policy and heartbeat are required; others are automatically defaulted. + cfg := ConsumerConfig{ + Durable: "DEFAULT", + DeliverSubject: "deliver-subject", + AckPolicy: AckFlowControl, + Heartbeat: time.Second, + } + ccfg, err := jsConsumerCreate(t, nc, "TEST", cfg, false) + require_NoError(t, err) + checkConfig(ccfg) + + cfg = ConsumerConfig{ + Durable: "CONSUMER", + AckPolicy: AckFlowControl, + DeliverSubject: _EMPTY_, + FlowControl: false, + MaxAckPending: -1, + AckWait: 30 * time.Second, + MaxDeliver: 1, + } + _, err = jsConsumerCreate(t, nc, "TEST", cfg, true) + require_Error(t, err, NewJSConsumerAckFCRequiresPushError()) + + cfg.DeliverSubject = "deliver-subject" + _, err = jsConsumerCreate(t, nc, "TEST", cfg, true) + require_Error(t, err, NewJSConsumerAckFCRequiresFCError()) + + cfg.FlowControl = true + _, err = jsConsumerCreate(t, nc, "TEST", cfg, true) + require_Error(t, err, NewJSConsumerAckFCRequiresMaxAckPendingError()) + + cfg.MaxAckPending = JsDefaultMaxAckPending + _, err = jsConsumerCreate(t, nc, "TEST", cfg, true) + require_Error(t, err, NewJSConsumerAckFCRequiresNoAckWaitError()) + + cfg.AckWait = time.Second + cfg.BackOff = []time.Duration{time.Second, 2 * time.Second} + _, err = jsConsumerCreate(t, nc, "TEST", cfg, true) + require_Error(t, err, NewJSConsumerAckFCRequiresNoAckWaitError()) + + cfg.AckWait = 0 + cfg.BackOff = nil + _, err = jsConsumerCreate(t, nc, "TEST", cfg, true) + require_Error(t, err, NewJSConsumerAckFCRequiresNoMaxDeliverError()) + + cfg.MaxDeliver = 0 + _, err = jsConsumerCreate(t, nc, "TEST", cfg, true) + require_Error(t, err, NewJSConsumerWithFlowControlNeedsHeartbeatsError()) + + cfg.Heartbeat = time.Second + ccfg, err = jsConsumerCreate(t, nc, "TEST", cfg, true) + require_NoError(t, err) + checkConfig(ccfg) + } + + for _, replicas := range []int{1, 3} { + t.Run(fmt.Sprintf("R%d", replicas), func(t *testing.T) { + test(replicas) + }) + } +} diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 323603fc1f..dab1de4bec 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -68,6 +68,21 @@ const ( // JSClusterUnSupportFeatureErr not currently supported in clustered mode JSClusterUnSupportFeatureErr ErrorIdentifier = 10036 + // JSConsumerAckFCRequiresFCErr flow control ack policy requires flow control + JSConsumerAckFCRequiresFCErr ErrorIdentifier = 10206 + + // JSConsumerAckFCRequiresMaxAckPendingErr flow control ack policy requires max ack pending + JSConsumerAckFCRequiresMaxAckPendingErr ErrorIdentifier = 10207 + + // JSConsumerAckFCRequiresNoAckWaitErr flow control ack policy requires unset ack wait + JSConsumerAckFCRequiresNoAckWaitErr ErrorIdentifier = 10208 + + // JSConsumerAckFCRequiresNoMaxDeliverErr flow control ack policy requires unset max deliver + JSConsumerAckFCRequiresNoMaxDeliverErr ErrorIdentifier = 10209 + + // JSConsumerAckFCRequiresPushErr flow control ack policy requires a push based consumer + JSConsumerAckFCRequiresPushErr ErrorIdentifier = 10205 + // JSConsumerAckPolicyInvalidErr consumer ack policy invalid JSConsumerAckPolicyInvalidErr ErrorIdentifier = 10181 @@ -329,6 +344,9 @@ const ( // JSMessageTTLInvalidErr invalid per-message TTL JSMessageTTLInvalidErr ErrorIdentifier = 10165 + // JSMirrorConsumerRequiresAckFCErr stream mirror consumer requires flow control ack policy + JSMirrorConsumerRequiresAckFCErr ErrorIdentifier = 10203 + // JSMirrorConsumerSetupFailedErrF generic mirror consumer setup failure string ({err}) JSMirrorConsumerSetupFailedErrF ErrorIdentifier = 10029 @@ -416,6 +434,9 @@ const ( // JSSnapshotDeliverSubjectInvalidErr deliver subject not valid JSSnapshotDeliverSubjectInvalidErr ErrorIdentifier = 10015 + // JSSourceConsumerRequiresAckFCErr stream source consumer requires flow control ack policy + JSSourceConsumerRequiresAckFCErr ErrorIdentifier = 10204 + // JSSourceConsumerSetupFailedErrF General source consumer setup failure string ({err}) JSSourceConsumerSetupFailedErrF ErrorIdentifier = 10045 @@ -632,6 +653,11 @@ var ( JSClusterServerNotMemberErr: {Code: 400, ErrCode: 10044, Description: "server is not a member of the cluster"}, JSClusterTagsErr: {Code: 400, ErrCode: 10011, Description: "tags placement not supported for operation"}, JSClusterUnSupportFeatureErr: {Code: 503, ErrCode: 10036, Description: "not currently supported in clustered mode"}, + JSConsumerAckFCRequiresFCErr: {Code: 400, ErrCode: 10206, Description: "flow control ack policy requires flow control"}, + JSConsumerAckFCRequiresMaxAckPendingErr: {Code: 400, ErrCode: 10207, Description: "flow control ack policy requires max ack pending"}, + JSConsumerAckFCRequiresNoAckWaitErr: {Code: 400, ErrCode: 10208, Description: "flow control ack policy requires unset ack wait"}, + JSConsumerAckFCRequiresNoMaxDeliverErr: {Code: 400, ErrCode: 10209, Description: "flow control ack policy requires unset max deliver"}, + JSConsumerAckFCRequiresPushErr: {Code: 400, ErrCode: 10205, Description: "flow control ack policy requires a push based consumer"}, JSConsumerAckPolicyInvalidErr: {Code: 400, ErrCode: 10181, Description: "consumer ack policy invalid"}, JSConsumerAckWaitNegativeErr: {Code: 400, ErrCode: 10183, Description: "consumer ack wait needs to be positive"}, JSConsumerAlreadyExists: {Code: 400, ErrCode: 10148, Description: "consumer already exists"}, @@ -719,6 +745,7 @@ var ( JSMessageSchedulesTargetInvalidErr: {Code: 400, ErrCode: 10190, Description: "message schedules target is invalid"}, JSMessageTTLDisabledErr: {Code: 400, ErrCode: 10166, Description: "per-message TTL is disabled"}, JSMessageTTLInvalidErr: {Code: 400, ErrCode: 10165, Description: "invalid per-message TTL"}, + JSMirrorConsumerRequiresAckFCErr: {Code: 400, ErrCode: 10203, Description: "stream mirror consumer requires flow control ack policy"}, JSMirrorConsumerSetupFailedErrF: {Code: 500, ErrCode: 10029, Description: "{err}"}, JSMirrorInvalidStreamName: {Code: 400, ErrCode: 10142, Description: "mirrored stream name is invalid"}, JSMirrorInvalidSubjectFilter: {Code: 400, ErrCode: 10151, Description: "mirror transform source: {err}"}, @@ -748,6 +775,7 @@ var ( JSRestoreSubscribeFailedErrF: {Code: 500, ErrCode: 10042, Description: "JetStream unable to subscribe to restore snapshot {subject}: {err}"}, JSSequenceNotFoundErrF: {Code: 400, ErrCode: 10043, Description: "sequence {seq} not found"}, JSSnapshotDeliverSubjectInvalidErr: {Code: 400, ErrCode: 10015, Description: "deliver subject not valid"}, + JSSourceConsumerRequiresAckFCErr: {Code: 400, ErrCode: 10204, Description: "stream source consumer requires flow control ack policy"}, JSSourceConsumerSetupFailedErrF: {Code: 500, ErrCode: 10045, Description: "{err}"}, JSSourceDuplicateDetected: {Code: 400, ErrCode: 10140, Description: "duplicate source configuration detected"}, JSSourceInvalidStreamName: {Code: 400, ErrCode: 10141, Description: "sourced stream name is invalid"}, @@ -1065,6 +1093,56 @@ func NewJSClusterUnSupportFeatureError(opts ...ErrorOption) *ApiError { return ApiErrors[JSClusterUnSupportFeatureErr] } +// NewJSConsumerAckFCRequiresFCError creates a new JSConsumerAckFCRequiresFCErr error: "flow control ack policy requires flow control" +func NewJSConsumerAckFCRequiresFCError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSConsumerAckFCRequiresFCErr] +} + +// NewJSConsumerAckFCRequiresMaxAckPendingError creates a new JSConsumerAckFCRequiresMaxAckPendingErr error: "flow control ack policy requires max ack pending" +func NewJSConsumerAckFCRequiresMaxAckPendingError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSConsumerAckFCRequiresMaxAckPendingErr] +} + +// NewJSConsumerAckFCRequiresNoAckWaitError creates a new JSConsumerAckFCRequiresNoAckWaitErr error: "flow control ack policy requires unset ack wait" +func NewJSConsumerAckFCRequiresNoAckWaitError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSConsumerAckFCRequiresNoAckWaitErr] +} + +// NewJSConsumerAckFCRequiresNoMaxDeliverError creates a new JSConsumerAckFCRequiresNoMaxDeliverErr error: "flow control ack policy requires unset max deliver" +func NewJSConsumerAckFCRequiresNoMaxDeliverError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSConsumerAckFCRequiresNoMaxDeliverErr] +} + +// NewJSConsumerAckFCRequiresPushError creates a new JSConsumerAckFCRequiresPushErr error: "flow control ack policy requires a push based consumer" +func NewJSConsumerAckFCRequiresPushError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSConsumerAckFCRequiresPushErr] +} + // NewJSConsumerAckPolicyInvalidError creates a new JSConsumerAckPolicyInvalidErr error: "consumer ack policy invalid" func NewJSConsumerAckPolicyInvalidError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) @@ -2013,6 +2091,16 @@ func NewJSMessageTTLInvalidError(opts ...ErrorOption) *ApiError { return ApiErrors[JSMessageTTLInvalidErr] } +// NewJSMirrorConsumerRequiresAckFCError creates a new JSMirrorConsumerRequiresAckFCErr error: "stream mirror consumer requires flow control ack policy" +func NewJSMirrorConsumerRequiresAckFCError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSMirrorConsumerRequiresAckFCErr] +} + // NewJSMirrorConsumerSetupFailedError creates a new JSMirrorConsumerSetupFailedErrF error: "{err}" func NewJSMirrorConsumerSetupFailedError(err error, opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) @@ -2345,6 +2433,16 @@ func NewJSSnapshotDeliverSubjectInvalidError(opts ...ErrorOption) *ApiError { return ApiErrors[JSSnapshotDeliverSubjectInvalidErr] } +// NewJSSourceConsumerRequiresAckFCError creates a new JSSourceConsumerRequiresAckFCErr error: "stream source consumer requires flow control ack policy" +func NewJSSourceConsumerRequiresAckFCError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSSourceConsumerRequiresAckFCErr] +} + // NewJSSourceConsumerSetupFailedError creates a new JSSourceConsumerSetupFailedErrF error: "{err}" func NewJSSourceConsumerSetupFailedError(err error, opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index b5afe75506..c98504aa6a 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -1328,6 +1328,27 @@ func jsStreamUpdate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) (*StreamConf return &resp.Config, nil } +// jsConsumerCreate is for sending a consumer create for fields that nats.go does not know about yet. +func jsConsumerCreate(t testing.TB, nc *nats.Conn, stream string, cfg ConsumerConfig, pedantic bool) (*ConsumerConfig, error) { + t.Helper() + + j, err := json.Marshal(CreateConsumerRequest{Stream: stream, Config: cfg, Pedantic: pedantic}) + require_NoError(t, err) + + msg, err := nc.Request(fmt.Sprintf(JSApiDurableCreateT, stream, cfg.Durable), j, time.Second*3) + require_NoError(t, err) + + var resp JSApiConsumerCreateResponse + require_NoError(t, json.Unmarshal(msg.Data, &resp)) + + if resp.Error != nil { + return nil, resp.Error + } + + require_NotNil(t, resp.ConsumerInfo) + return resp.Config, nil +} + func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) { t.Helper() checkFor(t, 10*time.Second, 20*time.Millisecond, func() error { diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 198e4f2eaf..f7e065ced8 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22121,3 +22121,82 @@ func TestJetStreamImplicitRePublishAfterSubjectTransform(t *testing.T) { _, err = js.UpdateStream(cfg) require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish destination forms a cycle"))) } + +func TestJetStreamDurableStreamMirrorAndSourceIncorrectConsumerConfig(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "O", + Subjects: []string{"foo"}, + Storage: FileStorage, + }) + require_NoError(t, err) + + _, err = jsConsumerCreate(t, nc, "O", ConsumerConfig{ + Durable: "C", + DeliverSubject: "deliver-subject", + AckPolicy: AckExplicit, + }, false) + require_NoError(t, err) + + // Mirror. + _, err = jsStreamCreate(t, nc, &StreamConfig{ + Name: "M", + Mirror: &StreamSource{ + Name: "O", + ConsumerName: "C", + ConsumerDeliverSubject: "deliver-subject", + }, + Storage: FileStorage, + }) + require_NoError(t, err) + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + si, err := js.StreamInfo("M") + if err != nil { + return err + } + if si.Mirror == nil { + return errors.New("no mirror") + } + if si.Mirror.Error == nil { + return errors.New("expected mirror error") + } + if si.Mirror.Error.Description != NewJSMirrorConsumerRequiresAckFCError().Description { + return si.Mirror.Error + } + return nil + }) + + // Source. + _, err = jsStreamCreate(t, nc, &StreamConfig{ + Name: "S", + Sources: []*StreamSource{{ + Name: "O", + ConsumerName: "C", + ConsumerDeliverSubject: "deliver-subject", + }}, + Storage: FileStorage, + }) + require_NoError(t, err) + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + si, err := js.StreamInfo("S") + if err != nil { + return err + } + if len(si.Sources) != 1 { + return errors.New("no source") + } + sourceErr := si.Sources[0].Error + if sourceErr == nil { + return errors.New("expected source error") + } + if sourceErr.Description != NewJSSourceConsumerRequiresAckFCError().Description { + return sourceErr + } + return nil + }) +} diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go index 8d6e9e7cab..d1bde6db17 100644 --- a/server/jetstream_versioning.go +++ b/server/jetstream_versioning.go @@ -17,7 +17,7 @@ import "strconv" const ( // JSApiLevel is the maximum supported JetStream API level for this server. - JSApiLevel int = 2 + JSApiLevel int = 3 JSRequiredLevelMetadataKey = "_nats.req.level" JSServerVersionMetadataKey = "_nats.ver" @@ -158,6 +158,11 @@ func setStaticConsumerMetadata(cfg *ConsumerConfig) { requires(1) } + // Added in 2.14 + if cfg.AckPolicy == AckFlowControl { + requires(3) + } + cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel) } diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index c98a1cc6ec..4a743b6445 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -248,6 +248,11 @@ func TestJetStreamSetStaticConsumerMetadata(t *testing.T) { cfg: &ConsumerConfig{PriorityPolicy: PriorityPinnedClient, PriorityGroups: []string{"a"}}, expectedMetadata: metadataAtLevel("1"), }, + { + desc: "AckFlowControl", + cfg: &ConsumerConfig{AckPolicy: AckFlowControl}, + expectedMetadata: metadataAtLevel("3"), + }, } { t.Run(test.desc, func(t *testing.T) { setStaticConsumerMetadata(test.cfg) diff --git a/server/memstore.go b/server/memstore.go index 091561b4ee..b30b94deda 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -2428,7 +2428,7 @@ func (o *consumerMemStore) UpdateAcks(dseq, sseq uint64) error { } // Check for AckAll here. - if o.cfg.AckPolicy == AckAll { + if o.cfg.AckPolicy == AckAll || o.cfg.AckPolicy == AckFlowControl { sgap := sseq - o.state.AckFloor.Stream o.state.AckFloor.Consumer = dseq o.state.AckFloor.Stream = sseq diff --git a/server/store.go b/server/store.go index 7d7d6291ce..800ea08391 100644 --- a/server/store.go +++ b/server/store.go @@ -594,15 +594,17 @@ func (st *StorageType) UnmarshalJSON(data []byte) error { } const ( - ackNonePolicyJSONString = `"none"` - ackAllPolicyJSONString = `"all"` - ackExplicitPolicyJSONString = `"explicit"` + ackNonePolicyJSONString = `"none"` + ackAllPolicyJSONString = `"all"` + ackExplicitPolicyJSONString = `"explicit"` + ackFlowControlPolicyJSONString = `"flow_control"` ) var ( - ackNonePolicyJSONBytes = []byte(ackNonePolicyJSONString) - ackAllPolicyJSONBytes = []byte(ackAllPolicyJSONString) - ackExplicitPolicyJSONBytes = []byte(ackExplicitPolicyJSONString) + ackNonePolicyJSONBytes = []byte(ackNonePolicyJSONString) + ackAllPolicyJSONBytes = []byte(ackAllPolicyJSONString) + ackExplicitPolicyJSONBytes = []byte(ackExplicitPolicyJSONString) + ackFlowControlPolicyJSONBytes = []byte(ackFlowControlPolicyJSONString) ) func (ap AckPolicy) MarshalJSON() ([]byte, error) { @@ -613,6 +615,8 @@ func (ap AckPolicy) MarshalJSON() ([]byte, error) { return ackAllPolicyJSONBytes, nil case AckExplicit: return ackExplicitPolicyJSONBytes, nil + case AckFlowControl: + return ackFlowControlPolicyJSONBytes, nil default: return nil, fmt.Errorf("can not marshal %v", ap) } @@ -626,6 +630,8 @@ func (ap *AckPolicy) UnmarshalJSON(data []byte) error { *ap = AckAll case ackExplicitPolicyJSONString: *ap = AckExplicit + case ackFlowControlPolicyJSONString: + *ap = AckFlowControl default: return fmt.Errorf("can not unmarshal %q", data) } diff --git a/server/stream.go b/server/stream.go index 2d3c6011ca..dea238e106 100644 --- a/server/stream.go +++ b/server/stream.go @@ -338,12 +338,14 @@ type StreamSourceInfo struct { // StreamSource dictates how streams can source from other streams. type StreamSource struct { - Name string `json:"name"` - OptStartSeq uint64 `json:"opt_start_seq,omitempty"` - OptStartTime *time.Time `json:"opt_start_time,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` - SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` - External *ExternalStream `json:"external,omitempty"` + Name string `json:"name"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty"` + OptStartTime *time.Time `json:"opt_start_time,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` + External *ExternalStream `json:"external,omitempty"` + ConsumerName string `json:"consumer_name,omitempty"` + ConsumerDeliverSubject string `json:"consumer_deliver_subject,omitempty"` // Internal iname string // For indexing when stream names are the same for multiple sources. @@ -2870,12 +2872,12 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool { var needsRetry bool // Flow controls have reply subjects. if m.rply != _EMPTY_ { - mset.handleFlowControl(m) + mset.handleFlowControl(m, mset.mirror.dseq, mset.mirror.sseq) } else { // For idle heartbeats make sure we did not miss anything and check if we are considered stalled. - if ldseq := parseInt64(getHeader(JSLastConsumerSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != mset.mirror.dseq { + if ldseq := parseInt64(sliceHeader(JSLastConsumerSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != mset.mirror.dseq { needsRetry = true - } else if fcReply := getHeader(JSConsumerStalled, m.hdr); len(fcReply) > 0 { + } else if fcReply := sliceHeader(JSConsumerStalled, m.hdr); len(fcReply) > 0 { // Other side thinks we are stalled, so send flow control reply. mset.outq.sendMsg(string(fcReply), nil) } @@ -2901,6 +2903,10 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool { mset.mirror.sseq++ } else if sseq <= mset.mirror.sseq { // Ignore older messages. + // If the deliver sequence matches, we only updated delivered accounting. + if dseq == mset.mirror.dseq+1 { + mset.mirror.dseq++ + } mset.mu.Unlock() return true } else if mset.mirror.cname == _EMPTY_ { @@ -3149,9 +3155,12 @@ func (mset *stream) setupMirrorConsumer() error { // Determine subjects etc. var deliverSubject string + var durableDeliverSubject string ext := mset.cfg.Mirror.External - - if ext != nil && ext.DeliverPrefix != _EMPTY_ { + if mset.cfg.Mirror.ConsumerName != _EMPTY_ && mset.cfg.Mirror.ConsumerDeliverSubject != _EMPTY_ { + mirror.cname = mset.cfg.Mirror.ConsumerName + durableDeliverSubject = mset.cfg.Mirror.ConsumerDeliverSubject + } else if ext != nil && ext.DeliverPrefix != _EMPTY_ { deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".M"), "..", ".") } else { deliverSubject = syncSubject("$JS.M") @@ -3159,6 +3168,7 @@ func (mset *stream) setupMirrorConsumer() error { // Now send off request to create/update our consumer. This will be all API based even in single server mode. // We calculate durable names apriori so we do not need to save them off. + // If we're using a pre-existing consumer, we'll send a consumer reset request instead. var state StreamState mset.store.FastState(&state) @@ -3249,7 +3259,9 @@ func (mset *stream) setupMirrorConsumer() error { } var subject string - if req.Config.FilterSubject != _EMPTY_ { + if durableDeliverSubject != _EMPTY_ { + subject = fmt.Sprintf(JSApiConsumerResetT, mset.cfg.Mirror.Name, mirror.cname) + } else if req.Config.FilterSubject != _EMPTY_ { req.Config.Name = fmt.Sprintf("mirror-%s", createConsumerName()) subject = fmt.Sprintf(JSApiConsumerCreateExT, mset.cfg.Mirror.Name, req.Config.Name, req.Config.FilterSubject) } else { @@ -3260,16 +3272,21 @@ func (mset *stream) setupMirrorConsumer() error { subject = strings.ReplaceAll(subject, "..", ".") } - // Marshal now that we are done with `req`. - b, _ := json.Marshal(req) - // Reset mirror.msgs = nil mirror.err = nil mirror.sip = true - // Send the consumer create request - mset.outq.send(newJSPubMsg(subject, _EMPTY_, reply, nil, b, nil, 0)) + if durableDeliverSubject != _EMPTY_ { + // Send the consumer reset request + mset.outq.send(newJSPubMsg(subject, _EMPTY_, reply, nil, nil, nil, 0)) + } else { + // Marshal now that we are done with `req`. + b, _ := json.Marshal(req) + + // Send the consumer create request + mset.outq.send(newJSPubMsg(subject, _EMPTY_, reply, nil, b, nil, 0)) + } go func() { @@ -3322,6 +3339,15 @@ func (mset *stream) setupMirrorConsumer() error { // Create a new queue each time mirror.msgs = newIPQueue[*inMsg](mset.srv, qname) msgs := mirror.msgs + if durableDeliverSubject != _EMPTY_ { + deliverSubject = durableDeliverSubject + if ccr.ConsumerInfo.Config.AckPolicy != AckFlowControl { + mirror.err = NewJSMirrorConsumerRequiresAckFCError() + retry = true + mset.mu.Unlock() + return + } + } sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { hdr, msg := c.msgParts(copyBytes(rmsg)) // Need to copy. if len(hdr) > 0 { @@ -3342,13 +3368,8 @@ func (mset *stream) setupMirrorConsumer() error { // Save our sub. mirror.sub = sub - // When an upstream stream expires messages or in general has messages that we want - // that are no longer available we need to adjust here. - var state StreamState - mset.store.FastState(&state) - // Check if we need to skip messages. - if state.LastSeq != ccr.ConsumerInfo.Delivered.Stream { + if state.LastSeq < ccr.ConsumerInfo.Delivered.Stream { // Check to see if delivered is past our last and we have no msgs. This will help the // case when mirroring a stream that has a very high starting sequence number. if state.Msgs == 0 && ccr.ConsumerInfo.Delivered.Stream > state.LastSeq { @@ -3363,6 +3384,9 @@ func (mset *stream) setupMirrorConsumer() error { mirror.cname = ccr.ConsumerInfo.Name mirror.dseq = 0 mirror.sseq = ccr.ConsumerInfo.Delivered.Stream + if durableDeliverSubject != _EMPTY_ && state.LastSeq > mirror.sseq { + mirror.sseq = state.LastSeq + } mirror.qch = make(chan struct{}) mirror.wg.Add(1) ready.Add(1) @@ -3517,9 +3541,12 @@ func (mset *stream) trySetupSourceConsumer(iname string, seq uint64, startTime t // Determine subjects etc. var deliverSubject string + var durableDeliverSubject string ext := ssi.External - - if ext != nil && ext.DeliverPrefix != _EMPTY_ { + if ssi.ConsumerName != _EMPTY_ && ssi.ConsumerDeliverSubject != _EMPTY_ { + si.cname = ssi.ConsumerName + durableDeliverSubject = ssi.ConsumerDeliverSubject + } else if ext != nil && ext.DeliverPrefix != _EMPTY_ { deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".S"), "..", ".") } else { deliverSubject = syncSubject("$JS.S") @@ -3601,7 +3628,9 @@ func (mset *stream) trySetupSourceConsumer(iname string, seq uint64, startTime t } var subject string - if req.Config.FilterSubject != _EMPTY_ { + if durableDeliverSubject != _EMPTY_ { + subject = fmt.Sprintf(JSApiConsumerResetT, si.name, si.cname) + } else if req.Config.FilterSubject != _EMPTY_ { req.Config.Name = fmt.Sprintf("src-%s", createConsumerName()) subject = fmt.Sprintf(JSApiConsumerCreateExT, si.name, req.Config.Name, req.Config.FilterSubject) } else if len(req.Config.FilterSubjects) == 1 { @@ -3619,16 +3648,21 @@ func (mset *stream) trySetupSourceConsumer(iname string, seq uint64, startTime t subject = strings.ReplaceAll(subject, "..", ".") } - // Marshal request. - b, _ := json.Marshal(req) - // Reset si.msgs = nil si.err = nil si.sip = true - // Send the consumer create request - mset.outq.send(newJSPubMsg(subject, _EMPTY_, reply, nil, b, nil, 0)) + if durableDeliverSubject != _EMPTY_ { + // Send the consumer reset request + mset.outq.send(newJSPubMsg(subject, _EMPTY_, reply, nil, nil, nil, 0)) + } else { + // Marshal request. + b, _ := json.Marshal(req) + + // Send the consumer create request + mset.outq.send(newJSPubMsg(subject, _EMPTY_, reply, nil, b, nil, 0)) + } go func() { @@ -3684,8 +3718,8 @@ func (mset *stream) trySetupSourceConsumer(iname string, seq uint64, startTime t } // Setup actual subscription to process messages from our source. - if si.sseq != ccr.ConsumerInfo.Delivered.Stream { - si.sseq = ccr.ConsumerInfo.Delivered.Stream + 1 + if si.sseq < ccr.ConsumerInfo.Delivered.Stream { + si.sseq = ccr.ConsumerInfo.Delivered.Stream } // Capture consumer name. si.cname = ccr.ConsumerInfo.Name @@ -3697,6 +3731,15 @@ func (mset *stream) trySetupSourceConsumer(iname string, seq uint64, startTime t si.last.Store(time.Now().UnixNano()) msgs := mset.smsgs + if durableDeliverSubject != _EMPTY_ { + deliverSubject = durableDeliverSubject + if ccr.ConsumerInfo.Config.AckPolicy != AckFlowControl { + si.err = NewJSSourceConsumerRequiresAckFCError() + retry = true + mset.mu.Unlock() + return + } + } sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { hdr, msg := c.msgParts(copyBytes(rmsg)) // Need to copy. mset.queueInbound(msgs, subject, reply, hdr, msg, si, nil) @@ -3814,20 +3857,39 @@ func (m *inMsg) isControlMsg() bool { // Sends a reply to a flow control request. // Lock should be held. -func (mset *stream) sendFlowControlReply(reply string) { +func (mset *stream) sendFlowControlReply(reply string, hdr []byte) { if mset.isLeader() && mset.outq != nil { - mset.outq.sendMsg(reply, nil) + dseq := parseInt64(sliceHeader(JSLastConsumerSeq, hdr)) + sseq := parseInt64(sliceHeader(JSLastStreamSeq, hdr)) + + // If we're responding to flow control without being delivered messages (for example after a restart), + // we'll only have the stream sequence. + if sseq > 0 { + if dseq <= 0 { + dseq = 0 + } + const t = "NATS/1.0\r\n%s: %d\r\n%s: %d\r\n\r\n" + hdr = fmt.Appendf(nil, t, JSLastConsumerSeq, dseq, JSLastStreamSeq, sseq) + mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + } else { + mset.outq.sendMsg(reply, nil) + } } } // handleFlowControl will properly handle flow control messages for both R==1 and R>1. // Lock should be held. -func (mset *stream) handleFlowControl(m *inMsg) { +func (mset *stream) handleFlowControl(m *inMsg, dseq, sseq uint64) { // If we are clustered we will send the flow control message through the replication stack. if mset.isClustered() { + // Append the current delivery and stream sequences, to be sent after replication. + m.hdr = genHeader(m.hdr, JSLastConsumerSeq, strconv.FormatUint(dseq, 10)) + m.hdr = genHeader(m.hdr, JSLastStreamSeq, strconv.FormatUint(sseq, 10)) mset.node.Propose(encodeStreamMsg(_EMPTY_, m.rply, m.hdr, nil, 0, 0, false)) } else { - mset.outq.sendMsg(m.rply, nil) + const t = "NATS/1.0\r\n%s: %d\r\n%s: %d\r\n\r\n" + hdr := fmt.Appendf(nil, t, JSLastConsumerSeq, dseq, JSLastStreamSeq, sseq) + mset.outq.send(newJSPubMsg(m.rply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) } } @@ -3854,13 +3916,13 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { var needsRetry bool // Flow controls have reply subjects. if m.rply != _EMPTY_ { - mset.handleFlowControl(m) + mset.handleFlowControl(m, si.dseq, si.sseq) } else { // For idle heartbeats make sure we did not miss anything. - if ldseq := parseInt64(getHeader(JSLastConsumerSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != si.dseq { + if ldseq := parseInt64(sliceHeader(JSLastConsumerSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != si.dseq { needsRetry = true mset.retrySourceConsumerAtSeq(si.iname, si.sseq+1) - } else if fcReply := getHeader(JSConsumerStalled, m.hdr); len(fcReply) > 0 { + } else if fcReply := sliceHeader(JSConsumerStalled, m.hdr); len(fcReply) > 0 { // Other side thinks we are stalled, so send flow control reply. mset.outq.sendMsg(string(fcReply), nil) } @@ -3880,6 +3942,14 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { if dseq == si.dseq+1 { si.dseq++ si.sseq = sseq + } else if sseq <= si.sseq { + // Ignore older messages. + // If the deliver sequence matches, we only updated delivered accounting. + if dseq == si.dseq+1 { + si.dseq++ + } + mset.mu.Unlock() + return true } else if dseq > si.dseq { if si.cname == _EMPTY_ { si.cname = tokenAt(m.rply, 4)