Skip to content

Commit c5990cc

Browse files
committed
implement lock volume feature
Signed-off-by: Yuji Ito <[email protected]>
1 parent c87e37c commit c5990cc

File tree

6 files changed

+355
-6
lines changed

6 files changed

+355
-6
lines changed

cmd/controller.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func controllerMain(args []string) error {
162162
// Register custom metrics
163163
finmetrics.Register()
164164

165-
snapRepo := ceph.NewRBDRepository()
165+
rbdRepo := ceph.NewRBDRepository()
166166
maxPartSize, err := resource.ParseQuantity(os.Getenv("MAX_PART_SIZE"))
167167
if err != nil {
168168
return fmt.Errorf("failed to parse MAX_PART_SIZE environment variable: %w", err)
@@ -173,7 +173,8 @@ func controllerMain(args []string) error {
173173
os.Getenv("POD_NAMESPACE"),
174174
os.Getenv("POD_IMAGE"),
175175
&maxPartSize,
176-
snapRepo,
176+
rbdRepo,
177+
rbdRepo,
177178
rawImgExpansionUnitSize,
178179
)
179180
if err = finBackupReconciler.SetupWithManager(mgr); err != nil {

internal/controller/finbackup_controller.go

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ const (
6868
var (
6969
errNonRetryableReconcile = errors.New("non retryable reconciliation error; " +
7070
"reconciliation must not keep going nor be retried")
71+
errVolumeLockedByAnother = errors.New("the volume is locked by another process")
7172
)
7273

7374
// FinBackupReconciler reconciles a FinBackup object
@@ -78,6 +79,7 @@ type FinBackupReconciler struct {
7879
podImage string
7980
maxPartSize *resource.Quantity
8081
snapRepo model.RBDSnapshotRepository
82+
imageLocker model.RBDImageLocker
8183
rawImgExpansionUnitSize uint64
8284
}
8385

@@ -88,6 +90,7 @@ func NewFinBackupReconciler(
8890
podImage string,
8991
maxPartSize *resource.Quantity,
9092
snapRepo model.RBDSnapshotRepository,
93+
imageLocker model.RBDImageLocker,
9194
rawImgExpansionUnitSize uint64,
9295
) *FinBackupReconciler {
9396
return &FinBackupReconciler{
@@ -97,6 +100,7 @@ func NewFinBackupReconciler(
97100
podImage: podImage,
98101
maxPartSize: maxPartSize,
99102
snapRepo: snapRepo,
103+
imageLocker: imageLocker,
100104
rawImgExpansionUnitSize: rawImgExpansionUnitSize,
101105
}
102106
}
@@ -396,8 +400,13 @@ func (r *FinBackupReconciler) createSnapshot(ctx context.Context, backup *finv1.
396400
return ctrl.Result{}, err
397401
}
398402

399-
snap, err := r.createSnapshotIfNeeded(rbdPool, rbdImage, snapshotName(backup))
403+
snap, err := r.createSnapshotIfNeeded(rbdPool, rbdImage, snapshotName(backup), lockID(backup))
400404
if err != nil {
405+
if errors.Is(err, errVolumeLockedByAnother) {
406+
logger.Info("the volume is locked by another process", "uid", string(backup.GetUID()))
407+
// FIXME: The following "requeue after" is temporary code.
408+
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
409+
}
401410
logger.Error(err, "failed to create or get snapshot")
402411
return ctrl.Result{}, err
403412
}
@@ -571,12 +580,20 @@ func (r *FinBackupReconciler) reconcileDelete(
571580
return ctrl.Result{}, nil
572581
}
573582

574-
func (r *FinBackupReconciler) createSnapshotIfNeeded(rbdPool, rbdImage, snapName string) (*model.RBDSnapshot, error) {
583+
func (r *FinBackupReconciler) createSnapshotIfNeeded(rbdPool, rbdImage, snapName, lockID string) (*model.RBDSnapshot, error) {
575584
snap, err := findSnapshot(r.snapRepo, rbdPool, rbdImage, snapName)
576585
if err != nil {
577586
if !errors.Is(err, model.ErrNotFound) {
578587
return nil, fmt.Errorf("failed to get snapshot: %w", err)
579588
}
589+
590+
lockSuccess, err := r.lockVolume(rbdPool, rbdImage, lockID)
591+
if err != nil {
592+
return nil, fmt.Errorf("failed to lock image: %w", err)
593+
}
594+
if !lockSuccess {
595+
return nil, errVolumeLockedByAnother
596+
}
580597
err = r.snapRepo.CreateSnapshot(rbdPool, rbdImage, snapName)
581598
if err != nil {
582599
return nil, fmt.Errorf("failed to create snapshot: %w", err)
@@ -586,6 +603,10 @@ func (r *FinBackupReconciler) createSnapshotIfNeeded(rbdPool, rbdImage, snapName
586603
return nil, fmt.Errorf("failed to get snapshot after creation: %w", err)
587604
}
588605
}
606+
if err := r.unlockVolume(rbdPool, rbdImage, lockID); err != nil {
607+
return nil, fmt.Errorf("failed to unlock image: %w", err)
608+
}
609+
589610
return snap, nil
590611
}
591612

@@ -613,6 +634,70 @@ func (r *FinBackupReconciler) removeSnapshot(ctx context.Context, backup *finv1.
613634
return nil
614635
}
615636

637+
// lockVolume adds a lock to the specified RBD volume if the lock is not already held.
638+
// It returns true if the lock is held by this caller, false if another lock is held or an error occurs.
639+
func (r *FinBackupReconciler) lockVolume(
640+
poolName, imageName, lockID string,
641+
) (bool, error) {
642+
// Add a lock.
643+
if errAdd := r.imageLocker.LockAdd(poolName, imageName, lockID); errAdd != nil {
644+
locks, errLs := r.imageLocker.LockLs(poolName, imageName)
645+
if errLs != nil {
646+
return false, fmt.Errorf("failed to add a lock and list locks on volume %s/%s: %w", poolName, imageName, errors.Join(errAdd, errLs))
647+
}
648+
649+
switch len(locks) {
650+
case 0:
651+
// It may have been unlocked after the lock failed, but since other causes are also possible, an error is returned.
652+
return false, fmt.Errorf("failed to add a lock to the volume %s/%s: %w", poolName, imageName, errAdd)
653+
654+
case 1:
655+
if locks[0].LockID == lockID {
656+
// Already locked by this FB.
657+
return true, nil
658+
}
659+
// Locked by another process.
660+
return false, nil
661+
662+
default:
663+
// Multiple locks found; unexpected state.
664+
return false, fmt.Errorf("multiple locks found on volume %s/%s after failed lock attempt(%v)", poolName, imageName, locks)
665+
}
666+
}
667+
668+
// Locked
669+
return true, nil
670+
}
671+
672+
// unlockVolume removes the specified lock from the RBD volume if the lock is held.
673+
// No action is taken if the lock is not found.
674+
func (r *FinBackupReconciler) unlockVolume(
675+
poolName, imageName, lockID string,
676+
) error {
677+
// List up locks to check if the lock is held.
678+
locks, err := r.imageLocker.LockLs(poolName, imageName)
679+
if err != nil {
680+
return fmt.Errorf("failed to list locks of the volume %s/%s: %w", poolName, imageName, err)
681+
}
682+
683+
if len(locks) >= 2 {
684+
return fmt.Errorf("multiple locks found on volume %s/%s when unlocking (%v)", poolName, imageName, locks)
685+
}
686+
687+
for _, lock := range locks {
688+
if lock.LockID == lockID {
689+
// Unlock
690+
if err := r.imageLocker.LockRm(poolName, imageName, lock); err != nil {
691+
return fmt.Errorf("failed to remove the lock from the volume %s/%s: %w", poolName, imageName, err)
692+
}
693+
return nil
694+
}
695+
}
696+
697+
// Already unlocked.
698+
return nil
699+
}
700+
616701
func (r *FinBackupReconciler) getRBDPoolAndImageFromPVC(
617702
ctx context.Context,
618703
pvc *corev1.PersistentVolumeClaim,
@@ -733,6 +818,10 @@ func cleanupJobName(backup *finv1.FinBackup) string {
733818
return "fin-cleanup-" + string(backup.GetUID())
734819
}
735820

821+
// lockID returns the RBD advisory-lock identifier used for the given
// FinBackup. The backup's UID is used so that each FinBackup resource
// owns a distinct lock on the target volume.
func lockID(backup *finv1.FinBackup) string {
	return string(backup.GetUID())
}
824+
736825
func (r *FinBackupReconciler) createOrUpdateBackupJob(
737826
ctx context.Context, backup *finv1.FinBackup, diffFrom string,
738827
backupTargetPVCUID string, maxPartSize *resource.Quantity,

internal/controller/finbackup_controller_test.go

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package controller
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"slices"
78
"strconv"
@@ -119,6 +120,7 @@ var _ = Describe("FinBackup Controller integration test", Ordered, func() {
119120
podImage: podImage,
120121
maxPartSize: &defaultMaxPartSize,
121122
snapRepo: rbdRepo,
123+
imageLocker: rbdRepo,
122124
rawImgExpansionUnitSize: 100 * 1 << 20,
123125
}
124126
err = reconciler.SetupWithManager(mgr)
@@ -766,6 +768,7 @@ var _ = Describe("FinBackup Controller Reconcile Test", Ordered, func() {
766768
podImage: podImage,
767769
maxPartSize: &defaultMaxPartSize,
768770
snapRepo: rbdRepo,
771+
imageLocker: rbdRepo,
769772
rawImgExpansionUnitSize: 100 * 1 << 20,
770773
}
771774
})
@@ -1249,6 +1252,158 @@ var _ = Describe("FinBackup Controller Reconcile Test", Ordered, func() {
12491252
})
12501253
})
12511254

1255+
// Unit-level tests for the lockVolume/unlockVolume helpers of
// FinBackupReconciler, driven against the fake RBD repository.
var _ = Describe("FinBackup Controller Unit Tests", Ordered, func() {
	Context("lockVolume", func() {
		var reconciler *FinBackupReconciler
		var rbdRepo *fake.RBDRepository2

		// lock runs lockVolume with an optionally injected RBD error and
		// cross-checks the returned "locked" flag against the fake
		// repository's actual lock list before returning the result.
		lock := func(poolName, imageName, lockID string, rbdErr error) (bool, error) {
			rbdRepo.SetError(rbdErr)
			defer rbdRepo.SetError(nil)

			locked, err := reconciler.lockVolume(poolName, imageName, lockID)
			if err != nil {
				return locked, err
			}
			// Check if the lock with lockID exists
			locks, err := rbdRepo.LockLs(poolName, imageName)
			if err != nil {
				return locked, err
			}
			lockExists := false
			for _, lock := range locks {
				if lock.LockID == lockID {
					lockExists = true
					break
				}
			}
			// lockVolume's answer must agree with the fake's state.
			Expect(locked).To(Equal(lockExists))
			return locked, nil
		}

		It("setup", func(ctx SpecContext) {
			volumeInfo := &fake.VolumeInfo{
				Namespace: utils.GetUniqueName("ns-"),
				PVCName:   utils.GetUniqueName("pvc-"),
				PVName:    utils.GetUniqueName("pv-"),
				PoolName:  rbdPoolName,
				ImageName: rbdImageName,
			}
			rbdRepo = fake.NewRBDRepository2(volumeInfo.PoolName, volumeInfo.ImageName)

			mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme.Scheme})
			Expect(err).ToNot(HaveOccurred())
			// The fake repository serves as both the snapshot repository
			// and the image locker.
			reconciler = &FinBackupReconciler{
				Client:                  mgr.GetClient(),
				Scheme:                  mgr.GetScheme(),
				cephClusterNamespace:    namespace,
				podImage:                podImage,
				maxPartSize:             &defaultMaxPartSize,
				snapRepo:                rbdRepo,
				imageLocker:             rbdRepo,
				rawImgExpansionUnitSize: 100 * 1 << 20,
			}
		})

		// NOTE(review): the Its below are order-dependent (Ordered suite);
		// each case relies on the lock state left by the previous one.
		It("lock a volume successfully", func() {
			locked, err := lock("pool", "image1", "lock1", nil)
			Expect(err).NotTo(HaveOccurred())
			Expect(locked).To(BeTrue())
		})
		It("lock a volume that is already locked by the same lockID", func() {
			locked, err := lock("pool", "image1", "lock1", nil)
			Expect(err).NotTo(HaveOccurred())
			Expect(locked).To(BeTrue())
		})
		It("fail to lock a volume that is already locked by a different lockID", func() {
			locked, err := lock("pool", "image1", "lock2", nil)
			Expect(err).NotTo(HaveOccurred())
			Expect(locked).To(BeFalse())
		})
		It("lock a different volume successfully", func() {
			locked, err := lock("pool", "image2", "lock1", nil)
			Expect(err).NotTo(HaveOccurred())
			Expect(locked).To(BeTrue())
		})
		It("error when rbd lock ls fails", func() {
			locked, err := lock("pool", "image1", "lock1", errors.New("rbd lock ls error"))
			Expect(err).To(HaveOccurred())
			Expect(locked).To(BeFalse())
		})
	})

	Context("unlockVolume", func() {
		var reconciler *FinBackupReconciler
		var rbdRepo *fake.RBDRepository2

		It("setup", func(ctx SpecContext) {
			volumeInfo := &fake.VolumeInfo{
				Namespace: utils.GetUniqueName("ns-"),
				PVCName:   utils.GetUniqueName("pvc-"),
				PVName:    utils.GetUniqueName("pv-"),
				PoolName:  rbdPoolName,
				ImageName: rbdImageName,
			}
			rbdRepo = fake.NewRBDRepository2(volumeInfo.PoolName, volumeInfo.ImageName)

			mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme.Scheme})
			Expect(err).ToNot(HaveOccurred())
			reconciler = &FinBackupReconciler{
				Client:                  mgr.GetClient(),
				Scheme:                  mgr.GetScheme(),
				cephClusterNamespace:    namespace,
				podImage:                podImage,
				maxPartSize:             &defaultMaxPartSize,
				snapRepo:                rbdRepo,
				imageLocker:             rbdRepo,
				rawImgExpansionUnitSize: 100 * 1 << 20,
			}
		})

		// Seed one lock so the table entries below have something to remove.
		It("locks a volume", func(ctx SpecContext) {
			locked, err := reconciler.lockVolume("pool", "image", "lock1")
			Expect(err).NotTo(HaveOccurred())
			Expect(locked).To(BeTrue())
		})

		DescribeTable("", func(
			ctx SpecContext,
			poolName, imageName, lockID string, rbdErr error,
			expectErr bool,
		) {
			rbdRepo.SetError(rbdErr)
			defer rbdRepo.SetError(nil)

			err := reconciler.unlockVolume(poolName, imageName, lockID)
			if expectErr {
				Expect(err).To(HaveOccurred())
			} else {
				Expect(err).NotTo(HaveOccurred())
			}
			// Check if the lock is removed
			locks, err := rbdRepo.LockLs(poolName, imageName)
			if expectErr {
				// LockLs also fails while the injected error is still set.
				Expect(err).To(HaveOccurred())
			} else {
				Expect(err).NotTo(HaveOccurred())
				for _, l := range locks {
					Expect(l.LockID).NotTo(Equal(lockID))
				}
			}
		},
			Entry("unlock a volume successfully",
				"pool", "image", "lock1", nil,
				false),
			Entry("unlock the same volume again (no-op)",
				"pool", "image", "lock1", nil,
				false),
			Entry("error when rbd unlock fails",
				"pool", "image", "lock1", errors.New("rbd unlock error"),
				true),
		)
	})
})
1406+
12521407
// CSATEST-1627
12531408
// Description:
12541409
//

0 commit comments

Comments
 (0)