Skip to content

Commit 192d2bf

Browse files
[IMPROVED] Filestore MaxBytes/Msgs update performance (#7455)
2 parents e3b6a75 + 2064735 commit 192d2bf

File tree

2 files changed

+82
-0
lines changed

2 files changed

+82
-0
lines changed

server/filestore.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4756,6 +4756,17 @@ func (fs *fileStore) enforceMsgLimit() {
47564756
return
47574757
}
47584758
for nmsgs := fs.state.Msgs; nmsgs > uint64(fs.cfg.MaxMsgs); nmsgs = fs.state.Msgs {
4759+
// If the first block can be removed fully, purge it entirely without needing to walk sequences.
4760+
if len(fs.blks) > 0 {
4761+
fmb := fs.blks[0]
4762+
fmb.mu.RLock()
4763+
msgs := fmb.msgs
4764+
fmb.mu.RUnlock()
4765+
if nmsgs-msgs > uint64(fs.cfg.MaxMsgs) {
4766+
fs.purgeMsgBlock(fmb)
4767+
continue
4768+
}
4769+
}
47594770
if removed, err := fs.deleteFirstMsg(); err != nil || !removed {
47604771
fs.rebuildFirst()
47614772
return
@@ -4773,6 +4784,17 @@ func (fs *fileStore) enforceBytesLimit() {
47734784
return
47744785
}
47754786
for bs := fs.state.Bytes; bs > uint64(fs.cfg.MaxBytes); bs = fs.state.Bytes {
4787+
// If the first block can be removed fully, purge it entirely without needing to walk sequences.
4788+
if len(fs.blks) > 0 {
4789+
fmb := fs.blks[0]
4790+
fmb.mu.RLock()
4791+
bytes := fmb.bytes
4792+
fmb.mu.RUnlock()
4793+
if bs-bytes > uint64(fs.cfg.MaxBytes) {
4794+
fs.purgeMsgBlock(fmb)
4795+
continue
4796+
}
4797+
}
47764798
if removed, err := fs.deleteFirstMsg(); err != nil || !removed {
47774799
fs.rebuildFirst()
47784800
return
@@ -9347,6 +9369,25 @@ func (fs *fileStore) forceRemoveMsgBlock(mb *msgBlock) {
93479369
fs.removeMsgBlockFromList(mb)
93489370
}
93499371

9372+
// Purges and removes the msgBlock from the store.
9373+
// Lock should be held.
9374+
func (fs *fileStore) purgeMsgBlock(mb *msgBlock) {
9375+
mb.mu.Lock()
9376+
// Update top level accounting.
9377+
msgs, bytes := mb.msgs, mb.bytes
9378+
if msgs > fs.state.Msgs {
9379+
msgs = fs.state.Msgs
9380+
}
9381+
if bytes > fs.state.Bytes {
9382+
bytes = fs.state.Bytes
9383+
}
9384+
fs.state.Msgs -= msgs
9385+
fs.state.Bytes -= bytes
9386+
fs.removeMsgBlock(mb)
9387+
mb.mu.Unlock()
9388+
fs.selectNextFirst()
9389+
}
9390+
93509391
// Called by purge to simply get rid of the cache and close our fds.
93519392
// Lock should not be held.
93529393
func (mb *msgBlock) dirtyClose() {

server/filestore_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10963,3 +10963,44 @@ func TestFileStoreEraseMsgErr(t *testing.T) {
1096310963
fs.EraseMsg(2)
1096410964
})
1096510965
}
10966+
10967+
func TestFileStorePurgeMsgBlock(t *testing.T) {
10968+
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
10969+
fcfg.BlockSize = 10 * 33
10970+
cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}
10971+
created := time.Now()
10972+
fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
10973+
require_NoError(t, err)
10974+
defer fs.Stop()
10975+
10976+
for range 20 {
10977+
_, _, err = fs.StoreMsg("foo", nil, nil, 0)
10978+
require_NoError(t, err)
10979+
}
10980+
10981+
fs.mu.RLock()
10982+
blks := len(fs.blks)
10983+
fs.mu.RUnlock()
10984+
require_Equal(t, blks, 2)
10985+
10986+
state := fs.State()
10987+
require_Equal(t, state.FirstSeq, 1)
10988+
require_Equal(t, state.LastSeq, 20)
10989+
require_Equal(t, state.Msgs, 20)
10990+
require_Equal(t, state.Bytes, 20*33)
10991+
10992+
// Purging the block should both remove the block and do the accounting.
10993+
fmb := fs.getFirstBlock()
10994+
fs.mu.Lock()
10995+
fs.purgeMsgBlock(fmb)
10996+
blks = len(fs.blks)
10997+
fs.mu.Unlock()
10998+
10999+
require_Equal(t, blks, 1)
11000+
state = fs.State()
11001+
require_Equal(t, state.FirstSeq, 11)
11002+
require_Equal(t, state.LastSeq, 20)
11003+
require_Equal(t, state.Msgs, 10)
11004+
require_Equal(t, state.Bytes, 10*33)
11005+
})
11006+
}

0 commit comments

Comments
 (0)