Commit 7ad5a05

cmd/mount: add max-downloads to limit concurrent get for object storage
Signed-off-by: jiefenghuang <[email protected]>
1 parent 0418191 commit 7ad5a05
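
The change caps concurrent object-storage GETs with a counting semaphore built from a buffered channel: a send acquires a slot, a receive releases it, and the channel capacity (config.MaxDownload) bounds how many downloads run at once. A minimal standalone sketch of that pattern, assuming nothing beyond the standard library (the names and the simulated fetch are illustrative, not from this commit):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const maxDownloads = 3 // cap on concurrent GETs, like config.MaxDownload
	sem := make(chan struct{}, maxDownloads)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot; blocks while maxDownloads GETs are in flight
			defer func() { <-sem }() // release the slot when done
			fmt.Printf("GET %d started\n", id)
			time.Sleep(100 * time.Millisecond) // stand-in for an object-storage GET
		}(i)
	}
	wg.Wait()
}
```

The diffs below add exactly this acquire/release pair (store.currentDownload <- struct{}{} / <-store.currentDownload) around both the range-read and full-read paths.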

File tree: 3 files changed, +101 -66 lines

cmd/flags.go (5 additions, 0 deletions)

@@ -128,6 +128,11 @@ func storageFlags() []cli.Flag {
 			Value: 20,
 			Usage: "number of connections to upload",
 		},
+		&cli.IntFlag{
+			Name:  "max-downloads",
+			Value: 200,
+			Usage: "number of connections to download",
+		},
 		&cli.IntFlag{
 			Name:  "max-stage-write",
 			Value: 1000, // large enough for normal cases, also prevents unlimited concurrency in abnormal cases
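
For context on how the flag value reaches the code, this is the usual urfave/cli v2 flow; a self-contained sketch, with the app name and action invented for illustration:

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Name: "demo",
		Flags: []cli.Flag{
			&cli.IntFlag{
				Name:  "max-downloads",
				Value: 200, // default when the flag is not given
				Usage: "number of connections to download",
			},
		},
		Action: func(c *cli.Context) error {
			// the same lookup mount.go uses below: c.Int("max-downloads")
			fmt.Println("max-downloads =", c.Int("max-downloads"))
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```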

cmd/mount.go (1 addition, 0 deletions)

@@ -367,6 +367,7 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
 		GetTimeout:    utils.Duration(c.String("get-timeout")),
 		PutTimeout:    utils.Duration(c.String("put-timeout")),
 		MaxUpload:     c.Int("max-uploads"),
+		MaxDownload:   c.Int("max-downloads"),
 		MaxStageWrite: c.Int("max-stage-write"),
 		MaxRetries:    c.Int("io-retries"),
 		Writeback:     c.Bool("writeback"),

pkg/chunk/cached_store.go (95 additions, 66 deletions)

@@ -152,46 +152,9 @@ func (s *rSlice) ReadAt(ctx context.Context, page *Page, off int) (n int, err error) {
 
 	if s.store.seekable &&
 		(!s.store.conf.CacheEnabled() || (boff > 0 && len(p) <= blockSize/4)) {
-		if s.store.downLimit != nil {
-			s.store.downLimit.Wait(int64(len(p)))
-		}
-		fullPage, err := s.store.group.TryPiggyback(key)
-		if fullPage != nil {
-			defer fullPage.Release()
-			if err == nil { // piggybacked a full read
-				n = copy(p, fullPage.Data[boff:])
-				return n, nil
-			}
-		}
-		// partial read
-		st := time.Now()
-		var (
-			reqID string
-			sc    = object.DefaultStorageClass
-		)
-		page.Acquire()
-		err = utils.WithTimeout(ctx, func(cCtx context.Context) error {
-			defer page.Release()
-			in, err := s.store.storage.Get(cCtx, key, int64(boff), int64(len(p)), object.WithRequestID(&reqID), object.WithStorageClass(&sc))
-			if err == nil {
-				n, err = io.ReadFull(in, p)
-				_ = in.Close()
-			}
-			return err
-		}, s.store.conf.GetTimeout)
-		used := time.Since(st)
-		logRequest("GET", key, fmt.Sprintf("RANGE(%d,%d) ", boff, len(p)), reqID, err, used)
-		if errors.Is(err, context.Canceled) {
-			return 0, err
-		}
-		s.store.objectDataBytes.WithLabelValues("GET", sc).Add(float64(n))
-		s.store.objectReqsHistogram.WithLabelValues("GET", sc).Observe(used.Seconds())
-		if err == nil {
-			s.store.fetcher.fetch(key)
-			return n, nil
-		} else {
-			s.store.objectReqErrors.Add(1)
-			// fall back to full read
+		n, err = s.store.loadRange(ctx, key, page, boff)
+		if err == nil || !errors.Is(err, errTryFullRead) {
+			return n, err
 		}
 	}
 

@@ -476,7 +439,7 @@ func (s *wSlice) upload(indx int) {
 			s.errors <- nil
 			if s.store.conf.UploadDelay == 0 && s.store.canUpload() {
 				select {
-				case s.store.currentUpload <- true:
+				case s.store.currentUpload <- struct{}{}:
 					defer func() { <-s.store.currentUpload }()
 					if err = s.store.upload(key, block, nil); err == nil {
 						s.store.bcache.uploaded(key, blen)

@@ -495,7 +458,7 @@ func (s *wSlice) upload(indx int) {
 				return
 			}
 		}
-		s.store.currentUpload <- true
+		s.store.currentUpload <- struct{}{}
 		defer func() { <-s.store.currentUpload }()
 		s.errors <- s.store.upload(key, block, s)
 	}()
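
The bool-to-struct{} change in the semaphore sends is a common Go idiom: the empty struct occupies zero bytes, so the channel carries pure tokens, and the type documents that only the send and receive matter, never the value. A quick check of the sizes:

```go
package main

import (
	"fmt"
	"unsafe"
)

func main() {
	fmt.Println(unsafe.Sizeof(struct{}{})) // 0 bytes per token
	fmt.Println(unsafe.Sizeof(true))       // 1 byte
}
```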

@@ -572,6 +535,7 @@ type Config struct {
 	AutoCreate    bool
 	Compress      string
 	MaxUpload     int
+	MaxDownload   int
 	MaxStageWrite int
 	MaxRetries    int
 	UploadLimit   int64 // bytes per second

@@ -604,6 +568,16 @@ func (c *Config) SelfCheck(uuid string) {
 		logger.Warnf("max-uploads should be greater than 0, set it to 1")
 		c.MaxUpload = 1
 	}
+	if c.UploadLimit > 0 && int64(c.MaxUpload*c.BlockSize) > c.UploadLimit*int64(c.GetTimeout/time.Second)/2 {
+		logger.Warnf("max-upload %d may exceed bandwidth limit (bw: %d Mbps)", c.MaxUpload, (c.UploadLimit*8)>>20)
+	}
+	if c.MaxDownload <= 0 {
+		logger.Warnf("max-downloads should be greater than 0, set it to 200")
+		c.MaxDownload = 200
+	}
+	if c.DownloadLimit > 0 && int64(c.MaxDownload*c.BlockSize) > c.DownloadLimit*int64(c.GetTimeout/time.Second)/2 {
+		logger.Warnf("max-download %d may exceed bandwidth limit (bw: %d Mbps)", c.MaxDownload, (c.DownloadLimit*8)>>20)
+	}
 	if c.BufferSize <= 32<<20 {
 		logger.Warnf("buffer-size is too small, setting it to 32 MiB")
 		c.BufferSize = 32 << 20
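
The new checks warn when concurrency could outrun the bandwidth cap: up to MaxDownload blocks of BlockSize bytes may be in flight at once, and they must fit within what DownloadLimit (bytes per second) can move in half of GetTimeout, or requests start timing out. A worked example of the same arithmetic with illustrative numbers (not from the commit):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		blockSize     = 4 << 20          // 4 MiB per block
		maxDownload   = 200              // concurrent GETs
		downloadLimit = 8 << 20          // 8 MiB/s bandwidth cap
		getTimeout    = 60 * time.Second // per-request timeout
	)
	inFlight := int64(maxDownload * blockSize)                         // 800 MiB outstanding
	budget := int64(downloadLimit) * int64(getTimeout/time.Second) / 2 // 240 MiB movable in half the timeout
	if inFlight > budget {
		fmt.Printf("warn: %d MiB in flight > %d MiB budget\n", inFlight>>20, budget>>20)
	}
}
```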

@@ -686,21 +660,22 @@ func (c *Config) CacheEnabled() bool {
 }
 
 type cachedStore struct {
-	storage       object.ObjectStorage
-	bcache        CacheManager
-	fetcher       *prefetcher
-	conf          Config
-	group         *Controller
-	currentUpload chan bool
-	pendingCh     chan *pendingItem
-	pendingKeys   map[string]*pendingItem
-	pendingMutex  sync.Mutex
-	startHour     int
-	endHour       int
-	compressor    compress.Compressor
-	seekable      bool
-	upLimit       *ratelimit.Bucket
-	downLimit     *ratelimit.Bucket
+	storage         object.ObjectStorage
+	bcache          CacheManager
+	fetcher         *prefetcher
+	conf            Config
+	group           *Controller
+	currentUpload   chan struct{}
+	currentDownload chan struct{}
+	pendingCh       chan *pendingItem
+	pendingKeys     map[string]*pendingItem
+	pendingMutex    sync.Mutex
+	startHour       int
+	endHour         int
+	compressor      compress.Compressor
+	seekable        bool
+	upLimit         *ratelimit.Bucket
+	downLimit       *ratelimit.Bucket
 
 	cacheHits prometheus.Counter
 	cacheMiss prometheus.Counter

@@ -722,13 +697,66 @@ func logRequest(typeStr, key, param, reqID string, err error, used time.Duration) {
 	}
 }
 
+var errTryFullRead = errors.New("try full read")
+
+func (store *cachedStore) loadRange(ctx context.Context, key string, page *Page, off int) (n int, err error) {
+	p := page.Data
+	fullPage, err := store.group.TryPiggyback(key)
+	if fullPage != nil {
+		defer fullPage.Release()
+		if err == nil { // piggybacked a full read
+			n = copy(p, fullPage.Data[off:])
+			return n, nil
+		}
+	}
+
+	store.currentDownload <- struct{}{}
+	defer func() { <-store.currentDownload }()
+	if store.downLimit != nil {
+		store.downLimit.Wait(int64(len(p)))
+	}
+
+	start := time.Now()
+	var (
+		reqID string
+		sc    = object.DefaultStorageClass
+	)
+	page.Acquire()
+	err = utils.WithTimeout(ctx, func(cCtx context.Context) error {
+		defer page.Release()
+		in, err := store.storage.Get(cCtx, key, int64(off), int64(len(p)), object.WithRequestID(&reqID), object.WithStorageClass(&sc))
+		if err == nil {
+			n, err = io.ReadFull(in, p)
+			_ = in.Close()
+		}
+		return err
+	}, store.conf.GetTimeout)
+
+	used := time.Since(start)
+	logRequest("GET", key, fmt.Sprintf("RANGE(%d,%d) ", off, len(p)), reqID, err, used)
+	if errors.Is(err, context.Canceled) {
+		return 0, err
+	}
+	store.objectDataBytes.WithLabelValues("GET", sc).Add(float64(n))
+	store.objectReqsHistogram.WithLabelValues("GET", sc).Observe(used.Seconds())
+	if err == nil {
+		store.fetcher.fetch(key)
+		return n, nil
+	}
+	store.objectReqErrors.Add(1)
+	// fall back to full read
+	return 0, errTryFullRead
+}
+
 func (store *cachedStore) load(ctx context.Context, key string, page *Page, cache bool, forceCache bool) (err error) {
 	defer func() {
 		e := recover()
 		if e != nil {
 			err = fmt.Errorf("recovered from %s", e)
 		}
 	}()
+	store.currentDownload <- struct{}{}
+	defer func() { <-store.currentDownload }()
 	needed := store.compressor.CompressBound(len(page.Data))
 	compressed := needed > len(page.Data)
 	// we don't know the actual size for compressed block
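
errTryFullRead is a sentinel error: loadRange returns it to mean "the range read failed, retry as a full-block read", and ReadAt tells it apart from genuine failures with errors.Is. Note also that loadRange consults TryPiggyback before taking a currentDownload slot, so reads satisfied by an in-flight full read never occupy a slot. The sentinel pattern in isolation (names are illustrative, not from the commit):

```go
package main

import (
	"errors"
	"fmt"
)

// sentinel signalling that the caller should fall back to another strategy
var errFallback = errors.New("fall back")

func fastPath(ok bool) (int, error) {
	if !ok {
		return 0, errFallback
	}
	return 42, nil
}

func read(ok bool) (int, error) {
	n, err := fastPath(ok)
	if err == nil || !errors.Is(err, errFallback) {
		return n, err // success, or a genuine error worth surfacing
	}
	return 7, nil // slow path
}

func main() {
	fmt.Println(read(true))  // 42 <nil>
	fmt.Println(read(false)) // 7 <nil>
}
```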

@@ -806,14 +834,15 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.Registerer) ChunkStore {
 		config.PutTimeout = time.Second * 60
 	}
 	store := &cachedStore{
-		storage:       storage,
-		conf:          config,
-		currentUpload: make(chan bool, config.MaxUpload),
-		compressor:    compressor,
-		seekable:      compressor.CompressBound(0) == 0,
-		pendingCh:     make(chan *pendingItem, 100*config.MaxUpload),
-		pendingKeys:   make(map[string]*pendingItem),
-		group:         NewController(),
+		storage:         storage,
+		conf:            config,
+		currentUpload:   make(chan struct{}, config.MaxUpload),
+		currentDownload: make(chan struct{}, config.MaxDownload),
+		compressor:      compressor,
+		seekable:        compressor.CompressBound(0) == 0,
+		pendingCh:       make(chan *pendingItem, 100*config.MaxUpload),
+		pendingKeys:     make(map[string]*pendingItem),
+		group:           NewController(),
 	}
 	if config.UploadLimit > 0 {
 		// there are overheads coming from HTTP/TCP/IP

@@ -993,7 +1022,7 @@ func parseObjOrigSize(key string) int {
 }
 
 func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
-	store.currentUpload <- true
+	store.currentUpload <- struct{}{}
 	defer func() {
 		<-store.currentUpload
 	}()
