From 5321310bd5de0df6743ea7bb8067c238717c8fd4 Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Tue, 4 Nov 2025 19:07:46 +0800 Subject: [PATCH 1/6] hadoop: support kerberos --- cmd/config.go | 5 + cmd/format.go | 15 + pkg/fs/fs.go | 4 + pkg/meta/base.go | 32 + pkg/meta/config.go | 3 + pkg/meta/interface.go | 7 + pkg/meta/redis.go | 84 ++- pkg/meta/sql.go | 81 ++- pkg/meta/tkv.go | 67 +- pkg/meta/utils.go | 9 +- sdk/java/kerberos.sh | 43 ++ sdk/java/libjfs/kerberos.go | 644 ++++++++++++++++++ sdk/java/libjfs/main.go | 218 +++++- .../main/java/io/juicefs/JuiceFileSystem.java | 5 + .../java/io/juicefs/JuiceFileSystemImpl.java | 160 ++++- .../io/juicefs/kerberos/AuthCredential.java | 42 ++ .../JuiceFSDelegationTokenIdentifier.java | 41 ++ .../juicefs/kerberos/JuiceFSTokenRenewer.java | 62 ++ .../io/juicefs/kerberos/KerberosUtil.java | 41 ++ ...ache.hadoop.security.token.TokenIdentifier | 1 + ....apache.hadoop.security.token.TokenRenewer | 1 + .../io/juicefs/kerberos/KerberosTest.java | 208 ++++++ sdk/java/src/test/resources/kerberos.cfg | 26 + 23 files changed, 1751 insertions(+), 48 deletions(-) create mode 100755 sdk/java/kerberos.sh create mode 100644 sdk/java/libjfs/kerberos.go create mode 100644 sdk/java/src/main/java/io/juicefs/kerberos/AuthCredential.java create mode 100644 sdk/java/src/main/java/io/juicefs/kerberos/JuiceFSDelegationTokenIdentifier.java create mode 100644 sdk/java/src/main/java/io/juicefs/kerberos/JuiceFSTokenRenewer.java create mode 100644 sdk/java/src/main/java/io/juicefs/kerberos/KerberosUtil.java create mode 100644 sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier create mode 100644 sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer create mode 100644 sdk/java/src/test/java/io/juicefs/kerberos/KerberosTest.java create mode 100644 sdk/java/src/test/resources/kerberos.cfg diff --git a/cmd/config.go b/cmd/config.go index b38f180b10ed..694d3277edbb 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -281,6 +281,11 @@ func config(ctx *cli.Context) error { format.MinClientVersion = "1.3.0-A" clientVer = true } + case "kerberos-config-file": + msg.WriteString(fmt.Sprintf("%s: updated\n", flag)) + format.KerbConf = readKerbConf(ctx.String(flag)) + format.MinClientVersion = "1.4.0-A" + clientVer = true } } if msg.Len() == 0 { diff --git a/cmd/format.go b/cmd/format.go index e3dd7fe28dad..cb21b34fc28a 100644 --- a/cmd/format.go +++ b/cmd/format.go @@ -205,6 +205,10 @@ func formatManagementFlags() []cli.Flag { Name: "ranger-service", Usage: "Name of the Ranger service used For JuiceFS", }, + &cli.StringFlag{ + Name: "kerberos-config-file", + Usage: "Path to Kerberos configuration file", + }, }) } @@ -393,6 +397,14 @@ func loadEncrypt(keyPath string) string { return string(pem) } +func readKerbConf(file string) string { + data, err := os.ReadFile(file) + if err != nil { + logger.Fatalf("load Kerberos config from %s: %s", file, err) + } + return string(data) +} + func format(c *cli.Context) error { setup(c, 2) removePassword(c.Args().Get(0)) @@ -459,6 +471,8 @@ func format(c *cli.Context) error { format.RangerRestUrl = c.String(flag) case "ranger-service": format.RangerService = c.String(flag) + case "kerberos-config-file": + format.KerbConf = readKerbConf(c.String(flag)) } } } else if strings.HasPrefix(err.Error(), "database is not formatted") { @@ -488,6 +502,7 @@ func format(c *cli.Context) error { EnableACL: c.Bool("enable-acl"), RangerRestUrl: c.String("ranger-rest-url"), RangerService: c.String("ranger-service"), + KerbConf: readKerbConf(c.String("kerberos-config-file")), } if format.EnableACL { format.MinClientVersion = "1.2.0-A" diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go index c3ecced6cc94..83ce728ee3a6 100644 --- a/pkg/fs/fs.go +++ b/pkg/fs/fs.go @@ -139,6 +139,10 @@ type FileSystem struct { store chunk.ChunkStore cacheFiller *vfs.CacheFiller + Superuser string + Supergroup string + OnSuperuserChanged []func() + cacheM sync.Mutex entries map[Ino]map[string]*entryCache attrs map[Ino]*attrCache diff --git a/pkg/meta/base.go b/pkg/meta/base.go index 2e1bcb82b16d..87ce62ebf48d 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -144,6 +144,13 @@ type engine interface { doGetFacl(ctx Context, ino Ino, aclType uint8, aclId uint32, rule *aclAPI.Rule) syscall.Errno cacheACLs(ctx Context) error + // kerberos delegation token + doStoreToken(ctx Context, token []byte) (id uint32, st syscall.Errno) + doUpdateToken(ctx Context, id uint32, token []byte) syscall.Errno + doLoadToken(ctx Context, id uint32) (token []byte, st syscall.Errno) + doDeleteTokens(ctx Context, ids []uint32) syscall.Errno + doListTokens(ctx Context) (tokens map[uint32][]byte, st syscall.Errno) + newDirHandler(inode Ino, plus bool, entries []*Entry) DirHandler dump(ctx Context, opt *DumpOption, ch chan<- *dumpedResult) error @@ -3154,6 +3161,31 @@ func (m *baseMeta) GetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Rul return m.en.doGetFacl(ctx, ino, aclType, aclAPI.None, rule) } +func (m *baseMeta) StoreToken(ctx Context, token []byte) (id uint32, st syscall.Errno) { + defer m.timeit("StoreToken", time.Now()) + return m.en.doStoreToken(ctx, token) +} + +func (m *baseMeta) UpdateToken(ctx Context, id uint32, token []byte) syscall.Errno { + defer m.timeit("UpdateToken", time.Now()) + return m.en.doUpdateToken(ctx, id, token) +} + +func (m *baseMeta) LoadToken(ctx Context, id uint32) (token []byte, st syscall.Errno) { + defer m.timeit("LoadToken", time.Now()) + return m.en.doLoadToken(ctx, id) +} + +func (m *baseMeta) DeleteTokens(ctx Context, ids []uint32) syscall.Errno { + defer m.timeit("DeleteTokens", time.Now()) + return m.en.doDeleteTokens(ctx, ids) +} + +func (m *baseMeta) ListTokens(ctx Context) (tokens map[uint32][]byte, st syscall.Errno) { + defer m.timeit("ListTokens", time.Now()) + return m.en.doListTokens(ctx) +} + func inGroup(ctx Context, gid uint32) bool { for _, egid := range ctx.Gids() { if egid == gid { diff --git a/pkg/meta/config.go b/pkg/meta/config.go index ead05b7e9536..81157c89417b 100644 --- a/pkg/meta/config.go +++ b/pkg/meta/config.go @@ -100,6 +100,9 @@ type Format struct { EnableACL bool RangerRestUrl string `json:",omitempty"` RangerService string `json:",omitempty"` + + //kerberos + KerbConf string `json:",omitempty"` } func (f *Format) update(old *Format, force bool) error { diff --git a/pkg/meta/interface.go b/pkg/meta/interface.go index bdf308b104a8..c34eceeb9ae4 100644 --- a/pkg/meta/interface.go +++ b/pkg/meta/interface.go @@ -529,6 +529,13 @@ type Meta interface { SetFacl(ctx Context, ino Ino, aclType uint8, n *aclAPI.Rule) syscall.Errno GetFacl(ctx Context, ino Ino, aclType uint8, n *aclAPI.Rule) syscall.Errno + + // kerberos + StoreToken(ctx Context, token []byte) (id uint32, st syscall.Errno) + UpdateToken(ctx Context, id uint32, token []byte) syscall.Errno + LoadToken(ctx Context, id uint32) (token []byte, st syscall.Errno) + DeleteTokens(ctx Context, ids []uint32) syscall.Errno + ListTokens(ctx Context) (tokens map[uint32][]byte, st syscall.Errno) } type Creator func(driver, addr string, conf *Config) (Meta, error) diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index 2d9941fdaaf5..b5c8c0d76ccc 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -77,6 +77,7 @@ import ( Quota used space: dirQuotaUsedSpace -> { $inode -> usedSpace } Quota used inodes: dirQuotaUsedInodes -> { $inode -> usedInodes } Acl: acl -> { $acl_id -> acl } + KrbToken: krbToken -> { $token_id -> token } Redis features: Sorted Set: 1.2+ @@ -655,6 +656,10 @@ func (m *redisMeta) aclKey() string { return m.prefix + "acl" } +func (m *redisMeta) krbTokenKey() string { + return m.prefix + "krbToken" +} + func (m *redisMeta) delfiles() string { return m.prefix + "delfiles" } @@ -1340,7 +1345,7 @@ func (m *redisMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, m if (pattr.Flags & FlagImmutable) != 0 { return syscall.EPERM } - if (pattr.Flags&FlagSkipTrash) != 0 { + if (pattr.Flags & FlagSkipTrash) != 0 { attr.Flags |= FlagSkipTrash } @@ -1694,7 +1699,7 @@ func (m *redisMeta) doRmdir(ctx Context, parent Ino, name string, pinode *Ino, o if ctx.Uid() != 0 && pattr.Mode&01000 != 0 && ctx.Uid() != pattr.Uid && ctx.Uid() != attr.Uid { return syscall.EACCES } - if (attr.Flags&FlagSkipTrash) != 0 { + if (attr.Flags & FlagSkipTrash) != 0 { trash = 0 } if trash > 0 { @@ -1873,7 +1878,7 @@ func (m *redisMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentD if (tattr.Flags&FlagAppend) != 0 || (tattr.Flags&FlagImmutable) != 0 { return syscall.EPERM } - if (tattr.Flags&FlagSkipTrash) != 0 { + if (tattr.Flags & FlagSkipTrash) != 0 { trash = 0 } tattr.Ctime = now.Unix() @@ -4759,6 +4764,79 @@ func (m *redisMeta) loadDumpedACLs(ctx Context) error { }, m.inodeKey(RootInode)) } +func (m *redisMeta) doStoreToken(ctx Context, token []byte) (id uint32, st syscall.Errno) { + err := m.txn(ctx, func(tx *redis.Tx) error { + newId, err := m.incrCounter(krbTokenCounter, 1) + if err != nil { + return err + } + err = tx.HSet(ctx, m.krbTokenKey(), strconv.FormatUint(uint64(newId), 10), token).Err() + if err == nil { + id = uint32(newId) + } + return err + }, m.krbTokenKey()) + return id, errno(err) +} + +func (m *redisMeta) doUpdateToken(ctx Context, id uint32, token []byte) syscall.Errno { + return errno(m.txn(ctx, func(tx *redis.Tx) error { + exist, err := tx.HExists(ctx, m.krbTokenKey(), strconv.FormatUint(uint64(id), 10)).Result() + if err != nil { + return err + } + if !exist { + return syscall.ENOENT + } + return tx.HSet(ctx, m.krbTokenKey(), strconv.FormatUint(uint64(id), 10), token).Err() + }, m.krbTokenKey())) +} + +func (m *redisMeta) doLoadToken(ctx Context, id uint32) (token []byte, st syscall.Errno) { + err := m.txn(ctx, func(tx *redis.Tx) error { + val, err := tx.HGet(ctx, m.krbTokenKey(), strconv.FormatUint(uint64(id), 10)).Bytes() + if err != nil { + return err + } + if val == nil { + return syscall.ENOENT + } + token = val + return nil + }, m.krbTokenKey()) + return token, errno(err) +} + +func (m *redisMeta) doDeleteTokens(ctx Context, ids []uint32) syscall.Errno { + return errno(m.txn(ctx, func(tx *redis.Tx) error { + strIds := make([]string, len(ids)) + for i, id := range ids { + strIds[i] = strconv.FormatUint(uint64(id), 10) + } + return tx.HDel(ctx, m.krbTokenKey(), strIds...).Err() + }, m.krbTokenKey())) +} + +func (m *redisMeta) doListTokens(ctx Context) (tokens map[uint32][]byte, st syscall.Errno) { + tokens = make(map[uint32][]byte) + err := m.txn(ctx, func(tx *redis.Tx) error { + vals, err := tx.HGetAll(ctx, m.krbTokenKey()).Result() + if err != nil { + return err + } + for k, v := range vals { + id, err := strconv.ParseUint(k, 10, 32) + if err != nil { + logger.Errorf("parse token id: %s: %v", k, err) + continue + } + tokens[uint32(id)] = []byte(v) + } + return nil + }, m.krbTokenKey()) + return tokens, errno(err) +} + func (m *redisMeta) newDirHandler(inode Ino, plus bool, entries []*Entry) DirHandler { return &redisDirHandler{ en: m, diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 6d7174a900ed..48d139806996 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -152,6 +152,11 @@ func (a *acl) toRule() *aclAPI.Rule { return r } +type DelegationToken struct { + Id uint32 `xorm:"pk autoincr"` + Token []byte +} + type namedNode struct { node `xorm:"extends"` Name []byte `xorm:"varbinary(255)"` @@ -1713,7 +1718,7 @@ func (m *dbMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode } else { n.Mode = mode & ^cumask } - if (pn.Flags&FlagSkipTrash) != 0 { + if (pn.Flags & FlagSkipTrash) != 0 { n.Flags |= FlagSkipTrash } @@ -1852,7 +1857,7 @@ func (m *dbMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, skip if (n.Flags&FlagAppend) != 0 || (n.Flags&FlagImmutable) != 0 { return syscall.EPERM } - if (n.Flags&FlagSkipTrash) != 0 { + if (n.Flags & FlagSkipTrash) != 0 { trash = 0 } if trash > 0 && n.Nlink > 1 { @@ -2023,7 +2028,7 @@ func (m *dbMeta) doRmdir(ctx Context, parent Ino, name string, pinode *Ino, attr if exist { return syscall.ENOTEMPTY } - if (n.Flags&FlagSkipTrash) != 0 { + if (n.Flags & FlagSkipTrash) != 0 { trash = 0 } now := time.Now().UnixNano() @@ -2234,7 +2239,7 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst if (dn.Flags&FlagAppend) != 0 || (dn.Flags&FlagImmutable) != 0 { return syscall.EPERM } - if (dn.Flags&FlagSkipTrash) != 0 { + if (dn.Flags & FlagSkipTrash) != 0 { trash = 0 } dn.setCtime(now) @@ -5019,6 +5024,74 @@ func (m *dbMeta) loadDumpedACLs(ctx Context) error { }) } +func (m *dbMeta) doStoreToken(ctx Context, token []byte) (id uint32, st syscall.Errno) { + err := m.txn(func(s *xorm.Session) error { + t := &DelegationToken{Token: token} + _, err := s.Insert(t) + if err != nil { + return err + } + id = t.Id + return nil + }) + return id, errno(err) +} + +func (m *dbMeta) doUpdateToken(ctx Context, id uint32, token []byte) syscall.Errno { + return errno(m.txn(func(s *xorm.Session) error { + t := &DelegationToken{Id: id} + ok, err := s.Get(t) + if err != nil { + return err + } + if !ok { + return syscall.ENOENT + } + t.Token = token + _, err = s.Cols("token").Update(t, &DelegationToken{Id: id}) + return err + })) +} + +func (m *dbMeta) doLoadToken(ctx Context, id uint32) (token []byte, st syscall.Errno) { + err := m.roTxn(ctx, func(s *xorm.Session) error { + t := &DelegationToken{Id: id} + ok, err := s.Get(t) + if err != nil { + return err + } + if !ok { + return syscall.ENOENT + } + token = t.Token + return nil + }) + return token, errno(err) +} + +func (m *dbMeta) doDeleteTokens(ctx Context, ids []uint32) syscall.Errno { + return errno(m.txn(func(s *xorm.Session) error { + _, err := s.In("id", ids).Delete(&DelegationToken{}) + return err + })) +} + +func (m *dbMeta) doListTokens(ctx Context) (tokens map[uint32][]byte, st syscall.Errno) { + err := m.roTxn(ctx, func(s *xorm.Session) error { + var ts []DelegationToken + err := s.Find(&ts) + if err != nil { + return err + } + tokens = make(map[uint32][]byte, len(ts)) + for _, t := range ts { + tokens[t.Id] = t.Token + } + return nil + }) + return tokens, errno(err) +} + type dbDirHandler struct { dirHandler } diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 0ef4e71d4667..d57425dafebf 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -197,6 +197,7 @@ All keys: Niiiiiiii detached inde QDiiiiiiii directory quota Raaaa POSIX acl + KDaaaa delegation token */ func (m *kvMeta) inodeKey(inode Ino) []byte { @@ -263,6 +264,10 @@ func (m *kvMeta) aclKey(id uint32) []byte { return m.fmtKey("R", id) } +func (m *kvMeta) krbTokenKey(id uint32) []byte { + return m.fmtKey("KD", id) +} + func (m *kvMeta) parseACLId(key string) uint32 { // trim "R" rb := utils.ReadBuffer([]byte(key[1:])) @@ -1140,7 +1145,7 @@ func (m *kvMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode if (pattr.Flags & FlagImmutable) != 0 { return syscall.EPERM } - if (pattr.Flags&FlagSkipTrash) != 0 { + if (pattr.Flags & FlagSkipTrash) != 0 { attr.Flags |= FlagSkipTrash } @@ -1322,7 +1327,7 @@ func (m *kvMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, skip if (attr.Flags&FlagAppend) != 0 || (attr.Flags&FlagImmutable) != 0 { return syscall.EPERM } - if (attr.Flags&FlagSkipTrash) != 0 { + if (attr.Flags & FlagSkipTrash) != 0 { trash = 0 } if trash > 0 && attr.Nlink > 1 && rs[2] != nil { @@ -1455,7 +1460,7 @@ func (m *kvMeta) doRmdir(ctx Context, parent Ino, name string, pinode *Ino, oldA if ctx.Uid() != 0 && pattr.Mode&01000 != 0 && ctx.Uid() != pattr.Uid && ctx.Uid() != attr.Uid { return syscall.EACCES } - if (attr.Flags&FlagSkipTrash) != 0 { + if (attr.Flags & FlagSkipTrash) != 0 { trash = 0 } if trash > 0 { @@ -1600,7 +1605,7 @@ func (m *kvMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst if (tattr.Flags&FlagAppend) != 0 || (tattr.Flags&FlagImmutable) != 0 { return syscall.EPERM } - if (tattr.Flags&FlagSkipTrash) != 0 { + if (tattr.Flags & FlagSkipTrash) != 0 { trash = 0 } tattr.Ctime = now.Unix() @@ -3893,6 +3898,60 @@ func (m *kvMeta) loadDumpedACLs(ctx Context) error { }) } +func (m *kvMeta) doStoreToken(ctx Context, token []byte) (id uint32, st syscall.Errno) { + err := m.txn(ctx, func(tx *kvTxn) error { + newId, err := m.incrCounter(krbTokenCounter, 1) + if err != nil { + return err + } + tx.set(m.krbTokenKey(uint32(newId)), token) + id = uint32(newId) + return nil + }) + return id, errno(err) +} + +func (m *kvMeta) doUpdateToken(ctx Context, id uint32, token []byte) syscall.Errno { + return errno(m.txn(ctx, func(tx *kvTxn) error { + if tx.get(m.krbTokenKey(id)) == nil { + return syscall.ENOENT + } + tx.set(m.krbTokenKey(id), token) + return nil + })) +} + +func (m *kvMeta) doLoadToken(ctx Context, id uint32) (token []byte, st syscall.Errno) { + err := m.txn(ctx, func(tx *kvTxn) error { + token = tx.get(m.krbTokenKey(id)) + if token == nil { + return syscall.ENOENT + } + return nil + }) + return token, errno(err) +} + +func (m *kvMeta) doDeleteTokens(ctx Context, ids []uint32) syscall.Errno { + return errno(m.txn(ctx, func(tx *kvTxn) error { + for _, id := range ids { + tx.delete(m.krbTokenKey(id)) + } + return nil + })) +} + +func (m *kvMeta) doListTokens(ctx Context) (tokens map[uint32][]byte, st syscall.Errno) { + tokens = make(map[uint32][]byte) + err := m.client.scan(m.fmtKey("KD"), func(k, v []byte) bool { + rb := utils.FromBuffer(k[2:]) + id := rb.Get32() + tokens[id] = v + return true + }) + return tokens, errno(err) +} + type kvDirHandler struct { dirHandler } diff --git a/pkg/meta/utils.go b/pkg/meta/utils.go index c98b6aaf2143..06dcccf5c285 100644 --- a/pkg/meta/utils.go +++ b/pkg/meta/utils.go @@ -36,10 +36,11 @@ import ( ) const ( - aclCounter = "aclMaxId" - usedSpace = "usedSpace" - totalInodes = "totalInodes" - legacySessions = "sessions" + aclCounter = "aclMaxId" + usedSpace = "usedSpace" + totalInodes = "totalInodes" + legacySessions = "sessions" + krbTokenCounter = "krbTokenMaxId" ) var counterNames = []string{usedSpace, totalInodes, "nextInode", "nextChunk", "nextSession", "nextTrash"} diff --git a/sdk/java/kerberos.sh b/sdk/java/kerberos.sh new file mode 100755 index 000000000000..7b7c6dcb5dce --- /dev/null +++ b/sdk/java/kerberos.sh @@ -0,0 +1,43 @@ +#!/bin/sh + +set -e + +KERBEROS_REALM="EXAMPLE.COM" +KERBEROS_PRINCIPLE="administrator" +KERBEROS_PASSWORD="password1234" + +sudo tee /etc/krb5.conf << EOF +[libdefaults] + default_realm = $KERBEROS_REALM + dns_lookup_realm = false + dns_lookup_kdc = false +[realms] + $KERBEROS_REALM = { + kdc = localhost + admin_server = localhost + } +[logging] + default = FILE:/var/log/krb5libs.log + kdc = FILE:/var/log/krb5kdc.log + admin_server = FILE:/var/log/kadmind.log +[domain_realm] + .localhost = $KERBEROS_REALM + localhost = $KERBEROS_REALM +EOF + +sudo mkdir /etc/krb5kdc +sudo printf '*/*@%s\t*' "$KERBEROS_REALM" | sudo tee /etc/krb5kdc/kadm5.acl + +sudo apt-get update +sudo apt-get install -y krb5-kdc krb5-admin-server + +printf "$KERBEROS_PASSWORD\n$KERBEROS_PASSWORD" | sudo kdb5_util -r "$KERBEROS_REALM" create -s -W +for p in client server tom jerry; do + sudo kadmin.local -q "addprinc -randkey $p/localhost@$KERBEROS_REALM" + sudo kadmin.local -q "xst -k /tmp/$p.keytab $p/localhost@$KERBEROS_REALM" + sudo chmod +rx /tmp/$p.keytab +done + +echo "Restarting krb services..." +sudo service krb5-kdc restart +sudo service krb5-admin-server restart \ No newline at end of file diff --git a/sdk/java/libjfs/kerberos.go b/sdk/java/libjfs/kerberos.go new file mode 100644 index 000000000000..53e0c7d2394f --- /dev/null +++ b/sdk/java/libjfs/kerberos.go @@ -0,0 +1,644 @@ +/* + * JuiceFS, Copyright 2025 Juicedata, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "bufio" + "crypto/rand" + "encoding/base64" + "encoding/hex" + "encoding/json" + "fmt" + "github.com/jcmturner/gokrb5/v8/keytab" + "github.com/jcmturner/gokrb5/v8/messages" + "github.com/jcmturner/gokrb5/v8/service" + "github.com/juicedata/juicefs/pkg/fs" + "github.com/juicedata/juicefs/pkg/meta" + "github.com/juicedata/juicefs/pkg/utils" + "io" + "net" + "regexp" + "strconv" + "strings" + "sync" + "syscall" + "time" +) + +const ( + defaultLife = 3600 * 24 * 7 + defaultRenew = 3600 * 24 +) + +const ( + mechanismHadoop = "hadoop" + mechanismMIT = "mit" +) + +var ( + namePattern = regexp.MustCompile(`([^/@]+)(/([^/@]+))?(@([^/@]+))?`) + paramPattern = regexp.MustCompile(`[^$]*(\$\d)`) + ruleParser = regexp.MustCompile(`(\[(\d+):([^\]]+)\](\(([^\)]+)\))?(s/([^/]+)/([^/]*)/(g)?)?)/?(L)?`) + noSimplePattern = regexp.MustCompile(`[/@]`) +) + +var ( + cleanOnce sync.Once +) + +type kRule struct { + isDefault bool + comps int + format string + match *regexp.Regexp + fromPattern *regexp.Regexp + toPattern string + repeat bool + lower bool +} + +func (r *kRule) String() string { + if r.isDefault { + return "DEFAULT" + } + s := fmt.Sprintf("RULE:[%d:%s]", r.comps, r.format) + if r.match != nil { + s += fmt.Sprintf("(%s)", r.match) + } + if r.fromPattern != nil { + s += fmt.Sprintf("s/%s/%s/", r.fromPattern, r.toPattern) + if r.repeat { + s += "g" + } + } + if r.lower { + s += "/L" + } + return s +} + +func (r *kRule) replaceParameters(params []string) string { + return paramPattern.ReplaceAllStringFunc(r.format, func(s string) string { + m := paramPattern.FindStringSubmatchIndex(s) + i, _ := strconv.Atoi(s[m[2]+1:]) + if i >= len(params) { + logger.Warnf("invalid param %s", s) + return s + } + return s[:m[2]] + params[i] + }) +} + +func (r *kRule) replaceSubs(base string) string { + if r.fromPattern == nil { + return base + } + if r.repeat { + return r.fromPattern.ReplaceAllString(base, r.toPattern) + } + m := r.fromPattern.FindStringIndex(base) + if m != nil { + return base[:m[0]] + r.toPattern + base[m[1]:] + } + return base +} + +func (r *kRule) apply(param []string, mechanism string, realm string) string { + var result string + if r.isDefault { + if realm == "" || param[0] == realm { + result = param[1] + } + } else if r.comps+1 == len(param) { + base := r.replaceParameters(param) + if r.match == nil || r.match.MatchString(base) { + result = r.replaceSubs(base) + } + } + if mechanism == mechanismHadoop && noSimplePattern.FindString(result) != "" { + return "" + } + if r.lower { + result = strings.ToLower(result) + } + return result +} + +func parseRule(rule string) *kRule { + rule = strings.TrimSpace(rule) + if rule == "DEFAULT" { + return &kRule{isDefault: true} + } + var r kRule + m := ruleParser.FindStringSubmatch(rule) + if m == nil { + return nil + } + r.comps, _ = strconv.Atoi(m[2]) + r.format = m[3] + var err error + r.match, err = regexp.Compile(m[5]) + if err != nil { + logger.Warnf("compile %s: %s", m[5], err) + return nil + } + r.fromPattern, err = regexp.Compile(m[7]) + if err != nil { + logger.Warnf("compile %s: %s", m[7], err) + return nil + } + r.toPattern = m[8] + r.repeat = m[9] == "g" + r.lower = m[10] == "L" + return &r +} + +type kerberosRules struct { + mechanism string + realm string + rules []*kRule +} + +func newkerberosRules(mechanism string, realm string, rules []string) *kerberosRules { + if mechanism == "" { + mechanism = mechanismHadoop + } + var rs []*kRule + for _, rule := range rules { + rs = append(rs, parseRule(rule)) + } + return &kerberosRules{mechanism, realm, rs} +} + +func (r *kerberosRules) getShortName(full string) string { + service, host, realm := parseFullName(full) + var param []string + if host == "" { + if realm == "" { + return service + } + param = []string{realm, service} + } else { + param = []string{realm, service, host} + } + if r.rules == nil { + r.rules = append(r.rules, &kRule{isDefault: true}) + } + for _, rule := range r.rules { + short := rule.apply(param, r.mechanism, r.realm) + if short != "" { + return short + } + } + if r.mechanism == mechanismHadoop { + return "" + } + return full +} + +func parseFullName(full string) (string, string, string) { + m := namePattern.FindStringSubmatch(full) + if m == nil || m[0] != full { + return "", "", "" + } + return m[1], m[3], m[5] +} + +type token struct { + User string + Renewer string + Password string + Issued int64 + Expire int64 +} + +type hostParam struct { + allAllowed bool + cidr []*net.IPNet + addrs map[string]bool +} +type proxyParam struct { + users []string + groups []string + hosts *hostParam +} + +type volParams struct { + keytab []byte + renew int64 + life int64 + superuser string + supergroup string + rules *kerberosRules + proxies map[string]*proxyParam +} + +func (vol *volParams) parse(kind, key, value string) { + if vol.rules == nil { + vol.rules = newkerberosRules(mechanismHadoop, "", nil) + } + switch kind { + case "keytab": + kt, err := base64.StdEncoding.DecodeString(value) + if err != nil { + logger.Errorf("decode keytab failed: %s", err) + } else { + vol.keytab = kt + } + case "life": + period, err := strconv.ParseInt(value, 10, 64) + if err != nil { + logger.Errorf("can not parse %s as int: %s", value, err) + } else { + vol.life = period + } + case "renew": + period, err := strconv.ParseInt(value, 10, 64) + if err != nil { + logger.Errorf("can not parse %s as int: %s", value, err) + } else { + vol.renew = period + } + case "superuser": + vol.superuser = value + case "supergroup": + vol.supergroup = value + case "mechanism": + value = strings.ToLower(value) + if value != mechanismHadoop && value != mechanismMIT { + logger.Errorf("invalid mechanism: %s", value) + } else { + vol.rules.mechanism = value + } + case "realm": + vol.rules.realm = value + case "rule": + rule := parseRule(value) + if rule != nil { + vol.rules.rules = append(vol.rules.rules, rule) + } else { + logger.Errorf("invalid kerberos rule: %s", value) + } + default: + split := strings.Split(key, ".") + if len(split) < 4 || split[1] != "proxy" { + logger.Warnf("invalid key: %s", key) + return + } + user := split[2] + proxy := vol.proxies[user] + if proxy == nil { + proxy = &proxyParam{hosts: &hostParam{}} + vol.proxies[user] = proxy + } + switch kind { + case "users": + proxy.users = strings.Split(value, ",") + for i := range proxy.users { + proxy.users[i] = strings.TrimSpace(proxy.users[i]) + } + case "groups": + proxy.groups = strings.Split(value, ",") + for i := range proxy.groups { + proxy.groups[i] = strings.TrimSpace(proxy.groups[i]) + } + case "hosts": + m := proxy.hosts + if strings.Contains(value, "*") { + m.allAllowed = true + } else { + m.addrs = make(map[string]bool) + for _, v := range strings.Split(value, ",") { + if strings.Contains(v, "/") { + // ip range + _, ipnet, err := net.ParseCIDR(v) + if err != nil { + logger.Errorf("wrong ip range %s: %s", v, err) + continue + } + m.cidr = append(m.cidr, ipnet) + } else { + m.addrs[v] = true + } + } + } + default: + logger.Errorf("invalid key: %s", key) + } + } +} + +func (vol *volParams) canProxy(realUser, user, group, ips, hostname string) bool { + if realUser == "" || realUser == user { + return true + } + if !vol.isUserGroupAllowed(realUser, user, group) { + logger.Errorf("user: %s is not allowed to impersonate %s", realUser, user) + return false + } + if !vol.isHostAllowed(realUser, ips, hostname) { + logger.Errorf("user: %s is not allowed to impersonate %s on %s", realUser, user, hostname) + return false + } + return true +} + +func (vol *volParams) isUserGroupAllowed(realUser, user, groups string) bool { + proxy := vol.proxies[realUser] + if proxy == nil { + return false + } + for _, u := range proxy.users { + if u == "*" || u == user { + return true + } + } + for _, group := range strings.Split(groups, ",") { + for _, ag := range proxy.groups { + if ag == "*" || ag == group { + return true + } + } + } + return false +} + +func (vol *volParams) isHostAllowed(realUser, ips, hostname string) bool { + proxy := vol.proxies[realUser] + if proxy == nil { + return false + } + m := proxy.hosts + if m.allAllowed { + return true + } + if m.addrs[hostname] { + return true + } + for _, ip := range strings.Split(ips, ",") { + if m.addrs[ip] { + return true + } + for _, ipNet := range m.cidr { + if net.ParseIP(ip) != nil && ipNet.Contains(net.ParseIP(ip)) { + return true + } + } + } + return false +} + +type kerberos struct { + vols map[string]*volParams + mu sync.Mutex + isRenewer bool +} + +func (k *kerberos) getSuperUser(volname string) (string, string) { + vol := k.vols[volname] + if vol != nil { + return vol.superuser, vol.supergroup + } + return "", "" +} + +func (k *kerberos) auth(volname, user, realUser, group, ips, hostname string, reqBytes []byte) syscall.Errno { + var req = messages.APReq{} + err := req.Unmarshal(reqBytes) + if err != nil { + logger.Errorf("invalid AP_REQ: %s", err) + return syscall.EINVAL + } + vol := k.vols[volname] + if vol == nil || vol.keytab == nil { + logger.Errorf("server keytab for %s not setted", volname) + return syscall.ENODATA + } + kt := new(keytab.Keytab) + err = kt.Unmarshal(vol.keytab) + if err != nil { + logger.Errorf("unmarshal keytab: %s", err) + return syscall.EINVAL + } + s := service.NewSettings(kt, service.DecodePAC(false)) + ok, creds, err := service.VerifyAPREQ(&req, s) + if err != nil { + logger.Errorf("verify: %s", err) + return syscall.EINVAL + } else if !ok { + return syscall.EACCES + } + + principal := fmt.Sprintf("%s@%s", creds.UserName(), creds.Realm()) + authedUser := vol.rules.getShortName(principal) + if authedUser == "" { + logger.Warnf("no rule for principal %s", principal) + return syscall.EINVAL + } + + if realUser == "" { + if user == authedUser { + return 0 + } + } else { + if realUser == authedUser && vol.canProxy(realUser, user, group, ips, hostname) { + return 0 + } + } + logger.Warnf("auth failed, principal: %s, authedUser: %s, user: %s, realUser: %s", principal, authedUser, user, realUser) + return syscall.EACCES +} + +func (k *kerberos) issue(ctx meta.Context, m meta.Meta, volname, user, renewer string) (uint32, *token, syscall.Errno) { + vol := k.vols[volname] + if vol == nil { + return 0, nil, syscall.EINVAL + } + now := time.Now() + t := &token{ + User: user, + Renewer: renewer, + Issued: now.Unix(), + Expire: now.Unix() + vol.life, + } + passwd := make([]byte, 20) + _, _ = io.ReadFull(rand.Reader, passwd) + t.Password = hex.EncodeToString(passwd) + id, eno := k.storeToken(ctx, m, t) + if eno != 0 { + return 0, nil, eno + } + return id, t, 0 +} + +func (k *kerberos) check(ctx meta.Context, m meta.Meta, volname, user string, id uint32, password string) syscall.Errno { + t, eno := k.loadToken(ctx, m, id) + if eno != 0 { + return eno + } + now := time.Now().Unix() + if now > t.Expire { + logger.Warnf("token %d expired", id) + return syscall.EINVAL + } + if password != t.Password || user != t.User { + logger.Warnf("token %d invalid user or password", id) + return syscall.EACCES + } + return 0 +} + +func (k *kerberos) renew(ctx meta.Context, m meta.Meta, volname, renewer string, id uint32, password string) (int64, syscall.Errno) { + cleanOnce.Do(func() { + go func() { + ticker := time.NewTicker(10 * time.Minute) + for range ticker.C { + _ = k.cleanupTokens(ctx, m) + } + }() + }) + t, eno := k.loadToken(ctx, m, id) + if eno != 0 { + return 0, eno + } + if password != t.Password || renewer != t.Renewer { + return 0, syscall.EACCES + } + renew := int64(defaultRenew) + vol := k.vols[volname] + if vol != nil && vol.renew != 0 { + renew = vol.renew + } + t.Expire += renew + eno = k.updateToken(ctx, m, id, t) + if eno != 0 { + return 0, eno + } + return t.Expire, 0 +} + +func (k *kerberos) storeToken(ctx meta.Context, m meta.Meta, t *token) (id uint32, st syscall.Errno) { + marshal, err := json.Marshal(t) + if err != nil { + logger.Errorf("marshal token: %s", err) + return 0, syscall.EINVAL + } + return m.StoreToken(ctx, marshal) +} + +func (k *kerberos) updateToken(ctx meta.Context, m meta.Meta, id uint32, t *token) syscall.Errno { + marshal, err := json.Marshal(t) + if err != nil { + logger.Errorf("marshal token: %s", err) + return syscall.EINVAL + } + return m.UpdateToken(ctx, id, marshal) +} + +func (k *kerberos) loadToken(ctx meta.Context, m meta.Meta, id uint32) (*token, syscall.Errno) { + tb, errno := m.LoadToken(ctx, id) + if errno != 0 { + return nil, errno + } + t := &token{} + err := json.Unmarshal(tb, t) + if err != nil { + logger.Errorf("unmarshal token %d: %s", id, err) + return nil, syscall.EINVAL + } + return t, 0 +} + +func (k *kerberos) cancelToken(ctx meta.Context, m meta.Meta, volname, user string, id uint32, password string) syscall.Errno { + t, eno := k.loadToken(ctx, m, id) + if eno != 0 { + return eno + } + if password != t.Password || user != t.Renewer && user != t.User { + return syscall.EACCES + } + return m.DeleteTokens(ctx, []uint32{id}) +} + +func (k *kerberos) cleanupTokens(ctx meta.Context, m meta.Meta) syscall.Errno { + tokens, eno := m.ListTokens(ctx) + if eno != 0 { + return eno + } + var todelete []uint32 + now := time.Now().Unix() + for id, data := range tokens { + r := utils.FromBuffer(data) + r.Seek(0) + _ = int64(r.Get64()) + expire := int64(r.Get64()) + if expire <= now { + todelete = append(todelete, id) + } + } + return m.DeleteTokens(ctx, todelete) +} + +func (k *kerberos) loadConf(name, content string, jfs *fs.FileSystem) { + vol := &volParams{ + life: defaultLife, + renew: defaultRenew, + proxies: make(map[string]*proxyParam), + } + scanner := bufio.NewScanner(strings.NewReader(content)) + for scanner.Scan() { + line := scanner.Text() + idx := strings.Index(line, "#") + if idx >= 0 { + line = line[:idx] + } + line = strings.TrimSpace(line) + if len(line) == 0 { + continue + } + fields := strings.SplitN(line, "=", 2) + if len(fields) != 2 { + logger.Warningf("bad line: %s", line) + continue + } + key := strings.TrimSpace(fields[0]) + value := strings.TrimSpace(fields[1]) + split := strings.Split(key, ".") + if len(split) < 2 { + logger.Warningf("bad line: %s", line) + continue + } + keySuffix := split[len(split)-1] + volName := split[0] + if volName != name { + continue + } + vol.parse(keySuffix, key, value) + } + jfs.Superuser = vol.superuser + jfs.Supergroup = vol.supergroup + k.mu.Lock() + k.vols[name] = vol + k.mu.Unlock() +} + +func (k *kerberos) init() int { + k.vols = make(map[string]*volParams) + return 0 +} + +var kerb = kerberos{} diff --git a/sdk/java/libjfs/main.go b/sdk/java/libjfs/main.go index a76407e6eecf..3867331f5d8c 100644 --- a/sdk/java/libjfs/main.go +++ b/sdk/java/libjfs/main.go @@ -44,9 +44,12 @@ typedef struct { import "C" import ( "bytes" + "context" "encoding/json" + "errors" "fmt" "io" + "net" "net/http" _ "net/http/pprof" "os" @@ -99,6 +102,9 @@ var ( formats = make(map[string]*meta.Format) + kerbOnce = sync.Once{} + superuserChangedCb = make(map[string]struct{}) + MaxDeletes = meta.RmrDefaultThreads caller = CALLER_JAVA ) @@ -181,6 +187,7 @@ func errno(err error) int32 { type wrapper struct { *fs.FileSystem + volname string ctx meta.Context m *mapping user string @@ -236,8 +243,15 @@ func (w *wrapper) withPid(pid int64) meta.Context { return ctx } +func (w *wrapper) getSuperUser() string { + if w.Superuser != "" { + return w.Superuser + } + return w.superuser +} + func (w *wrapper) isSuperuser(name string, groups []string) bool { - if name == w.superuser || w.conf.SuperFS { + if name == w.getSuperUser() || w.conf.SuperFS { return true } for _, g := range groups { @@ -262,9 +276,9 @@ func (w *wrapper) lookupGid(group string) uint32 { return uint32(w.m.lookupGroup(group)) } -func (w *wrapper) lookupGids(groups string) []uint32 { +func (w *wrapper) lookupGids(groups []string) []uint32 { var gids []uint32 - for _, g := range strings.Split(groups, ",") { + for _, g := range groups { gids = append(gids, w.lookupGid(g)) } return gids @@ -363,6 +377,9 @@ type javaConf struct { Caller int `json:"caller"` Subdir string `json:"subdir"` + AuthMethod string `json:"authMethod,omitempty"` + RealUser string `json:"realUser,omitempty"` + SuperFS bool `json:"superFs,omitempty"` } @@ -391,27 +408,38 @@ func getOrCreate(name, user, group, superuser, supergroup string, conf javaConf, } logger.Infof("JuiceFileSystem created for user:%s group:%s", user, group) } - w := &wrapper{jfs, nil, m, user, superuser, supergroup, conf} - var gs []string - if userGroupCache[name] != nil { - gs = userGroupCache[name][user] - } - if gs == nil { - gs = strings.Split(group, ",") - } - group = strings.Join(gs, ",") - logger.Debugf("update groups of %s to %s", user, group) - if w.isSuperuser(user, gs) { - w.ctx = meta.NewContext(uint32(os.Getpid()), 0, []uint32{0}) - } else { - w.ctx = meta.NewContext(uint32(os.Getpid()), w.lookupUid(user), w.lookupGids(group)) + w := &wrapper{jfs, name, nil, m, user, superuser, supergroup, conf} + if _, ok := superuserChangedCb[name]; !ok { + jfs.Meta().OnReload(func(format *meta.Format) { + kerb.loadConf(name, format.KerbConf, jfs) + updateAllCtx(name, user, group) + }) + superuserChangedCb[name] = struct{}{} } activefs[name] = append(ws, w) + updateAllCtx(name, user, group) nextFsHandle = nextFsHandle + 1 handlers[nextFsHandle] = w return nextFsHandle } +func updateAllCtx(name string, user, groups string) { + ws := activefs[name] + if len(ws) > 0 { + for _, w := range ws { + var gs []string + if userGroupCache[name] != nil { + gs = userGroupCache[name][user] + } + if gs == nil { + gs = strings.Split(groups, ",") + } + logger.Debugf("update groups of %s to %s", user, strings.Join(gs, ",")) + updateCtx(w, gs) + } + } +} + func push2Gateway(pushGatewayAddr, pushAuth string, pushInterVal time.Duration, registry *prometheus.Registry, commonLabels map[string]string) { pusher := push.New(pushGatewayAddr, "juicefs").Gatherer(registry) for k, v := range commonLabels { @@ -498,8 +526,9 @@ func push2Graphite(graphite string, pushInterVal time.Duration, registry *promet } //export jfs_init -func jfs_init(cname, cjsonConf, user, group, superuser, supergroup *C.char) int64 { +func jfs_init(credentialPtr uintptr, count int32, cname, cjsonConf, cuser, group, superuser, supergroup *C.char) int64 { name := C.GoString(cname) + user := C.GoString(cuser) debug.SetGCPercent(50) object.UserAgent = "JuiceFS-SDK " + version.Version() var jConf javaConf @@ -511,7 +540,7 @@ func jfs_init(cname, cjsonConf, user, group, superuser, supergroup *C.char) int6 logger.Fatalf("invalid json") } } - return getOrCreate(name, C.GoString(user), C.GoString(group), C.GoString(superuser), C.GoString(supergroup), jConf, func() *fs.FileSystem { + return getOrCreate(name, user, C.GoString(group), C.GoString(superuser), C.GoString(supergroup), jConf, func() *fs.FileSystem { if jConf.Debug || os.Getenv("JUICEFS_DEBUG") != "" { utils.SetLogLevel(logrus.DebugLevel) go func() { @@ -698,6 +727,37 @@ func jfs_init(cname, cjsonConf, user, group, superuser, supergroup *C.char) int6 return nil } jfs.InitMetrics(registerer) + if format.KerbConf != "" { + kerbOnce.Do(func() { + kerb.init() + }) + kerb.loadConf(name, format.KerbConf, jfs) + var credential []byte + if credentialPtr == 0 { + logger.Errorf("kerberos credential is needed") + return nil + } + credential = toBuf(credentialPtr, count) + hostname, _ := os.Hostname() + ip := resolve(hostname) + if ip == "" { + ip, _ = findLocalIP("", "") + logger.Infof("use local ip %s for %s", ip, hostname) + } + var eno syscall.Errno + if jConf.AuthMethod == "kerberos" { + eno = kerb.auth(name, user, jConf.RealUser, C.GoString(group), ip, hostname, credential) + } else { + tbuf := utils.FromBuffer(credential) + id := tbuf.Get32() + password := tbuf.Get(int(tbuf.Get8())) + eno = kerb.check(meta.Background(), jfs.Meta(), name, user, id, string(password)) + } + if eno != 0 { + logger.Errorf("%s auth failed for vol:%s(%s:%s): %s", jConf.AuthMethod, name, user, jConf.RealUser, eno) + return nil + } + } return jfs }) } @@ -763,15 +823,19 @@ func jfs_update_uid_grouping(cname, uidstr *C.char, grouping *C.char) { for _, w := range ws { w.m.update(uids, gids, false) logger.Debugf("Update groups of %s to %s", w.user, strings.Join(userGroups[w.user], ",")) - if w.isSuperuser(w.user, userGroups[w.user]) { - w.ctx = meta.NewContext(uint32(os.Getpid()), 0, []uint32{0}) - } else { - w.ctx = meta.NewContext(uint32(os.Getpid()), w.lookupUid(w.user), w.lookupGids(strings.Join(userGroups[w.user], ","))) - } + updateCtx(w, userGroups[w.user]) } } } +func updateCtx(w *wrapper, groups []string) { + if w.isSuperuser(w.user, groups) { + w.ctx = meta.NewContext(uint32(os.Getpid()), 0, []uint32{0}) + } else { + w.ctx = meta.NewContext(uint32(os.Getpid()), w.lookupUid(w.user), w.lookupGids(groups)) + } +} + //export jfs_getGroups func jfs_getGroups(cname, cuser *C.char, buf uintptr, count int32) int32 { name := C.GoString(cname) @@ -1922,5 +1986,111 @@ func jfs_warmup(pid int64, h int64, _paths *C.char, numthreads int32, background return int32(copy(buf, res)) } +func resolve(hostname string) string { + if hostname == "" { + return "" + } + start := time.Now() + ips, err := net.DefaultResolver.LookupIP(context.Background(), "ip4", hostname) + if err != nil { + logger.Warningf("Fail to resolve host %s: %s", hostname, err) + return "" + } + var ipStr []string + for _, ip := range ips { + ipStr = append(ipStr, ip.To4().String()) + } + logger.Debugf("resolve %s to %s in %s", hostname, strings.Join(ipStr, ","), time.Since(start)) + return strings.Join(ipStr, ",") +} + +func findLocalIP(mask string, iname string) (string, error) { + for strings.HasSuffix(mask, ".0") { + mask = mask[:len(mask)-2] + } + ifaces, err := net.Interfaces() + if err != nil { + return "", err + } + for _, iface := range ifaces { + if iface.Flags&net.FlagUp == 0 && iname == "" && mask == "" { + continue // interface down + } + if iface.Flags&net.FlagLoopback != 0 { + continue // loopback interface + } + if iname != "" && iface.Name != iname && !strings.HasPrefix(iface.Name, iname+".") { + continue + } + addrs, err := iface.Addrs() + if err != nil { + return "", err + } + for _, addr := range addrs { + var ip net.IP + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + } + if ip == nil || ip.IsLoopback() { + continue + } + ip = ip.To4() + if ip == nil { + continue // not an ipv4 address + } + if !strings.HasPrefix(ip.String(), mask) { + continue + } + return ip.String(), nil + } + } + return "", errors.New("are you connected to the network?") +} + +//export jfs_get_token +func jfs_get_token(h int64, cname *C.char, buf uintptr, count int32, renewer *C.char) int32 { + w := F(h) + if w == nil { + return EINVAL + } + id, t, eno := kerb.issue(w.ctx, w.Meta(), C.GoString(cname), w.user, C.GoString(renewer)) + if eno != 0 { + logger.Errorf("get token for %s: %s", w.volname, eno) + return errno(eno) + } + wb := utils.NewNativeBuffer(toBuf(buf, count)) + wb.Put32(id) + wb.Put64(uint64(t.Issued)) + wb.Put64(uint64(t.Expire)) + wb.Put([]byte(t.Password)) + return int32(wb.Offset()) +} + +//export jfs_renew_token +func jfs_renew_token(h int64, id uint32, password *C.char) int64 { + w := F(h) + if w == nil { + return EINVAL + } + expire, eno := kerb.renew(w.ctx, w.Meta(), w.volname, w.user, id, C.GoString(password)) + if eno != 0 { + logger.Errorf("renew token %d for %s: %s", id, w.volname, eno) + return int64(errno(eno)) + } + return expire +} + +//export jfs_cancel_token +func jfs_cancel_token(h int64, id uint32, password *C.char) int32 { + w := F(h) + if w == nil { + return EINVAL + } + return errno(kerb.cancelToken(w.ctx, w.Meta(), w.volname, w.user, id, C.GoString(password))) +} + func main() { } diff --git a/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java b/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java index c96c152348d0..40f8e0d48f19 100644 --- a/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java +++ b/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,4 +149,8 @@ public FileChecksum getFileChecksum(Path f) throws IOException { patchDistCpChecksum(); return super.getFileChecksum(f); } + + public Token getDelegationToken(String renewer) throws IOException { + return fs.getDelegationToken(renewer); + } } diff --git a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java index e2fe2c823297..2628db0f6bef 100644 --- a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java +++ b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java @@ -18,6 +18,9 @@ import com.google.common.collect.Lists; import com.kenai.jffi.internal.StubLoader; import io.juicefs.exception.QuotaExceededException; +import io.juicefs.kerberos.AuthCredential; +import io.juicefs.kerberos.JuiceFSDelegationTokenIdentifier; +import io.juicefs.kerberos.KerberosUtil; import io.juicefs.metrics.JuiceFSInstrumentation; import io.juicefs.permission.RangerConfig; import io.juicefs.permission.RangerPermissionChecker; @@ -38,8 +41,14 @@ import org.apache.hadoop.fs.permission.*; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.MD5Hash; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.HadoopKerberosName; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DirectBufferPool; import org.apache.hadoop.util.Progressable; @@ -56,6 +65,7 @@ import java.net.*; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; @@ -63,11 +73,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.jar.JarFile; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; import java.util.zip.ZipEntry; -import java.util.regex.Matcher; -import java.util.regex.Pattern; /**************************************************************** * Implement the FileSystem API for JuiceFS @@ -105,6 +115,7 @@ static String loadVersion() { private JuiceFileSystemImpl superGroupFileSystem; private RangerPermissionChecker rangerPermissionChecker; + private boolean dtEnabled; // whether delegation token was enabled private static Libjfs lib = loadLibrary(); private long handle; @@ -143,7 +154,7 @@ static String loadVersion() { private static Libjfs.LogCallBack callBack; public static interface Libjfs { - long jfs_init(String name, String jsonConf, String user, String group, String superuser, String supergroup); + long jfs_init(Pointer credential, int size, String name, String jsonConf, String user, String group, String superuser, String supergroup); void jfs_update_uid_grouping(String name, String uidstr, String grouping); @@ -213,6 +224,12 @@ public static interface Libjfs { void jfs_set_callback(LogCallBack callBack); + int jfs_get_token(long h, String name, Pointer buf, int bufSize, String renewer); + + long jfs_renew_token(long h, int id, String password); + + int jfs_cancel_token(long pid, long h, int id, String password); + interface LogCallBack { @Delegate void call(String msg); @@ -397,6 +414,26 @@ public void initialize(URI uri, Configuration conf) throws IOException { } JSONObject obj = new JSONObject(); + String spn = SecurityUtil.getServerPrincipal(getConf(conf, "server-principal", ""), name); + if (spn.contains("@")) { + spn = spn.split("@")[0]; + } + AuthCredential authCredential = buildAuthCredential(spn); + Pointer credential = null; + int crdSize = 0; + if (authCredential != null) { + crdSize = authCredential.getCredential().length; + credential = Memory.allocate(Runtime.getRuntime(lib), crdSize); + credential.put(0, authCredential.getCredential(), 0, crdSize); + } + + if (authCredential != null) { + obj.put("authMethod", authCredential.getMethod()); + } + if (ugi.getRealUser() != null) { + obj.put("realUser", ugi.getRealUser().getShortUserName()); + } + String[] keys = new String[]{"meta",}; for (String key : keys) { obj.put(key, getConf(conf, key, "")); @@ -462,7 +499,7 @@ public void initialize(URI uri, Configuration conf) throws IOException { obj.put("superFs", asSuperFs); obj.put("subdir", subdir); String jsonConf = obj.toString(2); - handle = lib.jfs_init(name, jsonConf, user, groupStr, superuser, supergroup); + handle = lib.jfs_init(credential, crdSize, name, jsonConf, user, groupStr, superuser, supergroup); if (handle <= 0) { throw new IOException("JuiceFS initialized failed for jfs://" + name); } @@ -1856,11 +1893,6 @@ public boolean supportsSymlinks() { return false; } - @Override - public String getCanonicalServiceName() { - return null; // Does not support Token - } - @Override public FsStatus getStatus(Path p) throws IOException { if (needCheckPermission() && !checkParentPathAccess(p, FsAction.EXECUTE, "getStatus")) { @@ -2287,4 +2319,114 @@ public AclStatus getAclStatus(Path path) throws IOException { } return builder.build(); } + + public AuthCredential buildAuthCredential(String spn) throws IOException { + // auth use kerberos + if (UserGroupInformation.getLoginUser().hasKerberosCredentials()) { + dtEnabled = true; + byte[] cred; + try { + cred = KerberosUtil.genApReq(spn); + } catch (InterruptedException e) { + throw new IOException("generate kerberos AP-REQ failed", e); + } + return new AuthCredential("kerberos", cred); + } + + // auth use delegation token + for (Token token : ugi.getCredentials().getAllTokens()) { + if (token.getKind().equals(JuiceFSDelegationTokenIdentifier.TOKEN_KIND) && + buildServiceName().equals(token.getService().toString())) { + dtEnabled = true; + + AbstractDelegationTokenIdentifier identifier = (AbstractDelegationTokenIdentifier) token.decodeIdentifier(); + int id = identifier.getMasterKeyId(); + byte[] password = token.getPassword(); + ByteBuffer buf = ByteBuffer.allocate(5 + password.length); + buf.putInt(id); + buf.putInt(password.length); + buf.put(password); + + return new AuthCredential("token", buf.array()); + } + } + + return null; + } + + private String buildServiceName() { + return getScheme() + "://" + (name == null ? "/" : name); + } + + @Override + public String getCanonicalServiceName() { + return dtEnabled ? buildServiceName() : null; + } + + @Override + public Token getDelegationToken(String renewer) throws IOException { + if (!dtEnabled) { + return null; + } + String owner = ugi.getShortUserName(); + String realUser = ugi.getRealUser() != null ? ugi.getRealUser().getShortUserName() : null; + int tokenSize = 0, r = 8<<10; + Pointer tokenBuf = null; + while (r > tokenSize) { + tokenSize = r; + tokenBuf = Memory.allocate(Runtime.getRuntime(lib), tokenSize); + r = lib.jfs_get_token(handle, name, tokenBuf, tokenSize, (new HadoopKerberosName(renewer)).getShortName()); + } + if (r < 0) { + throw new IOException(String.format("get delegation token failed, return code %d", r)); + } + int id = tokenBuf.getInt(0); + long issueDate = tokenBuf.getLongLong(4); + long maxDate = tokenBuf.getLongLong(12); + int pwdLen = r - 20; + byte[] pwd = new byte[pwdLen]; + tokenBuf.get(20, pwd, 0, pwdLen); + + JuiceFSDelegationTokenIdentifier identifier = + new JuiceFSDelegationTokenIdentifier( + owner, + renewer, + realUser); + identifier.setIssueDate(issueDate); + identifier.setMaxDate(maxDate); + identifier.setMasterKeyId(id); + + return new Token<>( + identifier.getBytes(), + pwd, + identifier.getKind(), + new Text(getCanonicalServiceName())); + } + + public long renewToken(Token token) throws IOException { + AbstractDelegationTokenIdentifier identifier = (AbstractDelegationTokenIdentifier) token.decodeIdentifier(); + int id = identifier.getMasterKeyId(); + String pwd = new String(token.getPassword(), StandardCharsets.UTF_8); + long r = lib.jfs_renew_token(handle, id, pwd); + if (r == EACCESS) { + throw new IOException("permission denied"); + } + if (r < 0) { + throw new IOException(String.format("renew token failed, return code %d", r)); + } + return r * 1000; + } + + public void cancelToken(Token token) throws IOException { + AbstractDelegationTokenIdentifier identifier = (AbstractDelegationTokenIdentifier) token.decodeIdentifier(); + int id = identifier.getMasterKeyId(); + String pwd = new String(token.getPassword(), StandardCharsets.UTF_8);; + int r = lib.jfs_cancel_token(Thread.currentThread().getId(), handle, id, pwd); + if (r == EACCESS) { + throw new IOException("permission denied"); + } + if (r < 0) { + throw new IOException(String.format("cancel token failed, return code %d", r)); + } + } } diff --git a/sdk/java/src/main/java/io/juicefs/kerberos/AuthCredential.java b/sdk/java/src/main/java/io/juicefs/kerberos/AuthCredential.java new file mode 100644 index 000000000000..f6488d3bb19d --- /dev/null +++ b/sdk/java/src/main/java/io/juicefs/kerberos/AuthCredential.java @@ -0,0 +1,42 @@ +/* + * JuiceFS, Copyright 2025 Juicedata, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.juicefs.kerberos; + +public class AuthCredential { + private String method; + private byte[] credential; + + public AuthCredential(String method, byte[] credential) { + this.method = method; + this.credential = credential; + } + + public String getMethod() { + return method; + } + + public void setMethod(String method) { + this.method = method; + } + + public byte[] getCredential() { + return credential; + } + + public void setCredential(byte[] credential) { + this.credential = credential; + } +} diff --git a/sdk/java/src/main/java/io/juicefs/kerberos/JuiceFSDelegationTokenIdentifier.java b/sdk/java/src/main/java/io/juicefs/kerberos/JuiceFSDelegationTokenIdentifier.java new file mode 100644 index 000000000000..d4b0fb4ab6c8 --- /dev/null +++ b/sdk/java/src/main/java/io/juicefs/kerberos/JuiceFSDelegationTokenIdentifier.java @@ -0,0 +1,41 @@ +/* + * JuiceFS, Copyright 2025 Juicedata, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.juicefs.kerberos; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; + +public class JuiceFSDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { + public static final Text TOKEN_KIND = new Text("JUICEFS_DELEGATION_TOKEN"); + + public JuiceFSDelegationTokenIdentifier() { + } + + public JuiceFSDelegationTokenIdentifier(String owner, String renewer, String realUser) { + super(new Text(owner), new Text(renewer), realUser == null ? null : new Text(realUser)); + } + + @Override + public Text getKind() { + return TOKEN_KIND; + } + + @Override + public String toString() { + return "token for " + getUser().getShortUserName() + + ": " + super.toString(); + } +} diff --git a/sdk/java/src/main/java/io/juicefs/kerberos/JuiceFSTokenRenewer.java b/sdk/java/src/main/java/io/juicefs/kerberos/JuiceFSTokenRenewer.java new file mode 100644 index 000000000000..c11ea8c6f208 --- /dev/null +++ b/sdk/java/src/main/java/io/juicefs/kerberos/JuiceFSTokenRenewer.java @@ -0,0 +1,62 @@ +/* + * JuiceFS, Copyright 2025 Juicedata, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.juicefs.kerberos; + +import io.juicefs.JuiceFileSystem; +import io.juicefs.JuiceFileSystemImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenRenewer; + +import java.io.IOException; +import java.net.URI; + +public class JuiceFSTokenRenewer extends TokenRenewer { + + @Override + public boolean handleKind(Text kind) { + return JuiceFSDelegationTokenIdentifier.TOKEN_KIND.equals(kind); + } + + @Override + public boolean isManaged(Token token) throws IOException { + return true; + } + + @Override + public long renew(Token token, Configuration configuration) throws IOException, InterruptedException { + String service = token.getService().toString(); + FileSystem fs = FileSystem.get(URI.create(service), configuration); + if (fs instanceof JuiceFileSystem) { + return ((JuiceFileSystemImpl) ((FilterFileSystem) fs).getRawFileSystem()).renewToken(token); + } + throw new IOException("renew token failed"); + } + + @Override + public void cancel(Token token, Configuration configuration) throws IOException, InterruptedException { + String service = token.getService().toString(); + FileSystem fs = FileSystem.get(URI.create(service), configuration); + if (fs instanceof JuiceFileSystem) { + ((JuiceFileSystemImpl) ((FilterFileSystem) fs).getRawFileSystem()).cancelToken(token); + return; + } + throw new IOException("cancel token failed"); + } +} diff --git a/sdk/java/src/main/java/io/juicefs/kerberos/KerberosUtil.java b/sdk/java/src/main/java/io/juicefs/kerberos/KerberosUtil.java new file mode 100644 index 000000000000..ba31eb365703 --- /dev/null +++ b/sdk/java/src/main/java/io/juicefs/kerberos/KerberosUtil.java @@ -0,0 +1,41 @@ +package io.juicefs.kerberos; + +import org.apache.hadoop.security.UserGroupInformation; +import org.ietf.jgss.GSSContext; +import org.ietf.jgss.GSSException; +import org.ietf.jgss.GSSManager; +import org.ietf.jgss.GSSName; +import sun.security.jgss.GSSHeader; +import sun.security.util.DerValue; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +public class KerberosUtil { + public static byte[] genApReq(String spn) throws IOException, InterruptedException { + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + if (UserGroupInformation.isLoginKeytabBased()) { + loginUser.checkTGTAndReloginFromKeytab(); + } else if (UserGroupInformation.isLoginTicketBased()) { + loginUser.reloginFromTicketCache(); + } + return loginUser.doAs((PrivilegedExceptionAction) () -> { + GSSManager manager = GSSManager.getInstance(); + + GSSName serverName = manager.createName(spn, GSSName.NT_USER_NAME, org.apache.hadoop.security.authentication.util.KerberosUtil.GSS_KRB5_MECH_OID); + GSSContext context = manager.createContext(serverName, org.apache.hadoop.security.authentication.util.KerberosUtil.GSS_KRB5_MECH_OID, null, GSSContext.DEFAULT_LIFETIME); + + byte[] token = new byte[0]; + token = context.initSecContext(token, 0, token.length); + + ByteArrayInputStream in = new ByteArrayInputStream(token, 0, token.length); + new GSSHeader(in); + int tokenId = ((in.read() << 8) | in.read()); + if (tokenId != 0x0100) + throw new GSSException(GSSException.DEFECTIVE_TOKEN, -1, + "AP_REQ token id does not match!"); + return new DerValue(in).toByteArray(); + }); + } +} diff --git a/sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier b/sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier new file mode 100644 index 000000000000..8e941677553b --- /dev/null +++ b/sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier @@ -0,0 +1 @@ +io.juicefs.kerberos.JuiceFSDelegationTokenIdentifier diff --git a/sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer new file mode 100644 index 000000000000..63b8b46df2ff --- /dev/null +++ b/sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer @@ -0,0 +1 @@ +io.juicefs.kerberos.JuiceFSTokenRenewer diff --git a/sdk/java/src/test/java/io/juicefs/kerberos/KerberosTest.java b/sdk/java/src/test/java/io/juicefs/kerberos/KerberosTest.java new file mode 100644 index 000000000000..4de84cab5954 --- /dev/null +++ b/sdk/java/src/test/java/io/juicefs/kerberos/KerberosTest.java @@ -0,0 +1,208 @@ +package io.juicefs.kerberos; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.junit.Test; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class KerberosTest { + private static final String clientPrincipal = "client/localhost"; + private static final String clientKeytab = "/tmp/client.keytab"; + private static final String tomPrincipal = "tom/localhost"; + private static final String tomKeytab = "/tmp/tom.keytab"; + + private static final String jerryPrincipal = "jerry/localhost"; + private static final String jerryKeytab = "/tmp/jerry.keytab"; + private static final String serverPrincipal = "server/localhost"; + + + @Test + public void testWithoutKrb() throws Exception { + UserGroupInformation.reset(); + Configuration cfg = new Configuration(); + cfg.addResource(KerberosTest.class.getClassLoader().getResourceAsStream("core-site.xml")); + try (FileSystem fs = FileSystem.newInstance(cfg)) { + fail("should not success without kerberos login"); + } catch (IOException ignored) { + } + UserGroupInformation.reset(); + } + + @Test + public void test() throws Exception { + UserGroupInformation.reset(); + Configuration cfg = new Configuration(); + cfg.set("hadoop.security.authentication", "kerberos"); + cfg.set("juicefs.server-principal", serverPrincipal); + cfg.set("juicefs.mountpoint", "/jfs"); // to new another jfs + UserGroupInformation.setConfiguration(cfg); + UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab); + cfg.addResource(KerberosTest.class.getClassLoader().getResourceAsStream("core-site.xml")); + FileSystem fs = FileSystem.newInstance(cfg); + fs.listStatus(new Path("/")); + UserGroupInformation.reset(); + fs.close(); + } + + @Test + public void testToken() throws Exception { + UserGroupInformation.reset(); + Configuration cfg = new Configuration(); + cfg.set("hadoop.security.authentication", "kerberos"); + cfg.set("juicefs.server-principal", serverPrincipal); + cfg.set("juicefs.mountpoint", "/jfs"); // to new another jfs + UserGroupInformation.setConfiguration(cfg); + UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab); + cfg.addResource(KerberosTest.class.getClassLoader().getResourceAsStream("core-site.xml")); + FileSystem fs = FileSystem.newInstance(cfg); + long start = System.currentTimeMillis(); + Token t = fs.getDelegationToken(UserGroupInformation.getCurrentUser().getShortUserName()); + long end = System.currentTimeMillis(); + System.out.println("get token time: " + (end - start) + " ms"); + JuiceFSTokenRenewer renewer = new JuiceFSTokenRenewer(); + start = System.currentTimeMillis(); + System.out.println(renewer.renew(t, cfg)); + AbstractDelegationTokenIdentifier identifier = (AbstractDelegationTokenIdentifier) t.decodeIdentifier(); + System.out.println("token id: " + identifier.getMasterKeyId()); + end = System.currentTimeMillis(); + System.out.println("renew token time: " + (end - start) + " ms"); + fs.close(); + UserGroupInformation.reset(); + } + + @Test + public void testProxyUser() throws Exception { + UserGroupInformation.reset(); + Configuration cfg = new Configuration(); + cfg.set("hadoop.security.authentication", "kerberos"); + cfg.set("juicefs.server-principal", serverPrincipal); + UserGroupInformation.setConfiguration(cfg); + UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab); + cfg.addResource(KerberosTest.class.getClassLoader().getResourceAsStream("core-site.xml")); + UserGroupInformation realUser = UserGroupInformation.getCurrentUser(); + UserGroupInformation foo = UserGroupInformation.createProxyUser("foo", realUser); + foo.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + cfg.set("juicefs.mountpoint", "/jfs1"); // to new another jfs + FileSystem fs = FileSystem.newInstance(cfg); + fs.close(); + return null; + } + }); + + UserGroupInformation bar = UserGroupInformation.createProxyUser("bar", realUser); + bar.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + try { + cfg.set("juicefs.mountpoint", "/jfs2"); // to new another jfs + FileSystem fs = FileSystem.newInstance(cfg); + fail("user bar should not proxyed"); + } catch (Exception ignored){ + } + return null; + } + }); + } + + @Test + public void testSuperUser() throws Exception { + UserGroupInformation.reset(); + Configuration cfg = new Configuration(); + cfg.set("hadoop.security.authentication", "kerberos"); + cfg.set("juicefs.server-principal", serverPrincipal); + cfg.set("juicefs.mountpoint", "/jfs3"); // to new another jfs + cfg.addResource(KerberosTest.class.getClassLoader().getResourceAsStream("core-site.xml")); + + UserGroupInformation.setConfiguration(cfg); + UserGroupInformation.loginUserFromKeytab(clientPrincipal, clientKeytab); + FileSystem fs = FileSystem.newInstance(cfg); + Path dir = new Path("/testsuperuser"); + fs.delete(dir); + fs.mkdirs(dir); + fs.setOwner(dir, "foo", "foo"); // only superuser has permission + } + + @Test + public void testMapRule() throws Exception { + UserGroupInformation.reset(); + Configuration cfg = new Configuration(); + cfg.set("hadoop.security.authentication", "kerberos"); + cfg.set("juicefs.server-principal", serverPrincipal); + cfg.set("juicefs.mountpoint", "/jfs4"); // to new another jfs + cfg.addResource(KerberosTest.class.getClassLoader().getResourceAsStream("core-site.xml")); + cfg.set("hadoop.security.auth_to_local", "RULE:[2:$1/$2@$0](jerry/.*@EXAMPLE\\.COM)s/.*/jerry_map/\nDEFAULT"); + + UserGroupInformation.setConfiguration(cfg); + UserGroupInformation.loginUserFromKeytab(jerryPrincipal, jerryKeytab); + FileSystem fs = FileSystem.newInstance(cfg); + Path dir = new Path("/testAuthToLocal"); + fs.delete(dir); + fs.mkdirs(dir); + FileStatus[] statuses = fs.listStatus(new Path("/")); + assertEquals("jerry_map", fs.getFileStatus(dir).getOwner()); + fs.close(); + } + + @Test + public void testMapRuleWithProxyUser() throws Exception { + // test for proxy user + UserGroupInformation.reset(); + Configuration cfg = new Configuration(); + cfg.set("hadoop.security.authentication", "kerberos"); + cfg.set("juicefs.server-principal", serverPrincipal); + cfg.set("juicefs.mountpoint", "/jfs5"); // to new another jfs + cfg.addResource(KerberosTest.class.getClassLoader().getResourceAsStream("core-site.xml")); + // map tom to client + cfg.set("hadoop.security.auth_to_local", "RULE:[2:$1/$2@$0](tom/.*@EXAMPLE\\.COM)s/.*/client/\nDEFAULT"); + UserGroupInformation.setConfiguration(cfg); + UserGroupInformation.loginUserFromKeytab(tomPrincipal, tomKeytab); + UserGroupInformation foo = UserGroupInformation.createProxyUser("foo", UserGroupInformation.getCurrentUser()); + foo.doAs((PrivilegedExceptionAction) () -> { + FileSystem fs = FileSystem.newInstance(cfg); + Path dir = new Path("/testAuthToLocalWithProxyUser"); + fs.delete(dir); + fs.mkdirs(dir); + FileStatus[] statuses = fs.listStatus(new Path("/")); + for (FileStatus status : statuses) { + System.out.println(status.getPath().toString() + " " + status.getOwner() + " " + status.getGroup()); + } + assertEquals("foo", fs.getFileStatus(dir).getOwner()); + fs.close(); + return null; + }); + + // test for proxy user + UserGroupInformation.reset(); + Configuration cfg2 = new Configuration(); + cfg2.set("hadoop.security.authentication", "kerberos"); + cfg2.set("juicefs.server-principal", serverPrincipal); + cfg2.set("juicefs.mountpoint", "/jfs6"); // to new another jfs + cfg2.addResource(KerberosTest.class.getClassLoader().getResourceAsStream("core-site.xml")); + // map tom to client + cfg2.set("hadoop.security.auth_to_local", "RULE:[2:$1/$2@$0](tom/.*@EXAMPLE\\.COM)s/.*/client/\nDEFAULT"); + UserGroupInformation.setConfiguration(cfg2); + UserGroupInformation.loginUserFromKeytab(tomPrincipal, tomKeytab); + UserGroupInformation bar = UserGroupInformation.createProxyUser("bar", UserGroupInformation.getCurrentUser()); + bar.doAs((PrivilegedExceptionAction) () -> { + try { + FileSystem fs = FileSystem.newInstance(cfg2); + fs.close(); + fail("user client should not proxy bar"); + } catch (Exception ignored) { + } + return null; + }); + } +} diff --git a/sdk/java/src/test/resources/kerberos.cfg b/sdk/java/src/test/resources/kerberos.cfg new file mode 100644 index 000000000000..6f62cc9c3520 --- /dev/null +++ b/sdk/java/src/test/resources/kerberos.cfg @@ -0,0 +1,26 @@ +# kerberos keytab +dev.keytab=BQIAAABTAAIAC0VYQU1QTEUuQ09NAAZzZXJ2ZXIACWxvY2FsaG9zdAAAAAFpCbjNAgASACAxmURMJFxkVLCR5eBKGFz0B3PvnDO2eqv4+3vSG16UnAAAAAIAAABDAAIAC0VYQU1QTEUuQ09NAAZzZXJ2ZXIACWxvY2FsaG9zdAAAAAFpCbjNAgARABB9xnwLbnnGN5jn2TnUpGPZAAAAAgAAAAA= +# delegation token +dev.token.life=604800 +dev.token.renew=86400 + +# superuser and supergroup +dev.superuser=client +dev.supergroup=supergroup + +# Mapping from Kerberos principals to OS user accounts +# https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SecureMode.html#Mapping_from_Kerberos_principals_to_OS_user_accounts +dev.mechanism=hadoop +dev.rule=RULE:[2:$1/$2@$0](root/.*@example.com)s/.*/hdfs/ +dev.rule=RULE:[2:$1/$2@$0](jerry/.*@EXAMPLE\.COM)s/.*/jerry_map/ +dev.rule=RULE:[2:$1/$2@$0](tom/.*@EXAMPLE\.COM)s/.*/client/ +dev.rule=DEFAULT + +# proxy user settings +# https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SecureMode.html#Proxy_user +# users: user1,user2 or * +dev.proxy.client.users=foo +# groups: group1,group2 or * +dev.proxy.client.groups=foogrp +# hosts: host1,host2 or 192.168.1.1,192.168.1.2 or 192.168.1.1/32 or * +dev.proxy.client.hosts=* From 2c639c4abd94aba0ca346607966efaf3d16767ee Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Tue, 4 Nov 2025 19:12:09 +0800 Subject: [PATCH 2/6] add license --- .../java/io/juicefs/kerberos/KerberosUtil.java | 15 +++++++++++++++ ...g.apache.hadoop.security.token.TokenIdentifier | 14 ++++++++++++++ .../org.apache.hadoop.security.token.TokenRenewer | 14 ++++++++++++++ .../java/io/juicefs/kerberos/KerberosTest.java | 15 +++++++++++++++ 4 files changed, 58 insertions(+) diff --git a/sdk/java/src/main/java/io/juicefs/kerberos/KerberosUtil.java b/sdk/java/src/main/java/io/juicefs/kerberos/KerberosUtil.java index ba31eb365703..3a0a245ed28c 100644 --- a/sdk/java/src/main/java/io/juicefs/kerberos/KerberosUtil.java +++ b/sdk/java/src/main/java/io/juicefs/kerberos/KerberosUtil.java @@ -1,3 +1,18 @@ +/* + * JuiceFS, Copyright 2025 Juicedata, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.juicefs.kerberos; import org.apache.hadoop.security.UserGroupInformation; diff --git a/sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier b/sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier index 8e941677553b..565386851627 100644 --- a/sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier +++ b/sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier @@ -1 +1,15 @@ +# JuiceFS, Copyright 2025 Juicedata, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + io.juicefs.kerberos.JuiceFSDelegationTokenIdentifier diff --git a/sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer index 63b8b46df2ff..d8598e323526 100644 --- a/sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer +++ b/sdk/java/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer @@ -1 +1,15 @@ +# JuiceFS, Copyright 2025 Juicedata, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + io.juicefs.kerberos.JuiceFSTokenRenewer diff --git a/sdk/java/src/test/java/io/juicefs/kerberos/KerberosTest.java b/sdk/java/src/test/java/io/juicefs/kerberos/KerberosTest.java index 4de84cab5954..b5d799c3bbcc 100644 --- a/sdk/java/src/test/java/io/juicefs/kerberos/KerberosTest.java +++ b/sdk/java/src/test/java/io/juicefs/kerberos/KerberosTest.java @@ -1,3 +1,18 @@ +/* + * JuiceFS, Copyright 2025 Juicedata, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.juicefs.kerberos; import org.apache.hadoop.conf.Configuration; From 07324468bcf345e9a7cec320c845bdd60140e19d Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Tue, 4 Nov 2025 19:17:41 +0800 Subject: [PATCH 3/6] fix read krb conf --- cmd/format.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/format.go b/cmd/format.go index cb21b34fc28a..1cdfed8d5f27 100644 --- a/cmd/format.go +++ b/cmd/format.go @@ -398,6 +398,9 @@ func loadEncrypt(keyPath string) string { } func readKerbConf(file string) string { + if file == "" { + return "" + } data, err := os.ReadFile(file) if err != nil { logger.Fatalf("load Kerberos config from %s: %s", file, err) From d18abc7b94bb770a3417c37cda91434999403665 Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Tue, 4 Nov 2025 19:25:40 +0800 Subject: [PATCH 4/6] fix sdktest --- .github/workflows/sdktest.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/sdktest.yml b/.github/workflows/sdktest.yml index 31bccb135333..1971616ad5fc 100644 --- a/.github/workflows/sdktest.yml +++ b/.github/workflows/sdktest.yml @@ -79,13 +79,16 @@ jobs: - name: Sdk Test run: | + sudo sh sdk/java/kerberos.sh make -C sdk/java/libjfs cd sdk/java - sudo mvn test -B -Dtest=\!io.juicefs.permission.** + sudo mvn test -B -Dtest=\!io.juicefs.permission.**,\!io.juicefs.kerberos.** sudo mvn test -B -Dflink.version=1.17.2 -Dtest=io.juicefs.JuiceFileSystemTest#testFlinkHadoopRecoverableWriter # ranger test sudo JUICEFS_RANGER_TEST=1 mvn test -B -Dtest=io.juicefs.permission.RangerPermissionCheckerTest,\!io.juicefs.permission.RangerPermissionCheckerTest#testRangerCheckerInitFailed sudo mvn test -B -Dtest=io.juicefs.permission.RangerPermissionCheckerTest#testRangerCheckerInitFailed + # kerberos test + sudo mvn test -B -Dtest=io.juicefs.kerberos.KerberosTest sudo mvn package -B -Dmaven.test.skip=true --quiet -Dmaven.javadoc.skip=true expect=$(git rev-parse --short HEAD | cut -b 1-7) From b809248142dfdccafb4c52e6c15f51e69d3b92ac Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Wed, 5 Nov 2025 13:02:01 +0800 Subject: [PATCH 5/6] clean code --- sdk/java/libjfs/kerberos.go | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/sdk/java/libjfs/kerberos.go b/sdk/java/libjfs/kerberos.go index 53e0c7d2394f..efdac1d54aa1 100644 --- a/sdk/java/libjfs/kerberos.go +++ b/sdk/java/libjfs/kerberos.go @@ -403,17 +403,8 @@ func (vol *volParams) isHostAllowed(realUser, ips, hostname string) bool { } type kerberos struct { - vols map[string]*volParams - mu sync.Mutex - isRenewer bool -} - -func (k *kerberos) getSuperUser(volname string) (string, string) { - vol := k.vols[volname] - if vol != nil { - return vol.superuser, vol.supergroup - } - return "", "" + vols map[string]*volParams + mu sync.Mutex } func (k *kerberos) auth(volname, user, realUser, group, ips, hostname string, reqBytes []byte) syscall.Errno { From 218bc927d55d6eba87f66a094c3c4e07feff0ec6 Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Fri, 7 Nov 2025 11:44:55 +0800 Subject: [PATCH 6/6] resolve review --- pkg/meta/sql.go | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 48d139806996..e14866f96b97 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -5039,22 +5039,13 @@ func (m *dbMeta) doStoreToken(ctx Context, token []byte) (id uint32, st syscall. func (m *dbMeta) doUpdateToken(ctx Context, id uint32, token []byte) syscall.Errno { return errno(m.txn(func(s *xorm.Session) error { - t := &DelegationToken{Id: id} - ok, err := s.Get(t) - if err != nil { - return err - } - if !ok { - return syscall.ENOENT - } - t.Token = token - _, err = s.Cols("token").Update(t, &DelegationToken{Id: id}) + _, err := s.Cols("token").Update(&DelegationToken{Id: id, Token: token}, &DelegationToken{Id: id}) return err })) } func (m *dbMeta) doLoadToken(ctx Context, id uint32) (token []byte, st syscall.Errno) { - err := m.roTxn(ctx, func(s *xorm.Session) error { + err := m.simpleTxn(ctx, func(s *xorm.Session) error { t := &DelegationToken{Id: id} ok, err := s.Get(t) if err != nil {