Skip to content

Commit fa97100

Browse files
authored
cmd/mount: add max-writeback-size option to filter blocks for writeback (#6447)
Signed-off-by: jiefenghuang <[email protected]>
1 parent a7c4bb8 commit fa97100

File tree

4 files changed

+70
-58
lines changed

4 files changed

+70
-58
lines changed

cmd/flags.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,11 @@ func dataCacheFlags() []cli.Flag {
196196
Name: "writeback",
197197
Usage: "upload blocks in background",
198198
},
199+
&cli.StringFlag{
200+
Name: "writeback-threshold-size",
201+
Value: "0",
202+
Usage: "blocks smaller than this size will be staged, 0 means all staged.",
203+
},
199204
&cli.StringFlag{
200205
Name: "upload-delay",
201206
Value: "0s",

cmd/mount.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -364,18 +364,19 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
364364
Compress: format.Compression,
365365
HashPrefix: format.HashPrefix,
366366

367-
GetTimeout: utils.Duration(c.String("get-timeout")),
368-
PutTimeout: utils.Duration(c.String("put-timeout")),
369-
MaxUpload: c.Int("max-uploads"),
370-
MaxStageWrite: c.Int("max-stage-write"),
371-
MaxRetries: c.Int("io-retries"),
372-
Writeback: c.Bool("writeback"),
373-
Prefetch: c.Int("prefetch"),
374-
BufferSize: utils.ParseBytes(c, "buffer-size", 'M'),
375-
UploadLimit: utils.ParseMbps(c, "upload-limit") * 1e6 / 8,
376-
DownloadLimit: utils.ParseMbps(c, "download-limit") * 1e6 / 8,
377-
UploadDelay: utils.Duration(c.String("upload-delay")),
378-
UploadHours: c.String("upload-hours"),
367+
GetTimeout: utils.Duration(c.String("get-timeout")),
368+
PutTimeout: utils.Duration(c.String("put-timeout")),
369+
MaxUpload: c.Int("max-uploads"),
370+
MaxStageWrite: c.Int("max-stage-write"),
371+
MaxRetries: c.Int("io-retries"),
372+
Writeback: c.Bool("writeback"),
373+
WritebackThresholdSize: int(utils.ParseBytes(c, "writeback-threshold-size", 'B')),
374+
Prefetch: c.Int("prefetch"),
375+
BufferSize: utils.ParseBytes(c, "buffer-size", 'M'),
376+
UploadLimit: utils.ParseMbps(c, "upload-limit") * 1e6 / 8,
377+
DownloadLimit: utils.ParseMbps(c, "download-limit") * 1e6 / 8,
378+
UploadDelay: utils.Duration(c.String("upload-delay")),
379+
UploadHours: c.String("upload-hours"),
379380

380381
CacheDir: c.String("cache-dir"),
381382
CacheSize: utils.ParseBytes(c, "cache-size", 'M'),

pkg/chunk/cached_store.go

Lines changed: 51 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ func (s *wSlice) upload(indx int) {
454454
if off != blen {
455455
panic(fmt.Sprintf("block length does not match: %v != %v", off, blen))
456456
}
457-
if s.writeback {
457+
if s.writeback && blen < s.store.conf.WritebackThresholdSize {
458458
stagingPath := "unknown"
459459
stageFailed := false
460460
block.Acquire()
@@ -559,35 +559,36 @@ func (s *wSlice) Abort() {
559559

560560
// Config contains options for cachedStore
561561
type Config struct {
562-
CacheDir string
563-
CacheMode os.FileMode
564-
CacheSize uint64
565-
CacheItems int64
566-
CacheChecksum string
567-
CacheEviction string
568-
CacheScanInterval time.Duration
569-
CacheExpire time.Duration
570-
OSCache bool
571-
FreeSpace float32
572-
AutoCreate bool
573-
Compress string
574-
MaxUpload int
575-
MaxStageWrite int
576-
MaxRetries int
577-
UploadLimit int64 // bytes per second
578-
DownloadLimit int64 // bytes per second
579-
Writeback bool
580-
UploadDelay time.Duration
581-
UploadHours string
582-
HashPrefix bool
583-
BlockSize int
584-
GetTimeout time.Duration
585-
PutTimeout time.Duration
586-
CacheFullBlock bool
587-
CacheLargeWrite bool
588-
BufferSize uint64
589-
Readahead int
590-
Prefetch int
562+
CacheDir string
563+
CacheMode os.FileMode
564+
CacheSize uint64
565+
CacheItems int64
566+
CacheChecksum string
567+
CacheEviction string
568+
CacheScanInterval time.Duration
569+
CacheExpire time.Duration
570+
OSCache bool
571+
FreeSpace float32
572+
AutoCreate bool
573+
Compress string
574+
MaxUpload int
575+
MaxStageWrite int
576+
MaxRetries int
577+
UploadLimit int64 // bytes per second
578+
DownloadLimit int64 // bytes per second
579+
Writeback bool
580+
WritebackThresholdSize int
581+
UploadDelay time.Duration
582+
UploadHours string
583+
HashPrefix bool
584+
BlockSize int
585+
GetTimeout time.Duration
586+
PutTimeout time.Duration
587+
CacheFullBlock bool
588+
CacheLargeWrite bool
589+
BufferSize uint64
590+
Readahead int
591+
Prefetch int
591592
}
592593

593594
func (c *Config) SelfCheck(uuid string) {
@@ -599,20 +600,6 @@ func (c *Config) SelfCheck(uuid string) {
599600
}
600601
c.CacheDir = "memory"
601602
}
602-
if !c.Writeback {
603-
if c.UploadDelay > 0 || c.UploadHours != "" {
604-
logger.Warnf("delayed upload is disabled in non-writeback mode")
605-
c.UploadDelay = 0
606-
c.UploadHours = ""
607-
}
608-
}
609-
if !c.CacheFullBlock && c.Writeback {
610-
logger.Warnf("cache-partial-only is ineffective for stage blocks with writeback enabled")
611-
}
612-
if _, _, err := c.parseHours(); err != nil {
613-
logger.Warnf("invalid value (%s) for upload-hours: %s", c.UploadHours, err)
614-
c.UploadHours = ""
615-
}
616603
if c.MaxUpload <= 0 {
617604
logger.Warnf("max-uploads should be greater than 0, set it to 1")
618605
c.MaxUpload = 1
@@ -635,6 +622,24 @@ func (c *Config) SelfCheck(uuid string) {
635622
logger.Warnf("writeback is not supported in memory cache mode")
636623
c.Writeback = false
637624
}
625+
if c.Writeback {
626+
if !c.CacheFullBlock {
627+
logger.Warnf("cache-partial-only is ineffective for stage blocks with writeback enabled")
628+
}
629+
if c.WritebackThresholdSize == 0 {
630+
c.WritebackThresholdSize = c.BlockSize + 1
631+
}
632+
} else {
633+
if c.UploadDelay > 0 || c.UploadHours != "" {
634+
logger.Warnf("delayed upload is disabled in non-writeback mode")
635+
c.UploadDelay = 0
636+
c.UploadHours = ""
637+
}
638+
}
639+
if _, _, err := c.parseHours(); err != nil {
640+
logger.Warnf("invalid value (%s) for upload-hours: %s", c.UploadHours, err)
641+
c.UploadHours = ""
642+
}
638643
if c.CacheEviction == "" {
639644
c.CacheEviction = Eviction2Random
640645
} else if c.CacheEviction != Eviction2Random && c.CacheEviction != EvictionNone && c.CacheEviction != EvictionLRU {
@@ -818,7 +823,7 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.
818823
store.downLimit = ratelimit.NewBucketWithRate(float64(config.DownloadLimit)*0.85, config.DownloadLimit/10)
819824
}
820825
store.initMetrics()
821-
if store.conf.CacheDir != "memory" && store.conf.Writeback {
826+
if store.conf.Writeback {
822827
store.startHour, store.endHour, _ = config.parseHours()
823828
if store.startHour != store.endHour {
824829
logger.Infof("background upload at %d:00 ~ %d:00", store.startHour, store.endHour)
@@ -866,7 +871,7 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.
866871
}
867872
})
868873

869-
if store.conf.CacheDir != "memory" && store.conf.Writeback {
874+
if store.conf.Writeback {
870875
for i := 0; i < store.conf.MaxUpload; i++ {
871876
go store.uploader()
872877
}

pkg/chunk/cached_store_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ func TestForceUpload(t *testing.T) {
204204
config := defaultConf
205205
_ = os.RemoveAll(config.CacheDir)
206206
config.Writeback = true
207+
config.WritebackThresholdSize = config.BlockSize + 1
207208
config.UploadDelay = time.Hour
208209
config.BlockSize = 4 << 20
209210
store := NewCachedStore(blob, config, nil)

0 commit comments

Comments
 (0)