Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ func storageFlags() []cli.Flag {
Value: 20,
Usage: "number of connections to upload",
},
&cli.IntFlag{
Name: "max-downloads",
Value: 200,
Usage: "number of connections to download",
},
&cli.IntFlag{
Name: "max-stage-write",
Value: 1000, // large enough for normal cases, also prevents unlimited concurrency in abnormal cases
Expand Down
1 change: 1 addition & 0 deletions cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
GetTimeout: utils.Duration(c.String("get-timeout")),
PutTimeout: utils.Duration(c.String("put-timeout")),
MaxUpload: c.Int("max-uploads"),
MaxDownload: c.Int("max-downloads"),
MaxStageWrite: c.Int("max-stage-write"),
MaxRetries: c.Int("io-retries"),
Writeback: c.Bool("writeback"),
Expand Down
17 changes: 9 additions & 8 deletions cmd/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,14 +395,15 @@ func (j *juiceFS) Readlink(name string) (string, error) {

func getDefaultChunkConf(format *meta.Format) *chunk.Config {
chunkConf := &chunk.Config{
BlockSize: format.BlockSize * 1024,
Compress: format.Compression,
HashPrefix: format.HashPrefix,
GetTimeout: time.Minute,
PutTimeout: time.Minute,
MaxUpload: 50,
MaxRetries: 10,
BufferSize: 300 << 20,
BlockSize: format.BlockSize * 1024,
Compress: format.Compression,
HashPrefix: format.HashPrefix,
GetTimeout: time.Minute,
PutTimeout: time.Minute,
MaxUpload: 50,
MaxDownload: 200,
MaxRetries: 10,
BufferSize: 300 << 20,
}
chunkConf.SelfCheck(format.UUID)
return chunkConf
Expand Down
7 changes: 4 additions & 3 deletions cmd/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,10 @@ func TestJFS(t *testing.T) {
var conf = vfs.Config{
Meta: meta.DefaultConf(),
Chunk: &chunk.Config{
BlockSize: format.BlockSize << 10,
MaxUpload: 1,
BufferSize: 100 << 20,
BlockSize: format.BlockSize << 10,
MaxUpload: 1,
MaxDownload: 200,
BufferSize: 100 << 20,
},
DirEntryTimeout: time.Millisecond * 100,
EntryTimeout: time.Millisecond * 100,
Expand Down
161 changes: 95 additions & 66 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,46 +152,9 @@ func (s *rSlice) ReadAt(ctx context.Context, page *Page, off int) (n int, err er

if s.store.seekable &&
(!s.store.conf.CacheEnabled() || (boff > 0 && len(p) <= blockSize/4)) {
if s.store.downLimit != nil {
s.store.downLimit.Wait(int64(len(p)))
}
fullPage, err := s.store.group.TryPiggyback(key)
if fullPage != nil {
defer fullPage.Release()
if err == nil { // piggybacked a full read
n = copy(p, fullPage.Data[boff:])
return n, nil
}
}
// partial read
st := time.Now()
var (
reqID string
sc = object.DefaultStorageClass
)
page.Acquire()
err = utils.WithTimeout(ctx, func(cCtx context.Context) error {
defer page.Release()
in, err := s.store.storage.Get(cCtx, key, int64(boff), int64(len(p)), object.WithRequestID(&reqID), object.WithStorageClass(&sc))
if err == nil {
n, err = io.ReadFull(in, p)
_ = in.Close()
}
return err
}, s.store.conf.GetTimeout)
used := time.Since(st)
logRequest("GET", key, fmt.Sprintf("RANGE(%d,%d) ", boff, len(p)), reqID, err, used)
if errors.Is(err, context.Canceled) {
return 0, err
}
s.store.objectDataBytes.WithLabelValues("GET", sc).Add(float64(n))
s.store.objectReqsHistogram.WithLabelValues("GET", sc).Observe(used.Seconds())
if err == nil {
s.store.fetcher.fetch(key)
return n, nil
} else {
s.store.objectReqErrors.Add(1)
// fall back to full read
n, err = s.store.loadRange(ctx, key, page, boff)
if err == nil || !errors.Is(err, errTryFullRead) {
return n, err
}
}

Expand Down Expand Up @@ -476,7 +439,7 @@ func (s *wSlice) upload(indx int) {
s.errors <- nil
if s.store.conf.UploadDelay == 0 && s.store.canUpload() {
select {
case s.store.currentUpload <- true:
case s.store.currentUpload <- struct{}{}:
defer func() { <-s.store.currentUpload }()
if err = s.store.upload(key, block, nil); err == nil {
s.store.bcache.uploaded(key, blen)
Expand All @@ -495,7 +458,7 @@ func (s *wSlice) upload(indx int) {
return
}
}
s.store.currentUpload <- true
s.store.currentUpload <- struct{}{}
defer func() { <-s.store.currentUpload }()
s.errors <- s.store.upload(key, block, s)
}()
Expand Down Expand Up @@ -572,6 +535,7 @@ type Config struct {
AutoCreate bool
Compress string
MaxUpload int
MaxDownload int
MaxStageWrite int
MaxRetries int
UploadLimit int64 // bytes per second
Expand Down Expand Up @@ -604,6 +568,16 @@ func (c *Config) SelfCheck(uuid string) {
logger.Warnf("max-uploads should be greater than 0, set it to 1")
c.MaxUpload = 1
}
if c.UploadLimit > 0 && int64(c.MaxUpload*c.BlockSize) > c.UploadLimit*int64(c.GetTimeout/time.Second)/2 {
logger.Warnf("max-upload %d may exceed bandwidth limit (bw: %d Mbps)", c.MaxUpload, c.UploadDelay*8>>20)
}
if c.MaxDownload <= 0 {
logger.Warnf("max-downloads should be greater than 0, set it to 200")
c.MaxDownload = 200
}
if c.DownloadLimit > 0 && int64(c.MaxDownload*c.BlockSize) > c.DownloadLimit*int64(c.GetTimeout/time.Second)/2 {
logger.Warnf("max-download %d may exceed bandwidth limit (bw: %d Mbps)", c.MaxDownload, (c.DownloadLimit*8)>>20)
}
if c.BufferSize <= 32<<20 {
logger.Warnf("buffer-size is too small, setting it to 32 MiB")
c.BufferSize = 32 << 20
Expand Down Expand Up @@ -686,21 +660,22 @@ func (c *Config) CacheEnabled() bool {
}

type cachedStore struct {
storage object.ObjectStorage
bcache CacheManager
fetcher *prefetcher
conf Config
group *Controller
currentUpload chan bool
pendingCh chan *pendingItem
pendingKeys map[string]*pendingItem
pendingMutex sync.Mutex
startHour int
endHour int
compressor compress.Compressor
seekable bool
upLimit *ratelimit.Bucket
downLimit *ratelimit.Bucket
storage object.ObjectStorage
bcache CacheManager
fetcher *prefetcher
conf Config
group *Controller
currentUpload chan struct{}
currentDownload chan struct{}
pendingCh chan *pendingItem
pendingKeys map[string]*pendingItem
pendingMutex sync.Mutex
startHour int
endHour int
compressor compress.Compressor
seekable bool
upLimit *ratelimit.Bucket
downLimit *ratelimit.Bucket

cacheHits prometheus.Counter
cacheMiss prometheus.Counter
Expand All @@ -722,13 +697,66 @@ func logRequest(typeStr, key, param, reqID string, err error, used time.Duration
}
}

var errTryFullRead = errors.New("try full read")

func (store *cachedStore) loadRange(ctx context.Context, key string, page *Page, off int) (n int, err error) {
p := page.Data
fullPage, err := store.group.TryPiggyback(key)
if fullPage != nil {
defer fullPage.Release()
if err == nil { // piggybacked a full read
n = copy(p, fullPage.Data[off:])
return n, nil
}
}

store.currentDownload <- struct{}{}
defer func() { <-store.currentDownload }()
if store.downLimit != nil {
store.downLimit.Wait(int64(len(p)))
}

start := time.Now()
var (
reqID string
sc = object.DefaultStorageClass
)
page.Acquire()
err = utils.WithTimeout(ctx, func(cCtx context.Context) error {
defer page.Release()
in, err := store.storage.Get(cCtx, key, int64(off), int64(len(p)), object.WithRequestID(&reqID), object.WithStorageClass(&sc))
if err == nil {
n, err = io.ReadFull(in, p)
_ = in.Close()
}
return err
}, store.conf.GetTimeout)

used := time.Since(start)
logRequest("GET", key, fmt.Sprintf("RANGE(%d,%d) ", off, len(p)), reqID, err, used)
if errors.Is(err, context.Canceled) {
return 0, err
}
store.objectDataBytes.WithLabelValues("GET", sc).Add(float64(n))
store.objectReqsHistogram.WithLabelValues("GET", sc).Observe(used.Seconds())
if err == nil {
store.fetcher.fetch(key)
return n, nil
}
store.objectReqErrors.Add(1)
// fall back to full read
return 0, errTryFullRead
}

func (store *cachedStore) load(ctx context.Context, key string, page *Page, cache bool, forceCache bool) (err error) {
defer func() {
e := recover()
if e != nil {
err = fmt.Errorf("recovered from %s", e)
}
}()
store.currentDownload <- struct{}{}
defer func() { <-store.currentDownload }()
needed := store.compressor.CompressBound(len(page.Data))
compressed := needed > len(page.Data)
// we don't know the actual size for compressed block
Expand Down Expand Up @@ -806,14 +834,15 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.
config.PutTimeout = time.Second * 60
}
store := &cachedStore{
storage: storage,
conf: config,
currentUpload: make(chan bool, config.MaxUpload),
compressor: compressor,
seekable: compressor.CompressBound(0) == 0,
pendingCh: make(chan *pendingItem, 100*config.MaxUpload),
pendingKeys: make(map[string]*pendingItem),
group: NewController(),
storage: storage,
conf: config,
currentUpload: make(chan struct{}, config.MaxUpload),
currentDownload: make(chan struct{}, config.MaxDownload),
compressor: compressor,
seekable: compressor.CompressBound(0) == 0,
pendingCh: make(chan *pendingItem, 100*config.MaxUpload),
pendingKeys: make(map[string]*pendingItem),
group: NewController(),
}
if config.UploadLimit > 0 {
// there are overheads coming from HTTP/TCP/IP
Expand Down Expand Up @@ -993,7 +1022,7 @@ func parseObjOrigSize(key string) int {
}

func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
store.currentUpload <- true
store.currentUpload <- struct{}{}
defer func() {
<-store.currentUpload
}()
Expand Down
1 change: 1 addition & 0 deletions pkg/chunk/cached_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ var defaultConf = Config{
CacheChecksum: CsNone,
CacheScanInterval: time.Second * 300,
MaxUpload: 1,
MaxDownload: 200,
MaxRetries: 10,
PutTimeout: time.Second,
GetTimeout: time.Second * 2,
Expand Down
7 changes: 4 additions & 3 deletions pkg/fs/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,10 @@ func createTestFS(t *testing.T) *FileSystem {
var conf = vfs.Config{
Meta: meta.DefaultConf(),
Chunk: &chunk.Config{
BlockSize: format.BlockSize << 10,
MaxUpload: 1,
BufferSize: 100 << 20,
BlockSize: format.BlockSize << 10,
MaxUpload: 1,
MaxDownload: 200,
BufferSize: 100 << 20,
},
DirEntryTimeout: time.Millisecond * 100,
EntryTimeout: time.Millisecond * 100,
Expand Down
13 changes: 7 additions & 6 deletions pkg/fuse/fuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,13 @@ func mount(url, mp string) {
}

chunkConf := chunk.Config{
BlockSize: format.BlockSize * 1024,
Compress: format.Compression,
MaxUpload: 20,
BufferSize: 300 << 20,
CacheSize: 1024,
CacheDir: "memory",
BlockSize: format.BlockSize * 1024,
Compress: format.Compression,
MaxUpload: 20,
MaxDownload: 200,
BufferSize: 300 << 20,
CacheSize: 1024,
CacheDir: "memory",
}

blob, err := object.CreateStorage(strings.ToLower(format.Storage), format.Bucket, format.AccessKey, format.SecretKey, format.SessionToken)
Expand Down
14 changes: 8 additions & 6 deletions pkg/gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ package gateway
import (
"context"
"errors"
"os"
"testing"
"time"

"github.com/juicedata/juicefs/pkg/chunk"
"github.com/juicedata/juicefs/pkg/fs"
"github.com/juicedata/juicefs/pkg/meta"
"github.com/juicedata/juicefs/pkg/object"
"github.com/juicedata/juicefs/pkg/vfs"
minio "github.com/minio/minio/cmd"
"os"
"testing"
"time"
)

func TestGatewayLock(t *testing.T) {
Expand All @@ -42,9 +43,10 @@ func TestGatewayLock(t *testing.T) {
var conf = vfs.Config{
Meta: meta.DefaultConf(),
Chunk: &chunk.Config{
BlockSize: format.BlockSize << 10,
MaxUpload: 1,
BufferSize: 100 << 20,
BlockSize: format.BlockSize << 10,
MaxUpload: 1,
MaxDownload: 200,
BufferSize: 100 << 20,
},
DirEntryTimeout: time.Millisecond * 100,
EntryTimeout: time.Millisecond * 100,
Expand Down
Binary file added pkg/vfs/?_journal=WAL&_timeout=5000&cache=shared
Binary file not shown.
13 changes: 7 additions & 6 deletions pkg/vfs/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ import (

func TestCompact(t *testing.T) {
cconf := chunk.Config{
BlockSize: 256 * 1024,
Compress: "lz4",
MaxUpload: 2,
BufferSize: 30 << 20,
CacheSize: 10 << 20,
CacheDir: "memory",
BlockSize: 256 * 1024,
Compress: "lz4",
MaxUpload: 2,
MaxDownload: 200,
BufferSize: 30 << 20,
CacheSize: 10 << 20,
CacheDir: "memory",
}
blob, _ := object.CreateStorage("mem", "", "", "", "")
store := chunk.NewCachedStore(blob, cconf, nil)
Expand Down
Loading
Loading