Skip to content

Commit 4598607

Browse files
(2.12) [ADDED] Offline assets support (#7158)
Implements [Offline Assets support from ADR-44](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-44.md). Streams and consumers will be put in "offline mode" if the server doesn't support the required API level. Example: ``` [INF] ---------------- JETSTREAM ---------------- ... [INF] API Level: 2 [INF] ------------------------------------------- [WRN] Detected unsupported stream 'js > test-stream', delete the stream or upgrade the server to API level 10 [INF] Starting JetStream cluster [INF] Creating JetStream metadata controller [INF] JetStream cluster recovering state [INF] Listening for client connections on 0.0.0.0:4444 [WRN] Detected unsupported stream 'js > test-stream', delete the stream or upgrade the server to API level 10 [INF] Server is ready [INF] Cluster name is nats-cluster ``` Stream/consumer list and names requests still return the names of these streams. Stream/consumer info still works in this state, and a stream/consumer can still be deleted as well. The "offline reason" is included as an error in the stream/consumer info requests and is contained in the `offline: map[string]string` field in the list requests. These issues will be logged during startup, if an update request is performed. The raw JSON is always preserved, so downgrading to a server that doesn't understand the new JSON fields can safely upgrade and the JSON will be preserved as expected. Importantly, if a stream or consumer is recognized as unsupported, they are not even loaded by the server. This means data will be safe and not even be loaded on an older version server. This will allow for more graceful downgrades from 2.12 to 2.11 (and onward), where loads of new features would stop functioning after a downgrade: counter-streams, atomic batch, etc. Signed-off-by: Maurice van Veen <[email protected]>
2 parents ed76941 + eebc56c commit 4598607

12 files changed

+1508
-90
lines changed

server/consumer.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"fmt"
2222
"math"
2323
"math/rand"
24+
"os"
25+
"path/filepath"
2426
"reflect"
2527
"regexp"
2628
"slices"
@@ -71,6 +73,13 @@ type ConsumerInfo struct {
7173
PriorityGroups []PriorityGroupState `json:"priority_groups,omitempty"`
7274
}
7375

76+
// consumerInfoClusterResponse is a response used in a cluster to communicate the consumer info
77+
// back to the meta leader as part of a consumer list request.
78+
type consumerInfoClusterResponse struct {
79+
ConsumerInfo
80+
OfflineReason string `json:"offline_reason,omitempty"` // Reporting when a consumer is offline.
81+
}
82+
7483
type PriorityGroupState struct {
7584
Group string `json:"group"`
7685
PinnedClientID string `json:"pinned_client_id,omitempty"`
@@ -510,6 +519,10 @@ type consumer struct {
510519
/// pinnedTtl is the remaining time before the current PinId expires.
511520
pinnedTtl *time.Timer
512521
pinnedTS time.Time
522+
523+
// If standalone/single-server, the offline reason needs to be stored directly in the consumer.
524+
// Otherwise, if clustered it will be part of the consumer assignment.
525+
offlineReason string
513526
}
514527

515528
// A single subject filter.
@@ -5059,7 +5072,7 @@ func (o *consumer) setMaxPendingBytes(limit int) {
50595072
// This does some quick sanity checks to see if we should re-calculate num pending.
50605073
// Lock should be held.
50615074
func (o *consumer) checkNumPending() uint64 {
5062-
if o.mset != nil {
5075+
if o.mset != nil && o.mset.store != nil {
50635076
var state StreamState
50645077
o.mset.store.FastState(&state)
50655078
npc := o.numPending()
@@ -6103,6 +6116,14 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
61036116
} else {
61046117
err = store.Stop()
61056118
}
6119+
} else if dflag {
6120+
// If there's no store (for example, when it's offline), manually delete the directories.
6121+
o.mu.RLock()
6122+
stream, consumer := o.stream, o.name
6123+
o.mu.RUnlock()
6124+
accDir := filepath.Join(js.config.StoreDir, a.GetName())
6125+
consumersDir := filepath.Join(accDir, streamsDir, stream, consumerDir)
6126+
os.RemoveAll(filepath.Join(consumersDir, consumer))
61066127
}
61076128

61086129
return err

server/errors.json

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1918,5 +1918,25 @@
19181918
"help": "",
19191919
"url": "",
19201920
"deprecates": ""
1921+
},
1922+
{
1923+
"constant": "JSStreamOfflineReasonErrF",
1924+
"code": 500,
1925+
"error_code": 10194,
1926+
"description": "stream is offline: {err}",
1927+
"comment": "",
1928+
"help": "",
1929+
"url": "",
1930+
"deprecates": ""
1931+
},
1932+
{
1933+
"constant": "JSConsumerOfflineReasonErrF",
1934+
"code": 500,
1935+
"error_code": 10195,
1936+
"description": "consumer is offline: {err}",
1937+
"comment": "",
1938+
"help": "",
1939+
"url": "",
1940+
"deprecates": ""
19211941
}
19221942
]

server/jetstream.go

Lines changed: 105 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package server
1515

1616
import (
17+
"bytes"
1718
"crypto/hmac"
1819
"crypto/sha256"
1920
"encoding/binary"
@@ -1333,8 +1334,54 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
13331334
}
13341335

13351336
var cfg FileStreamInfo
1336-
if err := json.Unmarshal(buf, &cfg); err != nil {
1337-
s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, err)
1337+
decoder := json.NewDecoder(bytes.NewReader(buf))
1338+
decoder.DisallowUnknownFields()
1339+
strictErr := decoder.Decode(&cfg)
1340+
if strictErr != nil {
1341+
cfg = FileStreamInfo{}
1342+
if err := json.Unmarshal(buf, &cfg); err != nil {
1343+
s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, err)
1344+
continue
1345+
}
1346+
}
1347+
if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil {
1348+
var offlineReason string
1349+
if !supported {
1350+
apiLevel := getRequiredApiLevel(cfg.Metadata)
1351+
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
1352+
s.Warnf(" Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", a.Name, cfg.StreamConfig.Name, apiLevel)
1353+
} else {
1354+
offlineReason = fmt.Sprintf("decoding error: %v", strictErr)
1355+
s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, strictErr)
1356+
}
1357+
singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode()
1358+
if singleServerMode {
1359+
// Fake a stream, so we can respond to API requests as single-server.
1360+
mset := &stream{
1361+
acc: a,
1362+
jsa: jsa,
1363+
cfg: cfg.StreamConfig,
1364+
js: js,
1365+
srv: s,
1366+
stype: cfg.Storage,
1367+
consumers: make(map[string]*consumer),
1368+
active: false,
1369+
created: time.Now().UTC(),
1370+
offlineReason: offlineReason,
1371+
}
1372+
if !cfg.Created.IsZero() {
1373+
mset.created = cfg.Created
1374+
}
1375+
mset.closed.Store(true)
1376+
1377+
jsa.mu.Lock()
1378+
jsa.streams[cfg.Name] = mset
1379+
jsa.mu.Unlock()
1380+
1381+
// Now do the consumers.
1382+
odir := filepath.Join(sdir, fi.Name(), consumerDir)
1383+
consumers = append(consumers, &ce{mset, odir})
1384+
}
13381385
continue
13391386
}
13401387

@@ -1510,13 +1557,66 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
15101557
}
15111558

15121559
var cfg FileConsumerInfo
1513-
if err := json.Unmarshal(buf, &cfg); err != nil {
1514-
s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, err)
1560+
decoder := json.NewDecoder(bytes.NewReader(buf))
1561+
decoder.DisallowUnknownFields()
1562+
strictErr := decoder.Decode(&cfg)
1563+
if strictErr != nil {
1564+
cfg = FileConsumerInfo{}
1565+
if err := json.Unmarshal(buf, &cfg); err != nil {
1566+
s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, err)
1567+
continue
1568+
}
1569+
}
1570+
if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil {
1571+
var offlineReason string
1572+
if !supported {
1573+
apiLevel := getRequiredApiLevel(cfg.Metadata)
1574+
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
1575+
s.Warnf(" Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the server to API level %s", a.Name, e.mset.name(), cfg.Name, apiLevel)
1576+
} else {
1577+
offlineReason = fmt.Sprintf("decoding error: %v", strictErr)
1578+
s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr)
1579+
}
1580+
singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode()
1581+
if singleServerMode {
1582+
if !e.mset.closed.Load() {
1583+
s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, e.mset.name())
1584+
e.mset.mu.Lock()
1585+
e.mset.offlineReason = "stopped"
1586+
e.mset.mu.Unlock()
1587+
e.mset.stop(false, false)
1588+
}
1589+
1590+
// Fake a consumer, so we can respond to API requests as single-server.
1591+
o := &consumer{
1592+
mset: e.mset,
1593+
js: s.getJetStream(),
1594+
acc: a,
1595+
srv: s,
1596+
cfg: cfg.ConsumerConfig,
1597+
active: false,
1598+
stream: e.mset.name(),
1599+
name: cfg.Name,
1600+
dseq: 1,
1601+
sseq: 1,
1602+
created: time.Now().UTC(),
1603+
closed: true,
1604+
offlineReason: offlineReason,
1605+
}
1606+
if !cfg.Created.IsZero() {
1607+
o.created = cfg.Created
1608+
}
1609+
1610+
e.mset.mu.Lock()
1611+
e.mset.setConsumer(o)
1612+
e.mset.mu.Unlock()
1613+
}
15151614
continue
15161615
}
1616+
15171617
isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig)
15181618
if isEphemeral {
1519-
// This is an ephermal consumer and this could fail on restart until
1619+
// This is an ephemeral consumer and this could fail on restart until
15201620
// the consumer can reconnect. We will create it as a durable and switch it.
15211621
cfg.ConsumerConfig.Durable = ofi.Name()
15221622
}

0 commit comments

Comments
 (0)