
Commit 4544b2a (parent 443b696)

implement lock volume feature

Signed-off-by: Yuji Ito <[email protected]>

5 files changed: +349, -4 lines

cmd/controller.go (3 additions, 2 deletions)

@@ -162,7 +162,7 @@ func controllerMain(args []string) error {
     // Register custom metrics
     finmetrics.Register()
 
-    snapRepo := ceph.NewRBDRepository()
+    rbdRepo := ceph.NewRBDRepository()
     maxPartSize, err := resource.ParseQuantity(os.Getenv("MAX_PART_SIZE"))
     if err != nil {
         return fmt.Errorf("failed to parse MAX_PART_SIZE environment variable: %w", err)
@@ -173,7 +173,8 @@ func controllerMain(args []string) error {
         os.Getenv("POD_NAMESPACE"),
         os.Getenv("POD_IMAGE"),
         &maxPartSize,
-        snapRepo,
+        rbdRepo,
+        rbdRepo,
         rawImgExpansionUnitSize,
     )
     if err = finBackupReconciler.SetupWithManager(mgr); err != nil {
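Note that the single repository instance returned by ceph.NewRBDRepository() is now passed to NewFinBackupReconciler twice, first as the model.RBDSnapshotRepository and then as the new model.RBDImageLocker, so the concrete Ceph repository presumably satisfies both interfaces. A minimal sketch of compile-time assertions that would document this dual role, assuming the concrete type is named ceph.RBDRepository (the type name is not visible in this diff):

// Hypothetical compile-time interface checks; *ceph.RBDRepository is an assumed type name.
var (
    _ model.RBDSnapshotRepository = (*ceph.RBDRepository)(nil)
    _ model.RBDImageLocker        = (*ceph.RBDRepository)(nil)
)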

internal/controller/finbackup_controller.go (90 additions, 2 deletions)

@@ -66,6 +66,7 @@ const (
 var (
     errNonRetryableReconcile = errors.New("non retryable reconciliation error; " +
         "reconciliation must not keep going nor be retried")
+    errVolumeLockedByAnother = errors.New("the volume is locked by another process")
 )
 
 // FinBackupReconciler reconciles a FinBackup object
@@ -76,6 +77,7 @@ type FinBackupReconciler struct {
     podImage string
     maxPartSize *resource.Quantity
     snapRepo model.RBDSnapshotRepository
+    imageLocker model.RBDImageLocker
     rawImgExpansionUnitSize uint64
 }
 
@@ -86,6 +88,7 @@ func NewFinBackupReconciler(
     podImage string,
     maxPartSize *resource.Quantity,
     snapRepo model.RBDSnapshotRepository,
+    imageLocker model.RBDImageLocker,
     rawImgExpansionUnitSize uint64,
 ) *FinBackupReconciler {
     return &FinBackupReconciler{
@@ -95,6 +98,7 @@ func NewFinBackupReconciler(
         podImage: podImage,
         maxPartSize: maxPartSize,
         snapRepo: snapRepo,
+        imageLocker: imageLocker,
         rawImgExpansionUnitSize: rawImgExpansionUnitSize,
     }
 }
@@ -401,8 +405,12 @@ func (r *FinBackupReconciler) createSnapshot(ctx context.Context, backup *finv1.
         return ctrl.Result{}, err
     }
 
-    snap, err := r.createSnapshotIfNeeded(rbdPool, rbdImage, snapshotName(backup))
+    snap, err := r.createSnapshotIfNeeded(rbdPool, rbdImage, snapshotName(backup), lockID(backup))
     if err != nil {
+        if errors.Is(err, errVolumeLockedByAnother) {
+            // FIXME: The following "requeue after" is temporary code.
+            return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
+        }
         logger.Error(err, "failed to create or get snapshot")
         return ctrl.Result{}, err
     }
@@ -577,12 +585,20 @@ func (r *FinBackupReconciler) reconcileDelete(
     return ctrl.Result{}, nil
 }
 
-func (r *FinBackupReconciler) createSnapshotIfNeeded(rbdPool, rbdImage, snapName string) (*model.RBDSnapshot, error) {
+func (r *FinBackupReconciler) createSnapshotIfNeeded(rbdPool, rbdImage, snapName, lockID string) (*model.RBDSnapshot, error) {
     snap, err := findSnapshot(r.snapRepo, rbdPool, rbdImage, snapName)
     if err != nil {
         if !errors.Is(err, model.ErrNotFound) {
             return nil, fmt.Errorf("failed to get snapshot: %w", err)
         }
+
+        lockSuccess, err := r.lockVolume(rbdPool, rbdImage, lockID)
+        if err != nil {
+            return nil, fmt.Errorf("failed to lock image: %w", err)
+        }
+        if !lockSuccess {
+            return nil, errVolumeLockedByAnother
+        }
         err = r.snapRepo.CreateSnapshot(rbdPool, rbdImage, snapName)
         if err != nil {
             return nil, fmt.Errorf("failed to create snapshot: %w", err)
@@ -592,6 +608,10 @@ func (r *FinBackupReconciler) createSnapshotIfNeeded(rbdPool, rbdImage, snapName
             return nil, fmt.Errorf("failed to get snapshot after creation: %w", err)
         }
     }
+    if err := r.unlockVolume(rbdPool, rbdImage, lockID); err != nil {
+        return nil, fmt.Errorf("failed to unlock image: %w", err)
+    }
+
     return snap, nil
 }

@@ -619,6 +639,70 @@ func (r *FinBackupReconciler) removeSnapshot(ctx context.Context, backup *finv1.
     return nil
 }
 
+// lockVolume adds a lock to the specified RBD volume if the lock is not already held.
+// It returns true if the lock is held by this caller, false if another lock is held or an error occurs.
+func (r *FinBackupReconciler) lockVolume(
+    poolName, imageName, lockID string,
+) (bool, error) {
+    // Add a lock.
+    if errAdd := r.imageLocker.LockAdd(poolName, imageName, lockID); errAdd != nil {
+        locks, errLs := r.imageLocker.LockLs(poolName, imageName)
+        if errLs != nil {
+            return false, fmt.Errorf("failed to add a lock and list locks on volume %s/%s: %w", poolName, imageName, errors.Join(errAdd, errLs))
+        }
+
+        switch len(locks) {
+        case 0:
+            // It may have been unlocked after the lock failed, but since other causes are also possible, an error is returned.
+            return false, fmt.Errorf("failed to add a lock to the volume %s/%s: %w", poolName, imageName, errAdd)
+
+        case 1:
+            if locks[0].LockID == lockID {
+                // Already locked by this FB.
+                return true, nil
+            }
+            // Locked by another process.
+            return false, nil
+
+        default:
+            // Multiple locks found; unexpected state.
+            return false, fmt.Errorf("multiple locks found on volume %s/%s after failed lock attempt(%v)", poolName, imageName, locks)
+        }
+    }
+
+    // Locked
+    return true, nil
+}
+
+// unlockVolume removes the specified lock from the RBD volume if the lock is held.
+// No action is taken if the lock is not found.
+func (r *FinBackupReconciler) unlockVolume(
+    poolName, imageName, lockID string,
+) error {
+    // List up locks to check if the lock is held.
+    locks, err := r.imageLocker.LockLs(poolName, imageName)
+    if err != nil {
+        return fmt.Errorf("failed to list locks of the volume %s/%s: %w", poolName, imageName, err)
+    }
+
+    if len(locks) >= 2 {
+        return fmt.Errorf("multiple locks found on volume %s/%s when unlocking (%v)", poolName, imageName, locks)
+    }
+
+    for _, lock := range locks {
+        if lock.LockID == lockID {
+            // Unlock
+            if err := r.imageLocker.LockRm(poolName, imageName, lock); err != nil {
+                return fmt.Errorf("failed to remove the lock from the volume %s/%s: %w", poolName, imageName, err)
+            }
+            return nil
+        }
+    }
+
+    // Already unlocked.
+    return nil
+}
+
 func (r *FinBackupReconciler) getRBDPoolAndImageFromPVC(
     ctx context.Context,
     pvc *corev1.PersistentVolumeClaim,
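The new lockVolume and unlockVolume helpers touch r.imageLocker only through LockAdd, LockLs, and LockRm, and read the LockID field of the listed locks. The model.RBDImageLocker interface itself is not part of this excerpt; the following is a minimal sketch inferred from those call sites, so the method set is grounded in the diff while the RBDLock type name and any fields beyond LockID are assumptions:

// Sketch of the locking interface inferred from lockVolume/unlockVolume.
// RBDLock is an assumed type name; the real struct likely carries more fields
// (for example the locker client identity needed to remove a lock).
type RBDLock struct {
    LockID string
}

type RBDImageLocker interface {
    // LockAdd places an advisory lock named lockID on pool/image.
    LockAdd(poolName, imageName, lockID string) error
    // LockLs lists the locks currently held on pool/image.
    LockLs(poolName, imageName string) ([]RBDLock, error)
    // LockRm removes the given lock from pool/image.
    LockRm(poolName, imageName string, lock RBDLock) error
}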
@@ -742,6 +826,10 @@ func cleanupJobName(backup *finv1.FinBackup) string {
     return "fin-cleanup-" + string(backup.GetUID())
 }
 
+func lockID(backup *finv1.FinBackup) string {
+    return string(backup.GetUID())
+}
+
 func (r *FinBackupReconciler) createOrUpdateBackupJob(
     ctx context.Context, backup *finv1.FinBackup, diffFrom string,
     backupTargetPVCUID string, maxPartSize *resource.Quantity,

internal/controller/finbackup_controller_test.go (155 additions, 0 deletions)

@@ -2,6 +2,7 @@ package controller
 
 import (
     "context"
+    "errors"
     "fmt"
     "slices"
     "strconv"
@@ -119,6 +120,7 @@ var _ = Describe("FinBackup Controller integration test", Ordered, func() {
             podImage: podImage,
             maxPartSize: &defaultMaxPartSize,
             snapRepo: rbdRepo,
+            imageLocker: rbdRepo,
             rawImgExpansionUnitSize: 100 * 1 << 20,
         }
         err = reconciler.SetupWithManager(mgr)
@@ -725,6 +727,7 @@ var _ = Describe("FinBackup Controller Reconcile Test", Ordered, func() {
             podImage: podImage,
             maxPartSize: &defaultMaxPartSize,
             snapRepo: rbdRepo,
+            imageLocker: rbdRepo,
             rawImgExpansionUnitSize: 100 * 1 << 20,
         }
     })
@@ -1208,6 +1211,158 @@ var _ = Describe("FinBackup Controller Reconcile Test", Ordered, func() {
     })
 })
 
+var _ = Describe("FinBackup Controller Unit Tests", Ordered, func() {
+    Context("lockVolume", func() {
+        var reconciler *FinBackupReconciler
+        var rbdRepo *fake.RBDRepository2
+
+        lock := func(poolName, imageName, lockID string, rbdErr error) (bool, error) {
+            rbdRepo.SetError(rbdErr)
+            defer rbdRepo.SetError(nil)
+
+            locked, err := reconciler.lockVolume(poolName, imageName, lockID)
+            if err != nil {
+                return locked, err
+            }
+            // Check if the lock with lockID exists
+            locks, err := rbdRepo.LockLs(poolName, imageName)
+            if err != nil {
+                return locked, err
+            }
+            lockExists := false
+            for _, lock := range locks {
+                if lock.LockID == lockID {
+                    lockExists = true
+                    break
+                }
+            }
+            Expect(locked).To(Equal(lockExists))
+            return locked, nil
+        }
+
+        It("setup", func(ctx SpecContext) {
+            volumeInfo := &fake.VolumeInfo{
+                Namespace: utils.GetUniqueName("ns-"),
+                PVCName:   utils.GetUniqueName("pvc-"),
+                PVName:    utils.GetUniqueName("pv-"),
+                PoolName:  rbdPoolName,
+                ImageName: rbdImageName,
+            }
+            rbdRepo = fake.NewRBDRepository2(volumeInfo.PoolName, volumeInfo.ImageName)
+
+            mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme.Scheme})
+            Expect(err).ToNot(HaveOccurred())
+            reconciler = &FinBackupReconciler{
+                Client: mgr.GetClient(),
+                Scheme: mgr.GetScheme(),
+                cephClusterNamespace: namespace,
+                podImage: podImage,
+                maxPartSize: &defaultMaxPartSize,
+                snapRepo: rbdRepo,
+                imageLocker: rbdRepo,
+                rawImgExpansionUnitSize: 100 * 1 << 20,
+            }
+        })
+
+        It("lock a volume successfully", func() {
+            locked, err := lock("pool", "image1", "lock1", nil)
+            Expect(err).NotTo(HaveOccurred())
+            Expect(locked).To(BeTrue())
+        })
+        It("lock a volume that is already locked by the same lockID", func() {
+            locked, err := lock("pool", "image1", "lock1", nil)
+            Expect(err).NotTo(HaveOccurred())
+            Expect(locked).To(BeTrue())
+        })
+        It("fail to lock a volume that is already locked by a different lockID", func() {
+            locked, err := lock("pool", "image1", "lock2", nil)
+            Expect(err).NotTo(HaveOccurred())
+            Expect(locked).To(BeFalse())
+        })
+        It("lock a different volume successfully", func() {
+            locked, err := lock("pool", "image2", "lock1", nil)
+            Expect(err).NotTo(HaveOccurred())
+            Expect(locked).To(BeTrue())
+        })
+        It("error when rbd lock ls fails", func() {
+            locked, err := lock("pool", "image1", "lock1", errors.New("rbd lock ls error"))
+            Expect(err).To(HaveOccurred())
+            Expect(locked).To(BeFalse())
+        })
+    })
+
+    Context("unlockVolume", func() {
+        var reconciler *FinBackupReconciler
+        var rbdRepo *fake.RBDRepository2
+
+        It("setup", func(ctx SpecContext) {
+            volumeInfo := &fake.VolumeInfo{
+                Namespace: utils.GetUniqueName("ns-"),
+                PVCName:   utils.GetUniqueName("pvc-"),
+                PVName:    utils.GetUniqueName("pv-"),
+                PoolName:  rbdPoolName,
+                ImageName: rbdImageName,
+            }
+            rbdRepo = fake.NewRBDRepository2(volumeInfo.PoolName, volumeInfo.ImageName)
+
+            mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme.Scheme})
+            Expect(err).ToNot(HaveOccurred())
+            reconciler = &FinBackupReconciler{
+                Client: mgr.GetClient(),
+                Scheme: mgr.GetScheme(),
+                cephClusterNamespace: namespace,
+                podImage: podImage,
+                maxPartSize: &defaultMaxPartSize,
+                snapRepo: rbdRepo,
+                imageLocker: rbdRepo,
+                rawImgExpansionUnitSize: 100 * 1 << 20,
+            }
+        })
+
+        It("locks a volume", func(ctx SpecContext) {
+            locked, err := reconciler.lockVolume("pool", "image", "lock1")
+            Expect(err).NotTo(HaveOccurred())
+            Expect(locked).To(BeTrue())
+        })
+
+        DescribeTable("", func(
+            ctx SpecContext,
+            poolName, imageName, lockID string, rbdErr error,
+            expectErr bool,
+        ) {
+            rbdRepo.SetError(rbdErr)
+            defer rbdRepo.SetError(nil)
+
+            err := reconciler.unlockVolume(poolName, imageName, lockID)
+            if expectErr {
+                Expect(err).To(HaveOccurred())
+            } else {
+                Expect(err).NotTo(HaveOccurred())
+            }
+            // Check if the lock is removed
+            locks, err := rbdRepo.LockLs(poolName, imageName)
+            if expectErr {
+                Expect(err).To(HaveOccurred())
+            } else {
+                Expect(err).NotTo(HaveOccurred())
+                for _, l := range locks {
+                    Expect(l.LockID).NotTo(Equal(lockID))
+                }
+            }
+        },
+            Entry("unlock a volume successfully",
+                "pool", "image", "lock1", nil,
+                false),
+            Entry("unlock the same volume again (no-op)",
+                "pool", "image", "lock1", nil,
+                false),
+            Entry("error when rbd unlock fails",
+                "pool", "image", "lock1", errors.New("rbd unlock error"),
+                true),
+        )
+    })
+})
+
 // CSATEST-1627
 // Description:
 //
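The unit tests above exercise lockVolume and unlockVolume through fake.RBDRepository2, which is added elsewhere in this commit and not shown in this excerpt. The following is a hypothetical in-memory sketch of just the locking surface those tests rely on (SetError for error injection, plus LockAdd/LockLs/LockRm); the names ending in "Sketch" are made up here, and the real fake also backs the snapshot repository and takes pool and image names in its constructor:

// rbdRepositorySketch is a hypothetical in-memory locker, not the fake.RBDRepository2
// added in this commit. It only illustrates the lock semantics the tests depend on.
type rbdRepositorySketch struct {
    err   error                // error injected via SetError, returned by every call
    locks map[string][]RBDLock // "pool/image" -> locks currently held
}

func newRBDRepositorySketch() *rbdRepositorySketch {
    return &rbdRepositorySketch{locks: map[string][]RBDLock{}}
}

// SetError makes subsequent calls fail with err; SetError(nil) clears it.
func (r *rbdRepositorySketch) SetError(err error) { r.err = err }

// LockAdd emulates "rbd lock add": it fails if any lock is already held on the image,
// even one with the same ID, which is the case lockVolume resolves via LockLs.
func (r *rbdRepositorySketch) LockAdd(pool, image, lockID string) error {
    if r.err != nil {
        return r.err
    }
    key := pool + "/" + image
    if len(r.locks[key]) > 0 {
        return fmt.Errorf("volume %s is already locked", key)
    }
    r.locks[key] = append(r.locks[key], RBDLock{LockID: lockID})
    return nil
}

// LockLs returns the locks currently held on the image.
func (r *rbdRepositorySketch) LockLs(pool, image string) ([]RBDLock, error) {
    if r.err != nil {
        return nil, r.err
    }
    return r.locks[pool+"/"+image], nil
}

// LockRm removes the given lock if present; removing an absent lock is a no-op.
func (r *rbdRepositorySketch) LockRm(pool, image string, lock RBDLock) error {
    if r.err != nil {
        return r.err
    }
    key := pool + "/" + image
    kept := r.locks[key][:0]
    for _, l := range r.locks[key] {
        if l.LockID != lock.LockID {
            kept = append(kept, l)
        }
    }
    r.locks[key] = kept
    return nil
}

With this behavior, the test table above holds: a second LockAdd with the same lock ID fails, lockVolume then finds its own ID via LockLs and reports true, while a different ID reports false, and an injected error surfaces from both lockVolume and unlockVolume.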
