5 changes: 5 additions & 0 deletions cmd/flags.go
@@ -128,6 +128,11 @@ func storageFlags() []cli.Flag {
Value: 20,
Usage: "number of connections to upload",
},
&cli.IntFlag{
Name: "max-downloads",
Value: 200,
Usage: "number of connections to download",
},
&cli.IntFlag{
Name: "max-stage-write",
Value: 1000, // large enough for normal cases, also prevents unlimited concurrency in abnormal cases
1 change: 1 addition & 0 deletions cmd/mount.go
@@ -367,6 +367,7 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
GetTimeout: utils.Duration(c.String("get-timeout")),
PutTimeout: utils.Duration(c.String("put-timeout")),
MaxUpload: c.Int("max-uploads"),
MaxDownload: c.Int("max-downloads"),
MaxStageWrite: c.Int("max-stage-write"),
MaxRetries: c.Int("io-retries"),
Writeback: c.Bool("writeback"),
161 changes: 95 additions & 66 deletions pkg/chunk/cached_store.go
@@ -152,46 +152,9 @@ func (s *rSlice) ReadAt(ctx context.Context, page *Page, off int) (n int, err er

if s.store.seekable &&
(!s.store.conf.CacheEnabled() || (boff > 0 && len(p) <= blockSize/4)) {
if s.store.downLimit != nil {
s.store.downLimit.Wait(int64(len(p)))
}
fullPage, err := s.store.group.TryPiggyback(key)
if fullPage != nil {
defer fullPage.Release()
if err == nil { // piggybacked a full read
n = copy(p, fullPage.Data[boff:])
return n, nil
}
}
// partial read
st := time.Now()
var (
reqID string
sc = object.DefaultStorageClass
)
page.Acquire()
err = utils.WithTimeout(ctx, func(cCtx context.Context) error {
defer page.Release()
in, err := s.store.storage.Get(cCtx, key, int64(boff), int64(len(p)), object.WithRequestID(&reqID), object.WithStorageClass(&sc))
if err == nil {
n, err = io.ReadFull(in, p)
_ = in.Close()
}
return err
}, s.store.conf.GetTimeout)
used := time.Since(st)
logRequest("GET", key, fmt.Sprintf("RANGE(%d,%d) ", boff, len(p)), reqID, err, used)
if errors.Is(err, context.Canceled) {
return 0, err
}
s.store.objectDataBytes.WithLabelValues("GET", sc).Add(float64(n))
s.store.objectReqsHistogram.WithLabelValues("GET", sc).Observe(used.Seconds())
if err == nil {
s.store.fetcher.fetch(key)
return n, nil
} else {
s.store.objectReqErrors.Add(1)
// fall back to full read
n, err = s.store.loadRange(ctx, key, page, boff)
if err == nil || !errors.Is(err, errTryFullRead) {
return n, err
}
}

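The partial-read path above now delegates to loadRange (added later in this file) and only falls back to a full block read when loadRange reports the errTryFullRead sentinel. A minimal, self-contained sketch of that contract, with hypothetical names and stdlib only:

package main

import (
	"errors"
	"fmt"
)

// Mirrors the sentinel introduced in this PR: the range read gave up,
// so the caller should retry with a full block read.
var errTryFullRead = errors.New("try full read")

// rangeRead stands in for loadRange: it returns data, a hard error, or the sentinel.
func rangeRead(ok bool) (int, error) {
	if !ok {
		return 0, errTryFullRead
	}
	return 128, nil
}

// readAt stands in for rSlice.ReadAt's partial-read branch.
func readAt(ok bool) (int, error) {
	n, err := rangeRead(ok)
	if err == nil || !errors.Is(err, errTryFullRead) {
		return n, err // success, or a hard error worth surfacing (e.g. context.Canceled)
	}
	// fall back to a full block read (elided here)
	return 0, nil
}

func main() {
	fmt.Println(readAt(true))  // 128 <nil>
	fmt.Println(readAt(false)) // 0 <nil> after falling back
}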
@@ -476,7 +439,7 @@ func (s *wSlice) upload(indx int) {
s.errors <- nil
if s.store.conf.UploadDelay == 0 && s.store.canUpload() {
select {
case s.store.currentUpload <- true:
case s.store.currentUpload <- struct{}{}:
defer func() { <-s.store.currentUpload }()
if err = s.store.upload(key, block, nil); err == nil {
s.store.bcache.uploaded(key, blen)
@@ -495,7 +458,7 @@ func (s *wSlice) upload(indx int) {
return
}
}
s.store.currentUpload <- true
s.store.currentUpload <- struct{}{}
defer func() { <-s.store.currentUpload }()
s.errors <- s.store.upload(key, block, s)
}()
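Switching the semaphore payload from true to struct{}{} makes the intent explicit: the channel carries concurrency slots, not data, and the empty struct occupies zero bytes. A tiny sketch of the pattern, with a hypothetical capacity:

package main

import (
	"fmt"
	"unsafe"
)

func main() {
	fmt.Println(unsafe.Sizeof(struct{}{}), unsafe.Sizeof(true)) // 0 1

	// Buffered channel as a counting semaphore: capacity = max concurrency.
	sem := make(chan struct{}, 2) // hypothetical limit of 2 concurrent workers
	sem <- struct{}{}             // acquire a slot (blocks once 2 are held)
	<-sem                         // release the slot
}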
@@ -572,6 +535,7 @@ type Config struct {
AutoCreate bool
Compress string
MaxUpload int
MaxDownload int
MaxStageWrite int
MaxRetries int
UploadLimit int64 // bytes per second
@@ -604,6 +568,16 @@ func (c *Config) SelfCheck(uuid string) {
logger.Warnf("max-uploads should be greater than 0, set it to 1")
c.MaxUpload = 1
}
if c.UploadLimit > 0 && int64(c.MaxUpload*c.BlockSize) > c.UploadLimit*int64(c.GetTimeout/time.Second)/2 {
logger.Warnf("max-uploads %d may exceed bandwidth limit (bw: %d Mbps)", c.MaxUpload, c.UploadLimit*8>>20)
}
if c.MaxDownload <= 0 {
logger.Warnf("max-downloads should be greater than 0, set it to 200")
c.MaxDownload = 200
}
if c.DownloadLimit > 0 && int64(c.MaxDownload*c.BlockSize) > c.DownloadLimit*int64(c.GetTimeout/time.Second)/2 {
logger.Warnf("max-downloads %d may exceed bandwidth limit (bw: %d Mbps)", c.MaxDownload, (c.DownloadLimit*8)>>20)
}
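// Illustrative numbers, assuming the default 4 MiB block size, max-downloads=200
// and a 60s get-timeout: the left side is 200*4 MiB = 800 MiB potentially in
// flight, while the right side is 30 seconds of download-limit budget, so the
// warning fires whenever download-limit is below roughly 27 MiB/s (about 224 Mbps).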
if c.BufferSize <= 32<<20 {
logger.Warnf("buffer-size is too small, setting it to 32 MiB")
c.BufferSize = 32 << 20
@@ -686,21 +660,22 @@ func (c *Config) CacheEnabled() bool {
}

type cachedStore struct {
storage object.ObjectStorage
bcache CacheManager
fetcher *prefetcher
conf Config
group *Controller
currentUpload chan bool
pendingCh chan *pendingItem
pendingKeys map[string]*pendingItem
pendingMutex sync.Mutex
startHour int
endHour int
compressor compress.Compressor
seekable bool
upLimit *ratelimit.Bucket
downLimit *ratelimit.Bucket
storage object.ObjectStorage
bcache CacheManager
fetcher *prefetcher
conf Config
group *Controller
currentUpload chan struct{}
currentDownload chan struct{}
pendingCh chan *pendingItem
pendingKeys map[string]*pendingItem
pendingMutex sync.Mutex
startHour int
endHour int
compressor compress.Compressor
seekable bool
upLimit *ratelimit.Bucket
downLimit *ratelimit.Bucket

cacheHits prometheus.Counter
cacheMiss prometheus.Counter
@@ -722,13 +697,66 @@ func logRequest(typeStr, key, param, reqID string, err error, used time.Duration
}
}

var errTryFullRead = errors.New("try full read")

func (store *cachedStore) loadRange(ctx context.Context, key string, page *Page, off int) (n int, err error) {
p := page.Data
fullPage, err := store.group.TryPiggyback(key)
if fullPage != nil {
defer fullPage.Release()
if err == nil { // piggybacked a full read
n = copy(p, fullPage.Data[off:])
return n, nil
}
}

store.currentDownload <- struct{}{}
defer func() { <-store.currentDownload }()
if store.downLimit != nil {
store.downLimit.Wait(int64(len(p)))
}

start := time.Now()
var (
reqID string
sc = object.DefaultStorageClass
)
page.Acquire()
err = utils.WithTimeout(ctx, func(cCtx context.Context) error {
defer page.Release()
in, err := store.storage.Get(cCtx, key, int64(off), int64(len(p)), object.WithRequestID(&reqID), object.WithStorageClass(&sc))
if err == nil {
n, err = io.ReadFull(in, p)
_ = in.Close()
}
return err
}, store.conf.GetTimeout)

used := time.Since(start)
logRequest("GET", key, fmt.Sprintf("RANGE(%d,%d) ", off, len(p)), reqID, err, used)
if errors.Is(err, context.Canceled) {
return 0, err
}
store.objectDataBytes.WithLabelValues("GET", sc).Add(float64(n))
store.objectReqsHistogram.WithLabelValues("GET", sc).Observe(used.Seconds())
if err == nil {
store.fetcher.fetch(key)
return n, nil
}
store.objectReqErrors.Add(1)
// fall back to full read
return 0, errTryFullRead
}

func (store *cachedStore) load(ctx context.Context, key string, page *Page, cache bool, forceCache bool) (err error) {
defer func() {
e := recover()
if e != nil {
err = fmt.Errorf("recovered from %s", e)
}
}()
store.currentDownload <- struct{}{}
defer func() { <-store.currentDownload }()
needed := store.compressor.CompressBound(len(page.Data))
compressed := needed > len(page.Data)
// we don't know the actual size for compressed block
@@ -806,14 +834,15 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.
config.PutTimeout = time.Second * 60
}
store := &cachedStore{
storage: storage,
conf: config,
currentUpload: make(chan bool, config.MaxUpload),
compressor: compressor,
seekable: compressor.CompressBound(0) == 0,
pendingCh: make(chan *pendingItem, 100*config.MaxUpload),
pendingKeys: make(map[string]*pendingItem),
group: NewController(),
storage: storage,
conf: config,
currentUpload: make(chan struct{}, config.MaxUpload),
currentDownload: make(chan struct{}, config.MaxDownload),
compressor: compressor,
seekable: compressor.CompressBound(0) == 0,
pendingCh: make(chan *pendingItem, 100*config.MaxUpload),
pendingKeys: make(map[string]*pendingItem),
group: NewController(),
}
if config.UploadLimit > 0 {
// there are overheads coming from HTTP/TCP/IP
@@ -993,7 +1022,7 @@ func parseObjOrigSize(key string) int {
}

func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
store.currentUpload <- true
store.currentUpload <- struct{}{}
defer func() {
<-store.currentUpload
}()
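In loadRange above, a download slot is acquired before waiting on downLimit, so goroutines queued on the semaphore do not consume the bandwidth budget while they wait. A self-contained sketch of that ordering with hypothetical values, assuming the ratelimit package here is github.com/juju/ratelimit:

package main

import (
	"fmt"
	"sync"

	"github.com/juju/ratelimit"
)

func main() {
	const maxDownloads = 4                               // stand-in for --max-downloads
	slots := make(chan struct{}, maxDownloads)           // download semaphore
	limiter := ratelimit.NewBucketWithRate(1<<20, 1<<20) // hypothetical 1 MiB/s budget

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			slots <- struct{}{} // 1) take a concurrency slot first
			defer func() { <-slots }()
			limiter.Wait(256 << 10) // 2) then charge 256 KiB against the bandwidth budget
			fmt.Println("fetched block", id)
		}(i)
	}
	wg.Wait()
}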