Skip to content

Commit d00d06f

Browse files
authored
hadoop: fix term fs with different conf (#6462)
1 parent 32ecdd1 commit d00d06f

File tree

1 file changed

+26
-15
lines changed

1 file changed

+26
-15
lines changed

sdk/java/libjfs/main.go

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ import (
5252
"os"
5353
"path"
5454
"path/filepath"
55-
"reflect"
5655
"runtime/debug"
5756
"strconv"
5857
"strings"
@@ -86,7 +85,7 @@ var (
8685
fslock sync.Mutex
8786
handlers = make(map[int64]*wrapper)
8887
nextFsHandle int64 = 0
89-
activefs = make(map[string][]*wrapper)
88+
activefs = make(map[fsKey][]*wrapper)
9089
logger = utils.GetLogger("juicefs")
9190
bOnce sync.Once
9291
bridges []*Bridge
@@ -179,6 +178,11 @@ func errno(err error) int32 {
179178
}
180179
}
181180

181+
type fsKey struct {
182+
name string
183+
conf javaConf
184+
}
185+
182186
type wrapper struct {
183187
*fs.FileSystem
184188
ctx meta.Context
@@ -366,20 +370,22 @@ type javaConf struct {
366370
SuperFS bool `json:"superFs,omitempty"`
367371
}
368372

373+
func cleanConf(conf javaConf) javaConf {
374+
conf.SuperFS = false
375+
return conf
376+
}
377+
369378
func getOrCreate(name, user, group, superuser, supergroup string, conf javaConf, f func() *fs.FileSystem) int64 {
370379
fslock.Lock()
371380
defer fslock.Unlock()
372-
ws := activefs[name]
381+
key := fsKey{name: name, conf: cleanConf(conf)}
382+
ws := activefs[key]
373383
var jfs *fs.FileSystem
374384
var m *mapping
375-
for _, w := range ws {
376-
if reflect.DeepEqual(w.conf, conf) {
377-
jfs = w.FileSystem
378-
m = w.m
379-
break
380-
}
381-
}
382-
if jfs == nil {
385+
if len(ws) > 0 {
386+
jfs = ws[0].FileSystem
387+
m = ws[0].m
388+
} else {
383389
m = newMapping(name)
384390
jfs = f()
385391
if jfs == nil {
@@ -406,7 +412,7 @@ func getOrCreate(name, user, group, superuser, supergroup string, conf javaConf,
406412
} else {
407413
w.ctx = meta.NewContext(uint32(os.Getpid()), w.lookupUid(user), w.lookupGids(group))
408414
}
409-
activefs[name] = append(ws, w)
415+
activefs[key] = append(ws, w)
410416
nextFsHandle = nextFsHandle + 1
411417
handlers[nextFsHandle] = w
412418
return nextFsHandle
@@ -758,7 +764,12 @@ func jfs_update_uid_grouping(cname, uidstr *C.char, grouping *C.char) {
758764
fslock.Lock()
759765
defer fslock.Unlock()
760766
userGroupCache[name] = userGroups
761-
ws := activefs[name]
767+
var ws []*wrapper
768+
for k, wrappers := range activefs {
769+
if k.name == name {
770+
ws = append(ws, wrappers...)
771+
}
772+
}
762773
if len(ws) > 0 {
763774
for _, w := range ws {
764775
w.m.update(uids, gids, false)
@@ -820,12 +831,12 @@ func jfs_term(pid int64, h int64) int32 {
820831
fslock.Lock()
821832
defer fslock.Unlock()
822833
delete(handlers, h)
823-
for name, ws := range activefs {
834+
for k, ws := range activefs {
824835
for i := range ws {
825836
if ws[i] == w {
826837
if len(ws) > 1 {
827838
ws[i] = ws[len(ws)-1]
828-
activefs[name] = ws[:len(ws)-1]
839+
activefs[k] = ws[:len(ws)-1]
829840
} else {
830841
_ = w.Flush()
831842
// don't close the filesystem, so it can be re-used later

0 commit comments

Comments
 (0)