Skip to content

Commit 3e60436

Browse files
Filestore fixes (#7508)
This PR makes the following fixes in the filestore: * `indexCacheBuf` no longer tries to correct `fseq`, since the head hole-filling should have already identified the gap; * `writeMsgRecordLocked` now only replaces the cache `fseq` if writing the first index; * An off-by-one error in tail hole-filling that could mark one sequence as incorrectly deleted and altogether miss the last one has been fixed; * Skip messages are now written without releasing and re-acquiring the message block mutex. Signed-off-by: Neil Twigg <[email protected]>
2 parents 48a4477 + 310c38e commit 3e60436

File tree

1 file changed

+10
-16
lines changed

1 file changed

+10
-16
lines changed

server/filestore.go

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4537,11 +4537,11 @@ func (mb *msgBlock) skipMsg(seq uint64, now int64) {
45374537
needsRecord = true
45384538
mb.dmap.Insert(seq)
45394539
}
4540-
mb.mu.Unlock()
4541-
45424540
if needsRecord {
4543-
mb.writeMsgRecord(emptyRecordLen, seq|ebit, _EMPTY_, nil, nil, now, true)
4544-
} else {
4541+
mb.writeMsgRecordLocked(emptyRecordLen, seq|ebit, _EMPTY_, nil, nil, now, true, true)
4542+
}
4543+
mb.mu.Unlock()
4544+
if !needsRecord {
45454545
mb.kickFlusher()
45464546
}
45474547
}
@@ -4629,10 +4629,9 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
46294629
mb.dmap.Insert(seq)
46304630
}
46314631
}
4632-
mb.mu.Unlock()
4633-
46344632
// Write out our placeholder.
4635-
mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, now, true)
4633+
mb.writeMsgRecordLocked(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, now, true, true)
4634+
mb.mu.Unlock()
46364635

46374636
// Now update FS accounting.
46384637
// Update fs state.
@@ -6399,11 +6398,10 @@ func (mb *msgBlock) writeMsgRecordLocked(rl, seq uint64, subj string, mhdr, msg
63996398
mb.updateAccounting(seq, ts, rl)
64006399
// Strip ebit if set.
64016400
seq = seq &^ ebit
6402-
if mb.cache.fseq == 0 {
6401+
// Write index
6402+
if mb.cache.idx = append(mb.cache.idx, uint32(index)|cbit); len(mb.cache.idx) == 1 {
64036403
mb.cache.fseq = seq
64046404
}
6405-
// Write index
6406-
mb.cache.idx = append(mb.cache.idx, uint32(index)|cbit)
64076405
} else {
64086406
// Make sure to account for tombstones in rbytes.
64096407
mb.rbytes += rl
@@ -7097,10 +7095,6 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
70977095
}
70987096
// Add to our index.
70997097
idx = append(idx, index)
7100-
// Adjust if we guessed wrong.
7101-
if seq != 0 && seq < fseq {
7102-
fseq = seq
7103-
}
71047098

71057099
// Make sure our dmap has this entry if it was erased.
71067100
if erased && dms == 0 && seq != 0 {
@@ -7143,8 +7137,8 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
71437137
// earlier loop if we've ran out of block file to look at, but should
71447138
// be easily noticed because the seq will be below the last seq from
71457139
// the index.
7146-
if seq > 0 && seq < mbLastSeq {
7147-
for dseq := seq; dseq < mbLastSeq; dseq++ {
7140+
if seq > 0 && seq+1 <= mbLastSeq {
7141+
for dseq := seq + 1; dseq <= mbLastSeq; dseq++ {
71487142
idx = append(idx, dbit)
71497143
if dms == 0 {
71507144
mb.dmap.Insert(dseq)

0 commit comments

Comments
 (0)