diff --git a/cmd/flags.go b/cmd/flags.go index 6a0fbb843b18..dc6b6b046432 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -128,6 +128,11 @@ func storageFlags() []cli.Flag { Value: 20, Usage: "number of connections to upload", }, + &cli.IntFlag{ + Name: "max-downloads", + Value: 200, + Usage: "number of connections to download", + }, &cli.IntFlag{ Name: "max-stage-write", Value: 1000, // large enough for normal cases, also prevents unlimited concurrency in abnormal cases diff --git a/cmd/mount.go b/cmd/mount.go index 500f2eba162d..34a36a3813c6 100644 --- a/cmd/mount.go +++ b/cmd/mount.go @@ -367,6 +367,7 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config { GetTimeout: utils.Duration(c.String("get-timeout")), PutTimeout: utils.Duration(c.String("put-timeout")), MaxUpload: c.Int("max-uploads"), + MaxDownload: c.Int("max-downloads"), MaxStageWrite: c.Int("max-stage-write"), MaxRetries: c.Int("io-retries"), Writeback: c.Bool("writeback"), diff --git a/cmd/object.go b/cmd/object.go index d13077664431..245f47de32fa 100644 --- a/cmd/object.go +++ b/cmd/object.go @@ -395,14 +395,15 @@ func (j *juiceFS) Readlink(name string) (string, error) { func getDefaultChunkConf(format *meta.Format) *chunk.Config { chunkConf := &chunk.Config{ - BlockSize: format.BlockSize * 1024, - Compress: format.Compression, - HashPrefix: format.HashPrefix, - GetTimeout: time.Minute, - PutTimeout: time.Minute, - MaxUpload: 50, - MaxRetries: 10, - BufferSize: 300 << 20, + BlockSize: format.BlockSize * 1024, + Compress: format.Compression, + HashPrefix: format.HashPrefix, + GetTimeout: time.Minute, + PutTimeout: time.Minute, + MaxUpload: 50, + MaxDownload: 200, + MaxRetries: 10, + BufferSize: 300 << 20, } chunkConf.SelfCheck(format.UUID) return chunkConf diff --git a/cmd/object_test.go b/cmd/object_test.go index 082924bb53df..0c8fef13fc74 100644 --- a/cmd/object_test.go +++ b/cmd/object_test.go @@ -178,9 +178,10 @@ func TestJFS(t *testing.T) { var conf = vfs.Config{ Meta: meta.DefaultConf(), Chunk: &chunk.Config{ - BlockSize: format.BlockSize << 10, - MaxUpload: 1, - BufferSize: 100 << 20, + BlockSize: format.BlockSize << 10, + MaxUpload: 1, + MaxDownload: 200, + BufferSize: 100 << 20, }, DirEntryTimeout: time.Millisecond * 100, EntryTimeout: time.Millisecond * 100, diff --git a/pkg/chunk/cached_store.go b/pkg/chunk/cached_store.go index f66d8a6b6e36..4472a4edc19e 100644 --- a/pkg/chunk/cached_store.go +++ b/pkg/chunk/cached_store.go @@ -152,46 +152,9 @@ func (s *rSlice) ReadAt(ctx context.Context, page *Page, off int) (n int, err er if s.store.seekable && (!s.store.conf.CacheEnabled() || (boff > 0 && len(p) <= blockSize/4)) { - if s.store.downLimit != nil { - s.store.downLimit.Wait(int64(len(p))) - } - fullPage, err := s.store.group.TryPiggyback(key) - if fullPage != nil { - defer fullPage.Release() - if err == nil { // piggybacked a full read - n = copy(p, fullPage.Data[boff:]) - return n, nil - } - } - // partial read - st := time.Now() - var ( - reqID string - sc = object.DefaultStorageClass - ) - page.Acquire() - err = utils.WithTimeout(ctx, func(cCtx context.Context) error { - defer page.Release() - in, err := s.store.storage.Get(cCtx, key, int64(boff), int64(len(p)), object.WithRequestID(&reqID), object.WithStorageClass(&sc)) - if err == nil { - n, err = io.ReadFull(in, p) - _ = in.Close() - } - return err - }, s.store.conf.GetTimeout) - used := time.Since(st) - logRequest("GET", key, fmt.Sprintf("RANGE(%d,%d) ", boff, len(p)), reqID, err, used) - if errors.Is(err, context.Canceled) { - return 0, err - } - s.store.objectDataBytes.WithLabelValues("GET", sc).Add(float64(n)) - s.store.objectReqsHistogram.WithLabelValues("GET", sc).Observe(used.Seconds()) - if err == nil { - s.store.fetcher.fetch(key) - return n, nil - } else { - s.store.objectReqErrors.Add(1) - // fall back to full read + n, err = s.store.loadRange(ctx, key, page, boff) + if err == nil || !errors.Is(err, errTryFullRead) { + return n, err } } @@ -476,7 +439,7 @@ func (s *wSlice) upload(indx int) { s.errors <- nil if s.store.conf.UploadDelay == 0 && s.store.canUpload() { select { - case s.store.currentUpload <- true: + case s.store.currentUpload <- struct{}{}: defer func() { <-s.store.currentUpload }() if err = s.store.upload(key, block, nil); err == nil { s.store.bcache.uploaded(key, blen) @@ -495,7 +458,7 @@ func (s *wSlice) upload(indx int) { return } } - s.store.currentUpload <- true + s.store.currentUpload <- struct{}{} defer func() { <-s.store.currentUpload }() s.errors <- s.store.upload(key, block, s) }() @@ -572,6 +535,7 @@ type Config struct { AutoCreate bool Compress string MaxUpload int + MaxDownload int MaxStageWrite int MaxRetries int UploadLimit int64 // bytes per second @@ -604,6 +568,16 @@ func (c *Config) SelfCheck(uuid string) { logger.Warnf("max-uploads should be greater than 0, set it to 1") c.MaxUpload = 1 } + if c.UploadLimit > 0 && int64(c.MaxUpload*c.BlockSize) > c.UploadLimit*int64(c.GetTimeout/time.Second)/2 { + logger.Warnf("max-upload %d may exceed bandwidth limit (bw: %d Mbps)", c.MaxUpload, c.UploadDelay*8>>20) + } + if c.MaxDownload <= 0 { + logger.Warnf("max-downloads should be greater than 0, set it to 200") + c.MaxDownload = 200 + } + if c.DownloadLimit > 0 && int64(c.MaxDownload*c.BlockSize) > c.DownloadLimit*int64(c.GetTimeout/time.Second)/2 { + logger.Warnf("max-download %d may exceed bandwidth limit (bw: %d Mbps)", c.MaxDownload, (c.DownloadLimit*8)>>20) + } if c.BufferSize <= 32<<20 { logger.Warnf("buffer-size is too small, setting it to 32 MiB") c.BufferSize = 32 << 20 @@ -686,21 +660,22 @@ func (c *Config) CacheEnabled() bool { } type cachedStore struct { - storage object.ObjectStorage - bcache CacheManager - fetcher *prefetcher - conf Config - group *Controller - currentUpload chan bool - pendingCh chan *pendingItem - pendingKeys map[string]*pendingItem - pendingMutex sync.Mutex - startHour int - endHour int - compressor compress.Compressor - seekable bool - upLimit *ratelimit.Bucket - downLimit *ratelimit.Bucket + storage object.ObjectStorage + bcache CacheManager + fetcher *prefetcher + conf Config + group *Controller + currentUpload chan struct{} + currentDownload chan struct{} + pendingCh chan *pendingItem + pendingKeys map[string]*pendingItem + pendingMutex sync.Mutex + startHour int + endHour int + compressor compress.Compressor + seekable bool + upLimit *ratelimit.Bucket + downLimit *ratelimit.Bucket cacheHits prometheus.Counter cacheMiss prometheus.Counter @@ -722,6 +697,57 @@ func logRequest(typeStr, key, param, reqID string, err error, used time.Duration } } +var errTryFullRead = errors.New("try full read") + +func (store *cachedStore) loadRange(ctx context.Context, key string, page *Page, off int) (n int, err error) { + p := page.Data + fullPage, err := store.group.TryPiggyback(key) + if fullPage != nil { + defer fullPage.Release() + if err == nil { // piggybacked a full read + n = copy(p, fullPage.Data[off:]) + return n, nil + } + } + + store.currentDownload <- struct{}{} + defer func() { <-store.currentDownload }() + if store.downLimit != nil { + store.downLimit.Wait(int64(len(p))) + } + + start := time.Now() + var ( + reqID string + sc = object.DefaultStorageClass + ) + page.Acquire() + err = utils.WithTimeout(ctx, func(cCtx context.Context) error { + defer page.Release() + in, err := store.storage.Get(cCtx, key, int64(off), int64(len(p)), object.WithRequestID(&reqID), object.WithStorageClass(&sc)) + if err == nil { + n, err = io.ReadFull(in, p) + _ = in.Close() + } + return err + }, store.conf.GetTimeout) + + used := time.Since(start) + logRequest("GET", key, fmt.Sprintf("RANGE(%d,%d) ", off, len(p)), reqID, err, used) + if errors.Is(err, context.Canceled) { + return 0, err + } + store.objectDataBytes.WithLabelValues("GET", sc).Add(float64(n)) + store.objectReqsHistogram.WithLabelValues("GET", sc).Observe(used.Seconds()) + if err == nil { + store.fetcher.fetch(key) + return n, nil + } + store.objectReqErrors.Add(1) + // fall back to full read + return 0, errTryFullRead +} + func (store *cachedStore) load(ctx context.Context, key string, page *Page, cache bool, forceCache bool) (err error) { defer func() { e := recover() @@ -729,6 +755,8 @@ func (store *cachedStore) load(ctx context.Context, key string, page *Page, cach err = fmt.Errorf("recovered from %s", e) } }() + store.currentDownload <- struct{}{} + defer func() { <-store.currentDownload }() needed := store.compressor.CompressBound(len(page.Data)) compressed := needed > len(page.Data) // we don't know the actual size for compressed block @@ -806,14 +834,15 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus. config.PutTimeout = time.Second * 60 } store := &cachedStore{ - storage: storage, - conf: config, - currentUpload: make(chan bool, config.MaxUpload), - compressor: compressor, - seekable: compressor.CompressBound(0) == 0, - pendingCh: make(chan *pendingItem, 100*config.MaxUpload), - pendingKeys: make(map[string]*pendingItem), - group: NewController(), + storage: storage, + conf: config, + currentUpload: make(chan struct{}, config.MaxUpload), + currentDownload: make(chan struct{}, config.MaxDownload), + compressor: compressor, + seekable: compressor.CompressBound(0) == 0, + pendingCh: make(chan *pendingItem, 100*config.MaxUpload), + pendingKeys: make(map[string]*pendingItem), + group: NewController(), } if config.UploadLimit > 0 { // there are overheads coming from HTTP/TCP/IP @@ -993,7 +1022,7 @@ func parseObjOrigSize(key string) int { } func (store *cachedStore) uploadStagingFile(key string, stagingPath string) { - store.currentUpload <- true + store.currentUpload <- struct{}{} defer func() { <-store.currentUpload }() diff --git a/pkg/chunk/cached_store_test.go b/pkg/chunk/cached_store_test.go index a8cd42d5eabb..b7e3581d6573 100644 --- a/pkg/chunk/cached_store_test.go +++ b/pkg/chunk/cached_store_test.go @@ -108,6 +108,7 @@ var defaultConf = Config{ CacheChecksum: CsNone, CacheScanInterval: time.Second * 300, MaxUpload: 1, + MaxDownload: 200, MaxRetries: 10, PutTimeout: time.Second, GetTimeout: time.Second * 2, diff --git a/pkg/fs/fs_test.go b/pkg/fs/fs_test.go index 567f67fbc98a..c5fe0b8751d7 100644 --- a/pkg/fs/fs_test.go +++ b/pkg/fs/fs_test.go @@ -284,9 +284,10 @@ func createTestFS(t *testing.T) *FileSystem { var conf = vfs.Config{ Meta: meta.DefaultConf(), Chunk: &chunk.Config{ - BlockSize: format.BlockSize << 10, - MaxUpload: 1, - BufferSize: 100 << 20, + BlockSize: format.BlockSize << 10, + MaxUpload: 1, + MaxDownload: 200, + BufferSize: 100 << 20, }, DirEntryTimeout: time.Millisecond * 100, EntryTimeout: time.Millisecond * 100, diff --git a/pkg/fuse/fuse_test.go b/pkg/fuse/fuse_test.go index 0f55c8376a7b..431679a9c0ec 100644 --- a/pkg/fuse/fuse_test.go +++ b/pkg/fuse/fuse_test.go @@ -73,12 +73,13 @@ func mount(url, mp string) { } chunkConf := chunk.Config{ - BlockSize: format.BlockSize * 1024, - Compress: format.Compression, - MaxUpload: 20, - BufferSize: 300 << 20, - CacheSize: 1024, - CacheDir: "memory", + BlockSize: format.BlockSize * 1024, + Compress: format.Compression, + MaxUpload: 20, + MaxDownload: 200, + BufferSize: 300 << 20, + CacheSize: 1024, + CacheDir: "memory", } blob, err := object.CreateStorage(strings.ToLower(format.Storage), format.Bucket, format.AccessKey, format.SecretKey, format.SessionToken) diff --git a/pkg/gateway/gateway_test.go b/pkg/gateway/gateway_test.go index 125d65617fcb..97e74c828bdb 100644 --- a/pkg/gateway/gateway_test.go +++ b/pkg/gateway/gateway_test.go @@ -19,15 +19,16 @@ package gateway import ( "context" "errors" + "os" + "testing" + "time" + "github.com/juicedata/juicefs/pkg/chunk" "github.com/juicedata/juicefs/pkg/fs" "github.com/juicedata/juicefs/pkg/meta" "github.com/juicedata/juicefs/pkg/object" "github.com/juicedata/juicefs/pkg/vfs" minio "github.com/minio/minio/cmd" - "os" - "testing" - "time" ) func TestGatewayLock(t *testing.T) { @@ -42,9 +43,10 @@ func TestGatewayLock(t *testing.T) { var conf = vfs.Config{ Meta: meta.DefaultConf(), Chunk: &chunk.Config{ - BlockSize: format.BlockSize << 10, - MaxUpload: 1, - BufferSize: 100 << 20, + BlockSize: format.BlockSize << 10, + MaxUpload: 1, + MaxDownload: 200, + BufferSize: 100 << 20, }, DirEntryTimeout: time.Millisecond * 100, EntryTimeout: time.Millisecond * 100, diff --git a/pkg/vfs/?_journal=WAL&_timeout=5000&cache=shared b/pkg/vfs/?_journal=WAL&_timeout=5000&cache=shared new file mode 100644 index 000000000000..80777752d52b Binary files /dev/null and b/pkg/vfs/?_journal=WAL&_timeout=5000&cache=shared differ diff --git a/pkg/vfs/compact_test.go b/pkg/vfs/compact_test.go index e864ee764ce2..b7698f5f5403 100644 --- a/pkg/vfs/compact_test.go +++ b/pkg/vfs/compact_test.go @@ -27,12 +27,13 @@ import ( func TestCompact(t *testing.T) { cconf := chunk.Config{ - BlockSize: 256 * 1024, - Compress: "lz4", - MaxUpload: 2, - BufferSize: 30 << 20, - CacheSize: 10 << 20, - CacheDir: "memory", + BlockSize: 256 * 1024, + Compress: "lz4", + MaxUpload: 2, + MaxDownload: 200, + BufferSize: 30 << 20, + CacheSize: 10 << 20, + CacheDir: "memory", } blob, _ := object.CreateStorage("mem", "", "", "", "") store := chunk.NewCachedStore(blob, cconf, nil) diff --git a/pkg/vfs/vfs_test.go b/pkg/vfs/vfs_test.go index a5e8ee46d73c..9d4815bcf82e 100644 --- a/pkg/vfs/vfs_test.go +++ b/pkg/vfs/vfs_test.go @@ -69,12 +69,13 @@ func createTestVFS(applyMetaConfOption func(metaConfig *meta.Config), metaUri st Format: *format, Version: "Juicefs", Chunk: &chunk.Config{ - BlockSize: format.BlockSize * 1024, - Compress: format.Compression, - MaxUpload: 2, - BufferSize: 30 << 20, - CacheSize: 10 << 20, - CacheDir: "memory", + BlockSize: format.BlockSize * 1024, + Compress: format.Compression, + MaxUpload: 2, + MaxDownload: 200, + BufferSize: 30 << 20, + CacheSize: 10 << 20, + CacheDir: "memory", }, FuseOpts: &FuseOptions{}, }