Skip to content

Commit 2ad625f

Browse files
authored
cmd/mount: add max-downloads option to limit concurrent get for object storage (#6472)
Signed-off-by: jiefenghuang <[email protected]>
1 parent 0418191 commit 2ad625f

File tree

12 files changed

+148
-104
lines changed

12 files changed

+148
-104
lines changed

cmd/flags.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ func storageFlags() []cli.Flag {
128128
Value: 20,
129129
Usage: "number of connections to upload",
130130
},
131+
&cli.IntFlag{
132+
Name: "max-downloads",
133+
Value: 200,
134+
Usage: "number of connections to download",
135+
},
131136
&cli.IntFlag{
132137
Name: "max-stage-write",
133138
Value: 1000, // large enough for normal cases, also prevents unlimited concurrency in abnormal cases

cmd/mount.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
367367
GetTimeout: utils.Duration(c.String("get-timeout")),
368368
PutTimeout: utils.Duration(c.String("put-timeout")),
369369
MaxUpload: c.Int("max-uploads"),
370+
MaxDownload: c.Int("max-downloads"),
370371
MaxStageWrite: c.Int("max-stage-write"),
371372
MaxRetries: c.Int("io-retries"),
372373
Writeback: c.Bool("writeback"),

cmd/object.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -395,14 +395,15 @@ func (j *juiceFS) Readlink(name string) (string, error) {
395395

396396
func getDefaultChunkConf(format *meta.Format) *chunk.Config {
397397
chunkConf := &chunk.Config{
398-
BlockSize: format.BlockSize * 1024,
399-
Compress: format.Compression,
400-
HashPrefix: format.HashPrefix,
401-
GetTimeout: time.Minute,
402-
PutTimeout: time.Minute,
403-
MaxUpload: 50,
404-
MaxRetries: 10,
405-
BufferSize: 300 << 20,
398+
BlockSize: format.BlockSize * 1024,
399+
Compress: format.Compression,
400+
HashPrefix: format.HashPrefix,
401+
GetTimeout: time.Minute,
402+
PutTimeout: time.Minute,
403+
MaxUpload: 50,
404+
MaxDownload: 200,
405+
MaxRetries: 10,
406+
BufferSize: 300 << 20,
406407
}
407408
chunkConf.SelfCheck(format.UUID)
408409
return chunkConf

cmd/object_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,10 @@ func TestJFS(t *testing.T) {
178178
var conf = vfs.Config{
179179
Meta: meta.DefaultConf(),
180180
Chunk: &chunk.Config{
181-
BlockSize: format.BlockSize << 10,
182-
MaxUpload: 1,
183-
BufferSize: 100 << 20,
181+
BlockSize: format.BlockSize << 10,
182+
MaxUpload: 1,
183+
MaxDownload: 200,
184+
BufferSize: 100 << 20,
184185
},
185186
DirEntryTimeout: time.Millisecond * 100,
186187
EntryTimeout: time.Millisecond * 100,

pkg/chunk/cached_store.go

Lines changed: 95 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -152,46 +152,9 @@ func (s *rSlice) ReadAt(ctx context.Context, page *Page, off int) (n int, err er
152152

153153
if s.store.seekable &&
154154
(!s.store.conf.CacheEnabled() || (boff > 0 && len(p) <= blockSize/4)) {
155-
if s.store.downLimit != nil {
156-
s.store.downLimit.Wait(int64(len(p)))
157-
}
158-
fullPage, err := s.store.group.TryPiggyback(key)
159-
if fullPage != nil {
160-
defer fullPage.Release()
161-
if err == nil { // piggybacked a full read
162-
n = copy(p, fullPage.Data[boff:])
163-
return n, nil
164-
}
165-
}
166-
// partial read
167-
st := time.Now()
168-
var (
169-
reqID string
170-
sc = object.DefaultStorageClass
171-
)
172-
page.Acquire()
173-
err = utils.WithTimeout(ctx, func(cCtx context.Context) error {
174-
defer page.Release()
175-
in, err := s.store.storage.Get(cCtx, key, int64(boff), int64(len(p)), object.WithRequestID(&reqID), object.WithStorageClass(&sc))
176-
if err == nil {
177-
n, err = io.ReadFull(in, p)
178-
_ = in.Close()
179-
}
180-
return err
181-
}, s.store.conf.GetTimeout)
182-
used := time.Since(st)
183-
logRequest("GET", key, fmt.Sprintf("RANGE(%d,%d) ", boff, len(p)), reqID, err, used)
184-
if errors.Is(err, context.Canceled) {
185-
return 0, err
186-
}
187-
s.store.objectDataBytes.WithLabelValues("GET", sc).Add(float64(n))
188-
s.store.objectReqsHistogram.WithLabelValues("GET", sc).Observe(used.Seconds())
189-
if err == nil {
190-
s.store.fetcher.fetch(key)
191-
return n, nil
192-
} else {
193-
s.store.objectReqErrors.Add(1)
194-
// fall back to full read
155+
n, err = s.store.loadRange(ctx, key, page, boff)
156+
if err == nil || !errors.Is(err, errTryFullRead) {
157+
return n, err
195158
}
196159
}
197160

@@ -476,7 +439,7 @@ func (s *wSlice) upload(indx int) {
476439
s.errors <- nil
477440
if s.store.conf.UploadDelay == 0 && s.store.canUpload() {
478441
select {
479-
case s.store.currentUpload <- true:
442+
case s.store.currentUpload <- struct{}{}:
480443
defer func() { <-s.store.currentUpload }()
481444
if err = s.store.upload(key, block, nil); err == nil {
482445
s.store.bcache.uploaded(key, blen)
@@ -495,7 +458,7 @@ func (s *wSlice) upload(indx int) {
495458
return
496459
}
497460
}
498-
s.store.currentUpload <- true
461+
s.store.currentUpload <- struct{}{}
499462
defer func() { <-s.store.currentUpload }()
500463
s.errors <- s.store.upload(key, block, s)
501464
}()
@@ -572,6 +535,7 @@ type Config struct {
572535
AutoCreate bool
573536
Compress string
574537
MaxUpload int
538+
MaxDownload int
575539
MaxStageWrite int
576540
MaxRetries int
577541
UploadLimit int64 // bytes per second
@@ -604,6 +568,16 @@ func (c *Config) SelfCheck(uuid string) {
604568
logger.Warnf("max-uploads should be greater than 0, set it to 1")
605569
c.MaxUpload = 1
606570
}
571+
if c.UploadLimit > 0 && int64(c.MaxUpload*c.BlockSize) > c.UploadLimit*int64(c.GetTimeout/time.Second)/2 {
572+
logger.Warnf("max-upload %d may exceed bandwidth limit (bw: %d Mbps)", c.MaxUpload, c.UploadDelay*8>>20)
573+
}
574+
if c.MaxDownload <= 0 {
575+
logger.Warnf("max-downloads should be greater than 0, set it to 200")
576+
c.MaxDownload = 200
577+
}
578+
if c.DownloadLimit > 0 && int64(c.MaxDownload*c.BlockSize) > c.DownloadLimit*int64(c.GetTimeout/time.Second)/2 {
579+
logger.Warnf("max-download %d may exceed bandwidth limit (bw: %d Mbps)", c.MaxDownload, (c.DownloadLimit*8)>>20)
580+
}
607581
if c.BufferSize <= 32<<20 {
608582
logger.Warnf("buffer-size is too small, setting it to 32 MiB")
609583
c.BufferSize = 32 << 20
@@ -686,21 +660,22 @@ func (c *Config) CacheEnabled() bool {
686660
}
687661

688662
type cachedStore struct {
689-
storage object.ObjectStorage
690-
bcache CacheManager
691-
fetcher *prefetcher
692-
conf Config
693-
group *Controller
694-
currentUpload chan bool
695-
pendingCh chan *pendingItem
696-
pendingKeys map[string]*pendingItem
697-
pendingMutex sync.Mutex
698-
startHour int
699-
endHour int
700-
compressor compress.Compressor
701-
seekable bool
702-
upLimit *ratelimit.Bucket
703-
downLimit *ratelimit.Bucket
663+
storage object.ObjectStorage
664+
bcache CacheManager
665+
fetcher *prefetcher
666+
conf Config
667+
group *Controller
668+
currentUpload chan struct{}
669+
currentDownload chan struct{}
670+
pendingCh chan *pendingItem
671+
pendingKeys map[string]*pendingItem
672+
pendingMutex sync.Mutex
673+
startHour int
674+
endHour int
675+
compressor compress.Compressor
676+
seekable bool
677+
upLimit *ratelimit.Bucket
678+
downLimit *ratelimit.Bucket
704679

705680
cacheHits prometheus.Counter
706681
cacheMiss prometheus.Counter
@@ -722,13 +697,66 @@ func logRequest(typeStr, key, param, reqID string, err error, used time.Duration
722697
}
723698
}
724699

700+
var errTryFullRead = errors.New("try full read")
701+
702+
func (store *cachedStore) loadRange(ctx context.Context, key string, page *Page, off int) (n int, err error) {
703+
p := page.Data
704+
fullPage, err := store.group.TryPiggyback(key)
705+
if fullPage != nil {
706+
defer fullPage.Release()
707+
if err == nil { // piggybacked a full read
708+
n = copy(p, fullPage.Data[off:])
709+
return n, nil
710+
}
711+
}
712+
713+
store.currentDownload <- struct{}{}
714+
defer func() { <-store.currentDownload }()
715+
if store.downLimit != nil {
716+
store.downLimit.Wait(int64(len(p)))
717+
}
718+
719+
start := time.Now()
720+
var (
721+
reqID string
722+
sc = object.DefaultStorageClass
723+
)
724+
page.Acquire()
725+
err = utils.WithTimeout(ctx, func(cCtx context.Context) error {
726+
defer page.Release()
727+
in, err := store.storage.Get(cCtx, key, int64(off), int64(len(p)), object.WithRequestID(&reqID), object.WithStorageClass(&sc))
728+
if err == nil {
729+
n, err = io.ReadFull(in, p)
730+
_ = in.Close()
731+
}
732+
return err
733+
}, store.conf.GetTimeout)
734+
735+
used := time.Since(start)
736+
logRequest("GET", key, fmt.Sprintf("RANGE(%d,%d) ", off, len(p)), reqID, err, used)
737+
if errors.Is(err, context.Canceled) {
738+
return 0, err
739+
}
740+
store.objectDataBytes.WithLabelValues("GET", sc).Add(float64(n))
741+
store.objectReqsHistogram.WithLabelValues("GET", sc).Observe(used.Seconds())
742+
if err == nil {
743+
store.fetcher.fetch(key)
744+
return n, nil
745+
}
746+
store.objectReqErrors.Add(1)
747+
// fall back to full read
748+
return 0, errTryFullRead
749+
}
750+
725751
func (store *cachedStore) load(ctx context.Context, key string, page *Page, cache bool, forceCache bool) (err error) {
726752
defer func() {
727753
e := recover()
728754
if e != nil {
729755
err = fmt.Errorf("recovered from %s", e)
730756
}
731757
}()
758+
store.currentDownload <- struct{}{}
759+
defer func() { <-store.currentDownload }()
732760
needed := store.compressor.CompressBound(len(page.Data))
733761
compressed := needed > len(page.Data)
734762
// we don't know the actual size for compressed block
@@ -806,14 +834,15 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.
806834
config.PutTimeout = time.Second * 60
807835
}
808836
store := &cachedStore{
809-
storage: storage,
810-
conf: config,
811-
currentUpload: make(chan bool, config.MaxUpload),
812-
compressor: compressor,
813-
seekable: compressor.CompressBound(0) == 0,
814-
pendingCh: make(chan *pendingItem, 100*config.MaxUpload),
815-
pendingKeys: make(map[string]*pendingItem),
816-
group: NewController(),
837+
storage: storage,
838+
conf: config,
839+
currentUpload: make(chan struct{}, config.MaxUpload),
840+
currentDownload: make(chan struct{}, config.MaxDownload),
841+
compressor: compressor,
842+
seekable: compressor.CompressBound(0) == 0,
843+
pendingCh: make(chan *pendingItem, 100*config.MaxUpload),
844+
pendingKeys: make(map[string]*pendingItem),
845+
group: NewController(),
817846
}
818847
if config.UploadLimit > 0 {
819848
// there are overheads coming from HTTP/TCP/IP
@@ -993,7 +1022,7 @@ func parseObjOrigSize(key string) int {
9931022
}
9941023

9951024
func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
996-
store.currentUpload <- true
1025+
store.currentUpload <- struct{}{}
9971026
defer func() {
9981027
<-store.currentUpload
9991028
}()

pkg/chunk/cached_store_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ var defaultConf = Config{
108108
CacheChecksum: CsNone,
109109
CacheScanInterval: time.Second * 300,
110110
MaxUpload: 1,
111+
MaxDownload: 200,
111112
MaxRetries: 10,
112113
PutTimeout: time.Second,
113114
GetTimeout: time.Second * 2,

pkg/fs/fs_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -284,9 +284,10 @@ func createTestFS(t *testing.T) *FileSystem {
284284
var conf = vfs.Config{
285285
Meta: meta.DefaultConf(),
286286
Chunk: &chunk.Config{
287-
BlockSize: format.BlockSize << 10,
288-
MaxUpload: 1,
289-
BufferSize: 100 << 20,
287+
BlockSize: format.BlockSize << 10,
288+
MaxUpload: 1,
289+
MaxDownload: 200,
290+
BufferSize: 100 << 20,
290291
},
291292
DirEntryTimeout: time.Millisecond * 100,
292293
EntryTimeout: time.Millisecond * 100,

pkg/fuse/fuse_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,13 @@ func mount(url, mp string) {
7373
}
7474

7575
chunkConf := chunk.Config{
76-
BlockSize: format.BlockSize * 1024,
77-
Compress: format.Compression,
78-
MaxUpload: 20,
79-
BufferSize: 300 << 20,
80-
CacheSize: 1024,
81-
CacheDir: "memory",
76+
BlockSize: format.BlockSize * 1024,
77+
Compress: format.Compression,
78+
MaxUpload: 20,
79+
MaxDownload: 200,
80+
BufferSize: 300 << 20,
81+
CacheSize: 1024,
82+
CacheDir: "memory",
8283
}
8384

8485
blob, err := object.CreateStorage(strings.ToLower(format.Storage), format.Bucket, format.AccessKey, format.SecretKey, format.SessionToken)

pkg/gateway/gateway_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,16 @@ package gateway
1919
import (
2020
"context"
2121
"errors"
22+
"os"
23+
"testing"
24+
"time"
25+
2226
"github.com/juicedata/juicefs/pkg/chunk"
2327
"github.com/juicedata/juicefs/pkg/fs"
2428
"github.com/juicedata/juicefs/pkg/meta"
2529
"github.com/juicedata/juicefs/pkg/object"
2630
"github.com/juicedata/juicefs/pkg/vfs"
2731
minio "github.com/minio/minio/cmd"
28-
"os"
29-
"testing"
30-
"time"
3132
)
3233

3334
func TestGatewayLock(t *testing.T) {
@@ -42,9 +43,10 @@ func TestGatewayLock(t *testing.T) {
4243
var conf = vfs.Config{
4344
Meta: meta.DefaultConf(),
4445
Chunk: &chunk.Config{
45-
BlockSize: format.BlockSize << 10,
46-
MaxUpload: 1,
47-
BufferSize: 100 << 20,
46+
BlockSize: format.BlockSize << 10,
47+
MaxUpload: 1,
48+
MaxDownload: 200,
49+
BufferSize: 100 << 20,
4850
},
4951
DirEntryTimeout: time.Millisecond * 100,
5052
EntryTimeout: time.Millisecond * 100,
208 KB
Binary file not shown.

0 commit comments

Comments
 (0)