Skip to content

Commit 5f52fb0

Browse files
authored
cache/stage: add cooldown time for cache (#6435)
Signed-off-by: jiefenghuang <[email protected]>
1 parent 2062b8b commit 5f52fb0

File tree

2 files changed

+44
-20
lines changed

2 files changed

+44
-20
lines changed

pkg/chunk/disk_cache.go

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ type cacheStore struct {
9999

100100
state dcState
101101
stateLock sync.Mutex
102+
103+
// newBlockCooldown reduces the initial access time for newly cached staged blocks.
104+
// This helps prevent a surge of writes from evicting active read blocks.
105+
stagedBlockCooldown time.Duration
102106
}
103107

104108
func newCacheStore(m *cacheManagerMetrics, dir string, cacheSize, maxItems int64, pendingPages int, config *Config, uploader func(key, path string, force bool) bool) *cacheStore {
@@ -115,22 +119,23 @@ func newCacheStore(m *cacheManagerMetrics, dir string, cacheSize, maxItems int64
115119
keyIndex, _ = NewKeyIndex(config)
116120
}
117121
c := &cacheStore{
118-
m: m,
119-
dir: dir,
120-
mode: config.CacheMode,
121-
capacity: cacheSize,
122-
maxItems: maxItems,
123-
maxStageWrite: config.MaxStageWrite,
124-
freeRatio: config.FreeSpace,
125-
checksum: config.CacheChecksum,
126-
hashPrefix: config.HashPrefix,
127-
scanInterval: config.CacheScanInterval,
128-
cacheExpire: config.CacheExpire,
129-
keys: keyIndex,
130-
pending: make(chan pendingFile, pendingPages),
131-
pages: make(map[string]*Page),
132-
uploader: uploader,
133-
opTs: make(map[time.Duration]func() error),
122+
m: m,
123+
dir: dir,
124+
mode: config.CacheMode,
125+
capacity: cacheSize,
126+
maxItems: maxItems,
127+
maxStageWrite: config.MaxStageWrite,
128+
freeRatio: config.FreeSpace,
129+
checksum: config.CacheChecksum,
130+
hashPrefix: config.HashPrefix,
131+
scanInterval: config.CacheScanInterval,
132+
cacheExpire: config.CacheExpire,
133+
keys: keyIndex,
134+
pending: make(chan pendingFile, pendingPages),
135+
pages: make(map[string]*Page),
136+
uploader: uploader,
137+
opTs: make(map[time.Duration]func() error),
138+
stagedBlockCooldown: config.CacheExpire / 2,
134139
}
135140
c.stateLock = sync.Mutex{}
136141
if config.Writeback {
@@ -736,9 +741,6 @@ func (cache *cacheStore) add(key string, size int32, atime uint32) {
736741
cache.used -= int64(iter.size + 4096)
737742
}
738743
iter.size = size
739-
if atime > iter.atime {
740-
iter.atime = atime
741-
}
742744
}
743745
cache.keys.add(k, *iter) // add or update
744746
if size > 0 {
@@ -769,7 +771,7 @@ func (cache *cacheStore) stage(key string, data []byte) (string, error) {
769771
path := cache.cachePath(key)
770772
cache.createDir(filepath.Dir(path))
771773
if err = os.Link(stagingPath, path); err == nil {
772-
cache.add(key, -int32(len(data)), uint32(time.Now().Unix()))
774+
cache.add(key, -int32(len(data)), uint32(time.Now().Add(-cache.stagedBlockCooldown).Unix()))
773775
} else {
774776
logger.Warnf("link %s to %s failed: %s", stagingPath, path, err)
775777
}

pkg/chunk/disk_cache_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,3 +591,25 @@ func TestLruEviction(t *testing.T) {
591591
require.Equal(t, 0, len(le.lruHeap), "LRU heap should be empty by cleanupFull after setting maxItems to 1")
592592
})
593593
}
594+
595+
func TestCooldownAtimeOnWriteFixedOnLoad(t *testing.T) {
596+
dir := t.TempDir()
597+
conf := defaultConf
598+
conf.CacheExpire = time.Hour
599+
m := new(cacheManagerMetrics)
600+
m.initMetrics()
601+
cache := newCacheStore(m, dir, 1<<30, 1000, 1, &conf, nil)
602+
key := "0_0_4"
603+
604+
now := time.Now()
605+
path, err := cache.stage(key, []byte("test"))
606+
require.NoError(t, err)
607+
require.NotEmpty(t, path)
608+
require.LessOrEqual(t, cache.keys.peekAtime(cache.getCacheKey(key)), uint32(now.Add(-conf.CacheExpire/2).Unix())) // should have bias in atime
609+
610+
rc, err := cache.load(key)
611+
require.NoError(t, err)
612+
require.NotNil(t, rc)
613+
defer rc.Close()
614+
require.GreaterOrEqual(t, cache.keys.peekAtime(cache.getCacheKey(key)), uint32(now.Unix())) // bias should have been fixed on load
615+
}

0 commit comments

Comments
 (0)