Skip to content

Commit 23e36eb

Browse files
committed
implement lock volume feature
Signed-off-by: Yuji Ito <[email protected]>
1 parent 216056e commit 23e36eb

File tree

6 files changed

+355
-6
lines changed

6 files changed

+355
-6
lines changed

cmd/controller.go

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -162,7 +162,7 @@ func controllerMain(args []string) error {
162162
// Register custom metrics
163163
finmetrics.Register()
164164

165-
snapRepo := ceph.NewRBDRepository()
165+
rbdRepo := ceph.NewRBDRepository()
166166
maxPartSize, err := resource.ParseQuantity(os.Getenv("MAX_PART_SIZE"))
167167
if err != nil {
168168
return fmt.Errorf("failed to parse MAX_PART_SIZE environment variable: %w", err)
@@ -173,7 +173,8 @@ func controllerMain(args []string) error {
173173
os.Getenv("POD_NAMESPACE"),
174174
os.Getenv("POD_IMAGE"),
175175
&maxPartSize,
176-
snapRepo,
176+
rbdRepo,
177+
rbdRepo,
177178
rawImgExpansionUnitSize,
178179
)
179180
if err = finBackupReconciler.SetupWithManager(mgr); err != nil {

internal/controller/finbackup_controller.go

Lines changed: 91 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -68,6 +68,7 @@ const (
6868
var (
6969
errNonRetryableReconcile = errors.New("non retryable reconciliation error; " +
7070
"reconciliation must not keep going nor be retried")
71+
errVolumeLockedByAnother = errors.New("the volume is locked by another process")
7172
)
7273

7374
// FinBackupReconciler reconciles a FinBackup object
@@ -78,6 +79,7 @@ type FinBackupReconciler struct {
7879
podImage string
7980
maxPartSize *resource.Quantity
8081
snapRepo model.RBDSnapshotRepository
82+
imageLocker model.RBDImageLocker
8183
rawImgExpansionUnitSize uint64
8284
}
8385

@@ -88,6 +90,7 @@ func NewFinBackupReconciler(
8890
podImage string,
8991
maxPartSize *resource.Quantity,
9092
snapRepo model.RBDSnapshotRepository,
93+
imageLocker model.RBDImageLocker,
9194
rawImgExpansionUnitSize uint64,
9295
) *FinBackupReconciler {
9396
return &FinBackupReconciler{
@@ -97,6 +100,7 @@ func NewFinBackupReconciler(
97100
podImage: podImage,
98101
maxPartSize: maxPartSize,
99102
snapRepo: snapRepo,
103+
imageLocker: imageLocker,
100104
rawImgExpansionUnitSize: rawImgExpansionUnitSize,
101105
}
102106
}
@@ -396,8 +400,13 @@ func (r *FinBackupReconciler) createSnapshot(ctx context.Context, backup *finv1.
396400
return ctrl.Result{}, err
397401
}
398402

399-
snap, err := r.createSnapshotIfNeeded(rbdPool, rbdImage, snapshotName(backup))
403+
snap, err := r.createSnapshotIfNeeded(rbdPool, rbdImage, snapshotName(backup), lockID(backup))
400404
if err != nil {
405+
if errors.Is(err, errVolumeLockedByAnother) {
406+
logger.Info("the volume is locked by another process", "uid", string(backup.GetUID()))
407+
// FIXME: The following "requeue after" is temporary code.
408+
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
409+
}
401410
logger.Error(err, "failed to create or get snapshot")
402411
return ctrl.Result{}, err
403412
}
@@ -572,12 +581,20 @@ func (r *FinBackupReconciler) reconcileDelete(
572581
return ctrl.Result{}, nil
573582
}
574583

575-
func (r *FinBackupReconciler) createSnapshotIfNeeded(rbdPool, rbdImage, snapName string) (*model.RBDSnapshot, error) {
584+
func (r *FinBackupReconciler) createSnapshotIfNeeded(rbdPool, rbdImage, snapName, lockID string) (*model.RBDSnapshot, error) {
576585
snap, err := findSnapshot(r.snapRepo, rbdPool, rbdImage, snapName)
577586
if err != nil {
578587
if !errors.Is(err, model.ErrNotFound) {
579588
return nil, fmt.Errorf("failed to get snapshot: %w", err)
580589
}
590+
591+
lockSuccess, err := r.lockVolume(rbdPool, rbdImage, lockID)
592+
if err != nil {
593+
return nil, fmt.Errorf("failed to lock image: %w", err)
594+
}
595+
if !lockSuccess {
596+
return nil, errVolumeLockedByAnother
597+
}
581598
err = r.snapRepo.CreateSnapshot(rbdPool, rbdImage, snapName)
582599
if err != nil {
583600
return nil, fmt.Errorf("failed to create snapshot: %w", err)
@@ -587,6 +604,10 @@ func (r *FinBackupReconciler) createSnapshotIfNeeded(rbdPool, rbdImage, snapName
587604
return nil, fmt.Errorf("failed to get snapshot after creation: %w", err)
588605
}
589606
}
607+
if err := r.unlockVolume(rbdPool, rbdImage, lockID); err != nil {
608+
return nil, fmt.Errorf("failed to unlock image: %w", err)
609+
}
610+
590611
return snap, nil
591612
}
592613

@@ -614,6 +635,70 @@ func (r *FinBackupReconciler) removeSnapshot(ctx context.Context, backup *finv1.
614635
return nil
615636
}
616637

638+
// lockVolume adds a lock to the specified RBD volume if the lock is not already held.
639+
// It returns true if the lock is held by this caller (newly acquired, or already held with the same lockID), and false if a different lock is held or an error occurs.
640+
func (r *FinBackupReconciler) lockVolume(
641+
poolName, imageName, lockID string,
642+
) (bool, error) {
643+
// Add a lock.
644+
if errAdd := r.imageLocker.LockAdd(poolName, imageName, lockID); errAdd != nil {
645+
locks, errLs := r.imageLocker.LockLs(poolName, imageName)
646+
if errLs != nil {
647+
return false, fmt.Errorf("failed to add a lock and list locks on volume %s/%s: %w", poolName, imageName, errors.Join(errAdd, errLs))
648+
}
649+
650+
switch len(locks) {
651+
case 0:
652+
// The volume may have been unlocked right after the lock attempt failed, but other causes are also possible, so an error is returned.
653+
return false, fmt.Errorf("failed to add a lock to the volume %s/%s: %w", poolName, imageName, errAdd)
654+
655+
case 1:
656+
if locks[0].LockID == lockID {
657+
// Already locked by this FinBackup.
658+
return true, nil
659+
}
660+
// Locked by another process.
661+
return false, nil
662+
663+
default:
664+
// Multiple locks found; unexpected state.
665+
return false, fmt.Errorf("multiple locks found on volume %s/%s after failed lock attempt(%v)", poolName, imageName, locks)
666+
}
667+
}
668+
669+
// Locked
670+
return true, nil
671+
}
672+
673+
// unlockVolume removes the specified lock from the RBD volume if the lock is held.
674+
// No action is taken if the lock is not found.
675+
func (r *FinBackupReconciler) unlockVolume(
676+
poolName, imageName, lockID string,
677+
) error {
678+
// List the locks to check whether the lock is held.
679+
locks, err := r.imageLocker.LockLs(poolName, imageName)
680+
if err != nil {
681+
return fmt.Errorf("failed to list locks of the volume %s/%s: %w", poolName, imageName, err)
682+
}
683+
684+
if len(locks) >= 2 {
685+
return fmt.Errorf("multiple locks found on volume %s/%s when unlocking (%v)", poolName, imageName, locks)
686+
}
687+
688+
for _, lock := range locks {
689+
if lock.LockID == lockID {
690+
// Unlock
691+
if err := r.imageLocker.LockRm(poolName, imageName, lock); err != nil {
692+
return fmt.Errorf("failed to remove the lock from the volume %s/%s: %w", poolName, imageName, err)
693+
}
694+
return nil
695+
}
696+
}
697+
698+
// Already unlocked.
699+
return nil
700+
}
701+
617702
func (r *FinBackupReconciler) getRBDPoolAndImageFromPVC(
618703
ctx context.Context,
619704
pvc *corev1.PersistentVolumeClaim,
@@ -734,6 +819,10 @@ func cleanupJobName(backup *finv1.FinBackup) string {
734819
return "fin-cleanup-" + string(backup.GetUID())
735820
}
736821

822+
func lockID(backup *finv1.FinBackup) string {
823+
return string(backup.GetUID())
824+
}
825+
737826
func (r *FinBackupReconciler) createOrUpdateBackupJob(
738827
ctx context.Context, backup *finv1.FinBackup, diffFrom string,
739828
backupTargetPVCUID string, maxPartSize *resource.Quantity,

internal/controller/finbackup_controller_test.go

Lines changed: 155 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package controller
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"slices"
78
"strconv"
@@ -119,6 +120,7 @@ var _ = Describe("FinBackup Controller integration test", Ordered, func() {
119120
podImage: podImage,
120121
maxPartSize: &defaultMaxPartSize,
121122
snapRepo: rbdRepo,
123+
imageLocker: rbdRepo,
122124
rawImgExpansionUnitSize: 100 * 1 << 20,
123125
}
124126
err = reconciler.SetupWithManager(mgr)
@@ -766,6 +768,7 @@ var _ = Describe("FinBackup Controller Reconcile Test", Ordered, func() {
766768
podImage: podImage,
767769
maxPartSize: &defaultMaxPartSize,
768770
snapRepo: rbdRepo,
771+
imageLocker: rbdRepo,
769772
rawImgExpansionUnitSize: 100 * 1 << 20,
770773
}
771774
})
@@ -1249,6 +1252,158 @@ var _ = Describe("FinBackup Controller Reconcile Test", Ordered, func() {
12491252
})
12501253
})
12511254

1255+
var _ = Describe("FinBackup Controller Unit Tests", Ordered, func() {
1256+
Context("lockVolume", func() {
1257+
var reconciler *FinBackupReconciler
1258+
var rbdRepo *fake.RBDRepository2
1259+
1260+
lock := func(poolName, imageName, lockID string, rbdErr error) (bool, error) {
1261+
rbdRepo.SetError(rbdErr)
1262+
defer rbdRepo.SetError(nil)
1263+
1264+
locked, err := reconciler.lockVolume(poolName, imageName, lockID)
1265+
if err != nil {
1266+
return locked, err
1267+
}
1268+
// Check if the lock with lockID exists
1269+
locks, err := rbdRepo.LockLs(poolName, imageName)
1270+
if err != nil {
1271+
return locked, err
1272+
}
1273+
lockExists := false
1274+
for _, lock := range locks {
1275+
if lock.LockID == lockID {
1276+
lockExists = true
1277+
break
1278+
}
1279+
}
1280+
Expect(locked).To(Equal(lockExists))
1281+
return locked, nil
1282+
}
1283+
1284+
It("setup", func(ctx SpecContext) {
1285+
volumeInfo := &fake.VolumeInfo{
1286+
Namespace: utils.GetUniqueName("ns-"),
1287+
PVCName: utils.GetUniqueName("pvc-"),
1288+
PVName: utils.GetUniqueName("pv-"),
1289+
PoolName: rbdPoolName,
1290+
ImageName: rbdImageName,
1291+
}
1292+
rbdRepo = fake.NewRBDRepository2(volumeInfo.PoolName, volumeInfo.ImageName)
1293+
1294+
mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme.Scheme})
1295+
Expect(err).ToNot(HaveOccurred())
1296+
reconciler = &FinBackupReconciler{
1297+
Client: mgr.GetClient(),
1298+
Scheme: mgr.GetScheme(),
1299+
cephClusterNamespace: namespace,
1300+
podImage: podImage,
1301+
maxPartSize: &defaultMaxPartSize,
1302+
snapRepo: rbdRepo,
1303+
imageLocker: rbdRepo,
1304+
rawImgExpansionUnitSize: 100 * 1 << 20,
1305+
}
1306+
})
1307+
1308+
It("lock a volume successfully", func() {
1309+
locked, err := lock("pool", "image1", "lock1", nil)
1310+
Expect(err).NotTo(HaveOccurred())
1311+
Expect(locked).To(BeTrue())
1312+
})
1313+
It("lock a volume that is already locked by the same lockID", func() {
1314+
locked, err := lock("pool", "image1", "lock1", nil)
1315+
Expect(err).NotTo(HaveOccurred())
1316+
Expect(locked).To(BeTrue())
1317+
})
1318+
It("fail to lock a volume that is already locked by a different lockID", func() {
1319+
locked, err := lock("pool", "image1", "lock2", nil)
1320+
Expect(err).NotTo(HaveOccurred())
1321+
Expect(locked).To(BeFalse())
1322+
})
1323+
It("lock a different volume successfully", func() {
1324+
locked, err := lock("pool", "image2", "lock1", nil)
1325+
Expect(err).NotTo(HaveOccurred())
1326+
Expect(locked).To(BeTrue())
1327+
})
1328+
It("error when rbd lock ls fails", func() {
1329+
locked, err := lock("pool", "image1", "lock1", errors.New("rbd lock ls error"))
1330+
Expect(err).To(HaveOccurred())
1331+
Expect(locked).To(BeFalse())
1332+
})
1333+
})
1334+
1335+
Context("unlockVolume", func() {
1336+
var reconciler *FinBackupReconciler
1337+
var rbdRepo *fake.RBDRepository2
1338+
1339+
It("setup", func(ctx SpecContext) {
1340+
volumeInfo := &fake.VolumeInfo{
1341+
Namespace: utils.GetUniqueName("ns-"),
1342+
PVCName: utils.GetUniqueName("pvc-"),
1343+
PVName: utils.GetUniqueName("pv-"),
1344+
PoolName: rbdPoolName,
1345+
ImageName: rbdImageName,
1346+
}
1347+
rbdRepo = fake.NewRBDRepository2(volumeInfo.PoolName, volumeInfo.ImageName)
1348+
1349+
mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme.Scheme})
1350+
Expect(err).ToNot(HaveOccurred())
1351+
reconciler = &FinBackupReconciler{
1352+
Client: mgr.GetClient(),
1353+
Scheme: mgr.GetScheme(),
1354+
cephClusterNamespace: namespace,
1355+
podImage: podImage,
1356+
maxPartSize: &defaultMaxPartSize,
1357+
snapRepo: rbdRepo,
1358+
imageLocker: rbdRepo,
1359+
rawImgExpansionUnitSize: 100 * 1 << 20,
1360+
}
1361+
})
1362+
1363+
It("locks a volume", func(ctx SpecContext) {
1364+
locked, err := reconciler.lockVolume("pool", "image", "lock1")
1365+
Expect(err).NotTo(HaveOccurred())
1366+
Expect(locked).To(BeTrue())
1367+
})
1368+
1369+
DescribeTable("", func(
1370+
ctx SpecContext,
1371+
poolName, imageName, lockID string, rbdErr error,
1372+
expectErr bool,
1373+
) {
1374+
rbdRepo.SetError(rbdErr)
1375+
defer rbdRepo.SetError(nil)
1376+
1377+
err := reconciler.unlockVolume(poolName, imageName, lockID)
1378+
if expectErr {
1379+
Expect(err).To(HaveOccurred())
1380+
} else {
1381+
Expect(err).NotTo(HaveOccurred())
1382+
}
1383+
// Check if the lock is removed
1384+
locks, err := rbdRepo.LockLs(poolName, imageName)
1385+
if expectErr {
1386+
Expect(err).To(HaveOccurred())
1387+
} else {
1388+
Expect(err).NotTo(HaveOccurred())
1389+
for _, l := range locks {
1390+
Expect(l.LockID).NotTo(Equal(lockID))
1391+
}
1392+
}
1393+
},
1394+
Entry("unlock a volume successfully",
1395+
"pool", "image", "lock1", nil,
1396+
false),
1397+
Entry("unlock the same volume again (no-op)",
1398+
"pool", "image", "lock1", nil,
1399+
false),
1400+
Entry("error when rbd unlock fails",
1401+
"pool", "image", "lock1", errors.New("rbd unlock error"),
1402+
true),
1403+
)
1404+
})
1405+
})
1406+
12521407
// CSATEST-1627
12531408
// Description:
12541409
//

0 commit comments

Comments
 (0)