diff --git a/Makefile b/Makefile index 84ca860d..799a486a 100644 --- a/Makefile +++ b/Makefile @@ -92,8 +92,12 @@ clean-test: rm -f $(TEST_XFS_IMG); \ fi +.PHONY: mock +mock: mockgen + $(MOCKGEN) -source=internal/infrastructure/ceph/command.go -destination=internal/infrastructure/ceph/command_mock.go -package=ceph + .PHONY: test -test: manifests generate fmt vet envtest ## Run tests. +test: manifests generate fmt vet envtest mock ## Run tests. $(MAKE) prepare-test KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" \ TEST_BLOCK_DEV=$(TEST_BLOCK_DEV) \ @@ -197,7 +201,8 @@ KUBECTL ?= kubectl KUSTOMIZE ?= $(LOCALBIN)/kustomize-$(KUSTOMIZE_VERSION) CONTROLLER_GEN ?= $(LOCALBIN)/controller-gen-$(CONTROLLER_TOOLS_VERSION) ENVTEST ?= $(LOCALBIN)/setup-envtest-$(ENVTEST_VERSION) -GOLANGCI_LINT = $(LOCALBIN)/golangci-lint-$(GOLANGCI_LINT_VERSION) +GOLANGCI_LINT ?= $(LOCALBIN)/golangci-lint-$(GOLANGCI_LINT_VERSION) +MOCKGEN ?= $(LOCALBIN)/mockgen-$(MOCKGEN_VERSION) .PHONY: kustomize kustomize: $(KUSTOMIZE) ## Download kustomize locally if necessary. @@ -217,7 +222,12 @@ $(ENVTEST): $(LOCALBIN) .PHONY: golangci-lint golangci-lint: $(GOLANGCI_LINT) ## Download golangci-lint locally if necessary. $(GOLANGCI_LINT): $(LOCALBIN) - $(call go-install-tool,$(GOLANGCI_LINT),github.com/golangci/golangci-lint/v2/cmd/golangci-lint,${GOLANGCI_LINT_VERSION}) + $(call go-install-tool,$(GOLANGCI_LINT),github.com/golangci/golangci-lint/v2/cmd/golangci-lint,$(GOLANGCI_LINT_VERSION)) + +.PHONY: mockgen +mockgen: $(MOCKGEN) ## Download mockgen locally if necessary. +$(MOCKGEN): $(LOCALBIN) + $(call go-install-tool,$(MOCKGEN),go.uber.org/mock/mockgen,$(MOCKGEN_VERSION)) # go-install-tool will 'go install' any package with custom target and name of binary, if it doesn't exist # $1 - target path with name of binary (ideally with version) diff --git a/cmd/controller.go b/cmd/controller.go index 54e8e782..6fb769ff 100644 --- a/cmd/controller.go +++ b/cmd/controller.go @@ -162,7 +162,7 @@ func controllerMain(args []string) error { // Register custom metrics finmetrics.Register() - snapRepo := ceph.NewRBDRepository() + rbdRepo := ceph.NewRBDRepository() maxPartSize, err := resource.ParseQuantity(os.Getenv("MAX_PART_SIZE")) if err != nil { return fmt.Errorf("failed to parse MAX_PART_SIZE environment variable: %w", err) @@ -173,7 +173,8 @@ func controllerMain(args []string) error { os.Getenv("POD_NAMESPACE"), os.Getenv("POD_IMAGE"), &maxPartSize, - snapRepo, + rbdRepo, + rbdRepo, rawImgExpansionUnitSize, ) if err = finBackupReconciler.SetupWithManager(mgr); err != nil { diff --git a/go.mod b/go.mod index 26a47d4f..c5f41dab 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/cybozu-go/fin go 1.24 require ( + github.com/cespare/xxhash/v2 v2.3.0 github.com/google/uuid v1.6.0 github.com/mattn/go-sqlite3 v1.14.28 github.com/onsi/ginkgo/v2 v2.23.4 @@ -10,7 +11,8 @@ require ( github.com/prometheus/client_golang v1.22.0 github.com/spf13/cobra v1.9.1 github.com/stretchr/testify v1.10.0 - golang.org/x/sys v0.32.0 + go.uber.org/mock v0.6.0 + golang.org/x/sys v0.35.0 k8s.io/api v0.32.7 k8s.io/apimachinery v0.32.7 k8s.io/client-go v0.32.7 @@ -22,7 +24,6 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch/v5 v5.9.11 // indirect @@ -59,13 +60,13 @@ require ( 
go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect - golang.org/x/net v0.38.0 // indirect + golang.org/x/net v0.43.0 // indirect golang.org/x/oauth2 v0.27.0 // indirect - golang.org/x/sync v0.12.0 // indirect - golang.org/x/term v0.30.0 // indirect - golang.org/x/text v0.23.0 // indirect + golang.org/x/sync v0.16.0 // indirect + golang.org/x/term v0.34.0 // indirect + golang.org/x/text v0.28.0 // indirect golang.org/x/time v0.9.0 // indirect - golang.org/x/tools v0.31.0 // indirect + golang.org/x/tools v0.36.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/protobuf v1.36.5 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect diff --git a/go.sum b/go.sum index e7aa6a2c..f26c9999 100644 --- a/go.sum +++ b/go.sum @@ -123,6 +123,8 @@ go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y= +go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= @@ -140,34 +142,34 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= -golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M= golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= -golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= -golang.org/x/sys v0.32.0/go.mod 
h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y= -golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4= +golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= -golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.31.0 h1:0EedkvKDbh+qistFTd0Bcwe/YLh4vHwWEkiI0toFIBU= -golang.org/x/tools v0.31.0/go.mod h1:naFTU+Cev749tSJRXJlna0T3WxKvb1kWEx15xA4SdmQ= +golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg= +golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/controller/finbackup_controller.go b/internal/controller/finbackup_controller.go index cab41dba..c0914dff 100644 --- a/internal/controller/finbackup_controller.go +++ b/internal/controller/finbackup_controller.go @@ -45,7 +45,7 @@ const ( labelComponentDeletionJob = "deletion-job" // Annotations - annotationBackupTargetRBDImage = "fin.cybozu.io/backup-target-rbd-image" + AnnotationBackupTargetRBDImage = "fin.cybozu.io/backup-target-rbd-image" annotationDiffFrom = "fin.cybozu.io/diff-from" annotationFinBackupName = "fin.cybozu.io/finbackup-name" annotationFinBackupNamespace = "fin.cybozu.io/finbackup-namespace" @@ -68,6 +68,7 @@ const ( var ( errNonRetryableReconcile = errors.New("non retryable reconciliation error; " + "reconciliation must not keep going nor be retried") + errVolumeLockedByAnother = errors.New("the volume is locked by another process") ) // FinBackupReconciler reconciles a FinBackup object @@ -78,6 +79,7 @@ type FinBackupReconciler struct { podImage string maxPartSize *resource.Quantity snapRepo model.RBDSnapshotRepository + imageLocker model.RBDImageLocker rawImgExpansionUnitSize uint64 } @@ -88,6 +90,7 @@ func NewFinBackupReconciler( podImage string, maxPartSize *resource.Quantity, snapRepo model.RBDSnapshotRepository, + imageLocker model.RBDImageLocker, 
 	rawImgExpansionUnitSize uint64,
 ) *FinBackupReconciler {
 	return &FinBackupReconciler{
@@ -97,6 +100,7 @@ func NewFinBackupReconciler(
 		podImage:                podImage,
 		maxPartSize:             maxPartSize,
 		snapRepo:                snapRepo,
+		imageLocker:             imageLocker,
 		rawImgExpansionUnitSize: rawImgExpansionUnitSize,
 	}
 }
@@ -386,7 +390,7 @@ func (r *FinBackupReconciler) createSnapshot(ctx context.Context, backup *finv1.
 	if annotations == nil {
 		annotations = map[string]string{}
 	}
-	annotations[annotationBackupTargetRBDImage] = rbdImage
+	annotations[AnnotationBackupTargetRBDImage] = rbdImage
 	annotations[annotationRBDPool] = rbdPool
 	backup.SetAnnotations(annotations)
 
@@ -396,8 +400,13 @@ func (r *FinBackupReconciler) createSnapshot(ctx context.Context, backup *finv1.
 		return ctrl.Result{}, err
 	}
 
-	snap, err := r.createSnapshotIfNeeded(rbdPool, rbdImage, snapshotName(backup))
+	snap, err := r.createSnapshotIfNeeded(rbdPool, rbdImage, snapshotName(backup), lockID(backup))
 	if err != nil {
+		if errors.Is(err, errVolumeLockedByAnother) {
+			logger.Info("the volume is locked by another process", "uid", string(backup.GetUID()))
+			// FIXME: The following "requeue after" is temporary code.
+			return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
+		}
 		logger.Error(err, "failed to create or get snapshot")
 		return ctrl.Result{}, err
 	}
@@ -571,12 +580,20 @@ func (r *FinBackupReconciler) reconcileDelete(
 	return ctrl.Result{}, nil
 }
 
-func (r *FinBackupReconciler) createSnapshotIfNeeded(rbdPool, rbdImage, snapName string) (*model.RBDSnapshot, error) {
+func (r *FinBackupReconciler) createSnapshotIfNeeded(rbdPool, rbdImage, snapName, lockID string) (*model.RBDSnapshot, error) {
 	snap, err := findSnapshot(r.snapRepo, rbdPool, rbdImage, snapName)
 	if err != nil {
 		if !errors.Is(err, model.ErrNotFound) {
 			return nil, fmt.Errorf("failed to get snapshot: %w", err)
 		}
+
+		lockSuccess, err := r.lockVolume(rbdPool, rbdImage, lockID)
+		if err != nil {
+			return nil, fmt.Errorf("failed to lock image: %w", err)
+		}
+		if !lockSuccess {
+			return nil, errVolumeLockedByAnother
+		}
 		err = r.snapRepo.CreateSnapshot(rbdPool, rbdImage, snapName)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create snapshot: %w", err)
@@ -586,6 +603,10 @@ func (r *FinBackupReconciler) createSnapshotIfNeeded(rbdPool, rbdImage, snapName
 			return nil, fmt.Errorf("failed to get snapshot after creation: %w", err)
 		}
 	}
+	if err := r.unlockVolume(rbdPool, rbdImage, lockID); err != nil {
+		return nil, fmt.Errorf("failed to unlock image: %w", err)
+	}
+
 	return snap, nil
 }
 
@@ -613,6 +634,70 @@ func (r *FinBackupReconciler) removeSnapshot(ctx context.Context, backup *finv1.
 	return nil
 }
 
+// lockVolume adds a lock to the specified RBD volume if the lock is not already held.
+// It returns true if the lock is held by this caller and false if it is held by another
+// process; an error is returned when the lock state cannot be determined.
+func (r *FinBackupReconciler) lockVolume(
+	poolName, imageName, lockID string,
+) (bool, error) {
+	// Add a lock.
+	if errAdd := r.imageLocker.LockAdd(poolName, imageName, lockID); errAdd != nil {
+		locks, errLs := r.imageLocker.LockLs(poolName, imageName)
+		if errLs != nil {
+			return false, fmt.Errorf("failed to add a lock and list locks on volume %s/%s: %w", poolName, imageName, errors.Join(errAdd, errLs))
+		}
+
+		switch len(locks) {
+		case 0:
+			// The lock may have been released right after our attempt failed, but other
+			// causes are possible as well, so return an error.
+			return false, fmt.Errorf("failed to add a lock to the volume %s/%s: %w", poolName, imageName, errAdd)
+
+		case 1:
+			if locks[0].LockID == lockID {
+				// Already locked by this FinBackup.
+				return true, nil
+			}
+			// Locked by another process.
+			return false, nil
+
+		default:
+			// Multiple locks found; unexpected state.
+			return false, fmt.Errorf("multiple locks found on volume %s/%s after failed lock attempt (%v)", poolName, imageName, locks)
+		}
+	}
+
+	// Lock acquired.
+	return true, nil
+}
+
+// unlockVolume removes the specified lock from the RBD volume if the lock is held.
+// No action is taken if the lock is not found.
+func (r *FinBackupReconciler) unlockVolume(
+	poolName, imageName, lockID string,
+) error {
+	// List the locks to check whether the lock is held.
+	locks, err := r.imageLocker.LockLs(poolName, imageName)
+	if err != nil {
+		return fmt.Errorf("failed to list locks of the volume %s/%s: %w", poolName, imageName, err)
+	}
+
+	if len(locks) >= 2 {
+		return fmt.Errorf("multiple locks found on volume %s/%s when unlocking (%v)", poolName, imageName, locks)
+	}
+
+	for _, lock := range locks {
+		if lock.LockID == lockID {
+			// Unlock
+			if err := r.imageLocker.LockRm(poolName, imageName, lock); err != nil {
+				return fmt.Errorf("failed to remove the lock from the volume %s/%s: %w", poolName, imageName, err)
+			}
+			return nil
+		}
+	}
+
+	// Already unlocked.
+	return nil
+}
+
 func (r *FinBackupReconciler) getRBDPoolAndImageFromPVC(
 	ctx context.Context,
 	pvc *corev1.PersistentVolumeClaim,
@@ -637,7 +722,7 @@ func (r *FinBackupReconciler) getRBDPoolAndImageFromPVC(
 
 func (r *FinBackupReconciler) getRBDPoolAndImage(ctx context.Context, backup *finv1.FinBackup) (string, string, error) {
 	rbdPool := backup.GetAnnotations()[annotationRBDPool]
-	rbdImage := backup.GetAnnotations()[annotationBackupTargetRBDImage]
+	rbdImage := backup.GetAnnotations()[AnnotationBackupTargetRBDImage]
 	if rbdPool != "" && rbdImage != "" {
 		return rbdPool, rbdImage, nil
 	}
@@ -733,6 +818,10 @@ func cleanupJobName(backup *finv1.FinBackup) string {
 	return "fin-cleanup-" + string(backup.GetUID())
 }
 
+func lockID(backup *finv1.FinBackup) string {
+	return string(backup.GetUID())
+}
+
 func (r *FinBackupReconciler) createOrUpdateBackupJob(
 	ctx context.Context, backup *finv1.FinBackup, diffFrom string, backupTargetPVCUID string, maxPartSize *resource.Quantity,
@@ -794,7 +883,7 @@ func (r *FinBackupReconciler) createOrUpdateBackupJob(
 				},
 				{
 					Name:  "RBD_IMAGE_NAME",
-					Value: backup.GetAnnotations()[annotationBackupTargetRBDImage],
+					Value: backup.GetAnnotations()[AnnotationBackupTargetRBDImage],
 				},
 				{
 					Name: "BACKUP_SNAPSHOT_ID",
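A note for reviewers: after a failed `rbd lock add`, `lockVolume` above decides the outcome purely from the subsequent `lock ls` result. The decision reduces to a three-way case on the number of locks, restated here as a self-contained sketch (the `Lock` type and `classify` helper are illustrative only, not part of this change):

```go
package main

import "fmt"

// Lock mirrors the relevant part of model.RBDLock for illustration.
type Lock struct{ ID string }

// classify restates lockVolume's post-failure decision:
//   - 0 locks:  the add failed for some other reason, so report an error
//   - 1 lock:   ours means the lock is already held (idempotent re-lock);
//     anything else means another process holds it
//   - 2+ locks: unexpected for an exclusive lock, so report an error
func classify(locks []Lock, lockID string) (held bool, err error) {
	switch len(locks) {
	case 0:
		return false, fmt.Errorf("lock add failed but no lock is present")
	case 1:
		return locks[0].ID == lockID, nil
	default:
		return false, fmt.Errorf("multiple locks present: %v", locks)
	}
}

func main() {
	held, err := classify([]Lock{{ID: "fb-uid"}}, "fb-uid")
	fmt.Println(held, err) // prints: true <nil> (re-entry by the same FinBackup UID)
}
```

Because the lock ID is the FinBackup UID (see `lockID` above), a reconcile that crashes between locking and snapshotting can re-enter later and treat its own leftover lock as already held.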
+ return false, fmt.Errorf("failed to add a lock to the volume %s/%s: %w", poolName, imageName, errAdd) + + case 1: + if locks[0].LockID == lockID { + // Already locked by this FB. + return true, nil + } + // Locked by another process. + return false, nil + + default: + // Multiple locks found; unexpected state. + return false, fmt.Errorf("multiple locks found on volume %s/%s after failed lock attempt(%v)", poolName, imageName, locks) + } + } + + // Locked + return true, nil +} + +// unlockVolume removes the specified lock from the RBD volume if the lock is held. +// No action is taken if the lock is not found. +func (r *FinBackupReconciler) unlockVolume( + poolName, imageName, lockID string, +) error { + // List up locks to check if the lock is held. + locks, err := r.imageLocker.LockLs(poolName, imageName) + if err != nil { + return fmt.Errorf("failed to list locks of the volume %s/%s: %w", poolName, imageName, err) + } + + if len(locks) >= 2 { + return fmt.Errorf("multiple locks found on volume %s/%s when unlocking (%v)", poolName, imageName, locks) + } + + for _, lock := range locks { + if lock.LockID == lockID { + // Unlock + if err := r.imageLocker.LockRm(poolName, imageName, lock); err != nil { + return fmt.Errorf("failed to remove the lock from the volume %s/%s: %w", poolName, imageName, err) + } + return nil + } + } + + // Already unlocked. + return nil +} + func (r *FinBackupReconciler) getRBDPoolAndImageFromPVC( ctx context.Context, pvc *corev1.PersistentVolumeClaim, @@ -637,7 +722,7 @@ func (r *FinBackupReconciler) getRBDPoolAndImageFromPVC( func (r *FinBackupReconciler) getRBDPoolAndImage(ctx context.Context, backup *finv1.FinBackup) (string, string, error) { rbdPool := backup.GetAnnotations()[annotationRBDPool] - rbdImage := backup.GetAnnotations()[annotationBackupTargetRBDImage] + rbdImage := backup.GetAnnotations()[AnnotationBackupTargetRBDImage] if rbdPool != "" && rbdImage != "" { return rbdPool, rbdImage, nil } @@ -733,6 +818,10 @@ func cleanupJobName(backup *finv1.FinBackup) string { return "fin-cleanup-" + string(backup.GetUID()) } +func lockID(backup *finv1.FinBackup) string { + return string(backup.GetUID()) +} + func (r *FinBackupReconciler) createOrUpdateBackupJob( ctx context.Context, backup *finv1.FinBackup, diffFrom string, backupTargetPVCUID string, maxPartSize *resource.Quantity, @@ -794,7 +883,7 @@ func (r *FinBackupReconciler) createOrUpdateBackupJob( }, { Name: "RBD_IMAGE_NAME", - Value: backup.GetAnnotations()[annotationBackupTargetRBDImage], + Value: backup.GetAnnotations()[AnnotationBackupTargetRBDImage], }, { Name: "BACKUP_SNAPSHOT_ID", diff --git a/internal/controller/finbackup_controller_test.go b/internal/controller/finbackup_controller_test.go index 26434f02..0587b016 100644 --- a/internal/controller/finbackup_controller_test.go +++ b/internal/controller/finbackup_controller_test.go @@ -2,6 +2,7 @@ package controller import ( "context" + "errors" "fmt" "slices" "strconv" @@ -119,6 +120,7 @@ var _ = Describe("FinBackup Controller integration test", Ordered, func() { podImage: podImage, maxPartSize: &defaultMaxPartSize, snapRepo: rbdRepo, + imageLocker: rbdRepo, rawImgExpansionUnitSize: 100 * 1 << 20, } err = reconciler.SetupWithManager(mgr) @@ -324,7 +326,7 @@ var _ = Describe("FinBackup Controller integration test", Ordered, func() { Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(finbackup2), finbackup2)).Should(Succeed()) Expect(finbackup2.GetLabels()).To(HaveKeyWithValue(labelBackupTargetPVCUID, string(pvc1.GetUID()))) annotations := 
finbackup2.GetAnnotations() - Expect(annotations).To(HaveKeyWithValue(annotationBackupTargetRBDImage, rbdImageName)) + Expect(annotations).To(HaveKeyWithValue(AnnotationBackupTargetRBDImage, rbdImageName)) Expect(annotations).To(HaveKeyWithValue(annotationRBDPool, rbdPoolName)) // Incremental backup specific: the diff-from annotation should exist and point to the SnapID of the full backup. @@ -766,6 +768,7 @@ var _ = Describe("FinBackup Controller Reconcile Test", Ordered, func() { podImage: podImage, maxPartSize: &defaultMaxPartSize, snapRepo: rbdRepo, + imageLocker: rbdRepo, rawImgExpansionUnitSize: 100 * 1 << 20, } }) @@ -810,7 +813,7 @@ var _ = Describe("FinBackup Controller Reconcile Test", Ordered, func() { var updated finv1.FinBackup Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(finbackup), &updated)).Should(Succeed()) Expect(updated.GetLabels()).To(HaveKeyWithValue(labelBackupTargetPVCUID, string(pvc.GetUID()))) - Expect(updated.GetAnnotations()).To(HaveKeyWithValue(annotationBackupTargetRBDImage, rbdImageName)) + Expect(updated.GetAnnotations()).To(HaveKeyWithValue(AnnotationBackupTargetRBDImage, rbdImageName)) Expect(updated.GetAnnotations()).To(HaveKeyWithValue(annotationRBDPool, rbdPoolName)) }) }) @@ -1249,6 +1252,158 @@ var _ = Describe("FinBackup Controller Reconcile Test", Ordered, func() { }) }) +var _ = Describe("FinBackup Controller Unit Tests", Ordered, func() { + Context("lockVolume", func() { + var reconciler *FinBackupReconciler + var rbdRepo *fake.RBDRepository2 + + lock := func(poolName, imageName, lockID string, rbdErr error) (bool, error) { + rbdRepo.SetError(rbdErr) + defer rbdRepo.SetError(nil) + + locked, err := reconciler.lockVolume(poolName, imageName, lockID) + if err != nil { + return locked, err + } + // Check if the lock with lockID exists + locks, err := rbdRepo.LockLs(poolName, imageName) + if err != nil { + return locked, err + } + lockExists := false + for _, lock := range locks { + if lock.LockID == lockID { + lockExists = true + break + } + } + Expect(locked).To(Equal(lockExists)) + return locked, nil + } + + It("setup", func(ctx SpecContext) { + volumeInfo := &fake.VolumeInfo{ + Namespace: utils.GetUniqueName("ns-"), + PVCName: utils.GetUniqueName("pvc-"), + PVName: utils.GetUniqueName("pv-"), + PoolName: rbdPoolName, + ImageName: rbdImageName, + } + rbdRepo = fake.NewRBDRepository2(volumeInfo.PoolName, volumeInfo.ImageName) + + mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme.Scheme}) + Expect(err).ToNot(HaveOccurred()) + reconciler = &FinBackupReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + cephClusterNamespace: namespace, + podImage: podImage, + maxPartSize: &defaultMaxPartSize, + snapRepo: rbdRepo, + imageLocker: rbdRepo, + rawImgExpansionUnitSize: 100 * 1 << 20, + } + }) + + It("lock a volume successfully", func() { + locked, err := lock("pool", "image1", "lock1", nil) + Expect(err).NotTo(HaveOccurred()) + Expect(locked).To(BeTrue()) + }) + It("lock a volume that is already locked by the same lockID", func() { + locked, err := lock("pool", "image1", "lock1", nil) + Expect(err).NotTo(HaveOccurred()) + Expect(locked).To(BeTrue()) + }) + It("fail to lock a volume that is already locked by a different lockID", func() { + locked, err := lock("pool", "image1", "lock2", nil) + Expect(err).NotTo(HaveOccurred()) + Expect(locked).To(BeFalse()) + }) + It("lock a different volume successfully", func() { + locked, err := lock("pool", "image2", "lock1", nil) + Expect(err).NotTo(HaveOccurred()) + 
Expect(locked).To(BeTrue()) + }) + It("error when rbd lock ls fails", func() { + locked, err := lock("pool", "image1", "lock1", errors.New("rbd lock ls error")) + Expect(err).To(HaveOccurred()) + Expect(locked).To(BeFalse()) + }) + }) + + Context("unlockVolume", func() { + var reconciler *FinBackupReconciler + var rbdRepo *fake.RBDRepository2 + + It("setup", func(ctx SpecContext) { + volumeInfo := &fake.VolumeInfo{ + Namespace: utils.GetUniqueName("ns-"), + PVCName: utils.GetUniqueName("pvc-"), + PVName: utils.GetUniqueName("pv-"), + PoolName: rbdPoolName, + ImageName: rbdImageName, + } + rbdRepo = fake.NewRBDRepository2(volumeInfo.PoolName, volumeInfo.ImageName) + + mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme.Scheme}) + Expect(err).ToNot(HaveOccurred()) + reconciler = &FinBackupReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + cephClusterNamespace: namespace, + podImage: podImage, + maxPartSize: &defaultMaxPartSize, + snapRepo: rbdRepo, + imageLocker: rbdRepo, + rawImgExpansionUnitSize: 100 * 1 << 20, + } + }) + + It("locks a volume", func(ctx SpecContext) { + locked, err := reconciler.lockVolume("pool", "image", "lock1") + Expect(err).NotTo(HaveOccurred()) + Expect(locked).To(BeTrue()) + }) + + DescribeTable("", func( + ctx SpecContext, + poolName, imageName, lockID string, rbdErr error, + expectErr bool, + ) { + rbdRepo.SetError(rbdErr) + defer rbdRepo.SetError(nil) + + err := reconciler.unlockVolume(poolName, imageName, lockID) + if expectErr { + Expect(err).To(HaveOccurred()) + } else { + Expect(err).NotTo(HaveOccurred()) + } + // Check if the lock is removed + locks, err := rbdRepo.LockLs(poolName, imageName) + if expectErr { + Expect(err).To(HaveOccurred()) + } else { + Expect(err).NotTo(HaveOccurred()) + for _, l := range locks { + Expect(l.LockID).NotTo(Equal(lockID)) + } + } + }, + Entry("unlock a volume successfully", + "pool", "image", "lock1", nil, + false), + Entry("unlock the same volume again (no-op)", + "pool", "image", "lock1", nil, + false), + Entry("error when rbd unlock fails", + "pool", "image", "lock1", errors.New("rbd unlock error"), + true), + ) + }) +}) + // CSATEST-1627 // Description: // diff --git a/internal/infrastructure/ceph/command.go b/internal/infrastructure/ceph/command.go index 312081c5..6048549b 100644 --- a/internal/infrastructure/ceph/command.go +++ b/internal/infrastructure/ceph/command.go @@ -5,8 +5,19 @@ import ( "os/exec" ) -func execute(command string, args ...string) ([]byte, []byte, error) { - cmd := exec.Command(command, args...) +type Command interface { + execute(command ...string) ([]byte, []byte, error) +} + +type commandImpl struct { +} + +func newCommand() Command { + return &commandImpl{} +} + +func (c *commandImpl) execute(command ...string) ([]byte, []byte, error) { + cmd := exec.Command(command[0], command[1:]...) var stdoutBuf, stderrBuf bytes.Buffer cmd.Stdout = &stdoutBuf @@ -15,7 +26,3 @@ func execute(command string, args ...string) ([]byte, []byte, error) { err := cmd.Run() return stdoutBuf.Bytes(), stderrBuf.Bytes(), err } - -func runRBDCommand(args ...string) ([]byte, []byte, error) { - return execute("rbd", args...) -} diff --git a/internal/infrastructure/ceph/command_mock.go b/internal/infrastructure/ceph/command_mock.go new file mode 100644 index 00000000..bdc12fb1 --- /dev/null +++ b/internal/infrastructure/ceph/command_mock.go @@ -0,0 +1,60 @@ +// Code generated by MockGen. DO NOT EDIT. 
+// Source: internal/infrastructure/ceph/command.go +// +// Generated by this command: +// +// mockgen-v0.6.0 -source=internal/infrastructure/ceph/command.go -destination=internal/infrastructure/ceph/command_mock.go -package=ceph +// + +// Package ceph is a generated GoMock package. +package ceph + +import ( + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockCommand is a mock of Command interface. +type MockCommand struct { + ctrl *gomock.Controller + recorder *MockCommandMockRecorder + isgomock struct{} +} + +// MockCommandMockRecorder is the mock recorder for MockCommand. +type MockCommandMockRecorder struct { + mock *MockCommand +} + +// NewMockCommand creates a new mock instance. +func NewMockCommand(ctrl *gomock.Controller) *MockCommand { + mock := &MockCommand{ctrl: ctrl} + mock.recorder = &MockCommandMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockCommand) EXPECT() *MockCommandMockRecorder { + return m.recorder +} + +// execute mocks base method. +func (m *MockCommand) execute(command ...string) ([]byte, []byte, error) { + m.ctrl.T.Helper() + varargs := []any{} + for _, a := range command { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "execute", varargs...) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].([]byte) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// execute indicates an expected call of execute. +func (mr *MockCommandMockRecorder) execute(command ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "execute", reflect.TypeOf((*MockCommand)(nil).execute), command...) +} diff --git a/internal/infrastructure/ceph/rbd.go b/internal/infrastructure/ceph/rbd.go index e068e58b..7ea11f10 100644 --- a/internal/infrastructure/ceph/rbd.go +++ b/internal/infrastructure/ceph/rbd.go @@ -31,17 +31,20 @@ var ( ) type RBDRepository struct { + command Command } var _ model.RBDRepository = &RBDRepository{} +var _ model.RBDImageLocker = &RBDRepository{} func NewRBDRepository() *RBDRepository { - return &RBDRepository{} + return &RBDRepository{ + command: newCommand(), + } } func (r *RBDRepository) CreateSnapshot(poolName, imageName, snapName string) error { - args := []string{"snap", "create", fmt.Sprintf("%s/%s@%s", poolName, imageName, snapName)} - _, stderr, err := runRBDCommand(args...) + _, stderr, err := r.command.execute("rbd", "snap", "create", fmt.Sprintf("%s/%s@%s", poolName, imageName, snapName)) if err != nil { return fmt.Errorf("failed to create RBD snapshot: %w, stderr: %s", err, string(stderr)) } @@ -50,8 +53,7 @@ func (r *RBDRepository) CreateSnapshot(poolName, imageName, snapName string) err } func (r *RBDRepository) RemoveSnapshot(poolName, imageName, snapName string) error { - args := []string{"snap", "rm", "--force", fmt.Sprintf("%s/%s@%s", poolName, imageName, snapName)} - _, stderr, err := runRBDCommand(args...) + _, stderr, err := r.command.execute("rbd", "snap", "rm", "--force", fmt.Sprintf("%s/%s@%s", poolName, imageName, snapName)) if err != nil { return fmt.Errorf("failed to delete RBD snapshot: %w, stderr: %s", err, string(stderr)) } @@ -60,8 +62,7 @@ func (r *RBDRepository) RemoveSnapshot(poolName, imageName, snapName string) err } func (r *RBDRepository) ListSnapshots(poolName, imageName string) ([]*model.RBDSnapshot, error) { - args := []string{"snap", "ls", "--format", "json", fmt.Sprintf("%s/%s", poolName, imageName)} - stdout, stderr, err := runRBDCommand(args...) 
+ stdout, stderr, err := r.command.execute("rbd", "snap", "ls", "--format", "json", fmt.Sprintf("%s/%s", poolName, imageName)) if err != nil { return nil, fmt.Errorf("failed to list RBD snapshots: %w, stderr: %s", err, string(stderr)) } @@ -75,6 +76,39 @@ func (r *RBDRepository) ListSnapshots(poolName, imageName string) ([]*model.RBDS return snapshots, nil } +func (r *RBDRepository) LockAdd(pool, image, lockID string) error { + _, stderr, err := r.command.execute("rbd", "-p", pool, "lock", "add", image, lockID) + if err != nil { + return fmt.Errorf("failed to add lock to RBD image: %w, stderr: %s", err, string(stderr)) + } + + return nil +} + +func (r *RBDRepository) LockRm(pool, image string, lock *model.RBDLock) error { + _, stderr, err := r.command.execute("rbd", "-p", pool, "lock", "rm", image, lock.LockID, lock.Locker) + if err != nil { + return fmt.Errorf("failed to remove lock from RBD image: %w, stderr: %s", err, string(stderr)) + } + + return nil +} + +func (r *RBDRepository) LockLs(pool, image string) ([]*model.RBDLock, error) { + stdout, stderr, err := r.command.execute("rbd", "-p", pool, "--format", "json", "lock", "ls", image) + if err != nil { + return nil, fmt.Errorf("failed to list locks on RBD image: %w, stderr: %s", err, string(stderr)) + } + + var locks []*model.RBDLock + err = json.Unmarshal(stdout, &locks) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal RBD locks: %w", err) + } + + return locks, nil +} + func (r *RBDRepository) ExportDiff(input *model.ExportDiffInput) (io.ReadCloser, error) { args := []string{ "export-diff", diff --git a/internal/infrastructure/ceph/rbd_test.go b/internal/infrastructure/ceph/rbd_test.go index 70c1a613..507dc98b 100644 --- a/internal/infrastructure/ceph/rbd_test.go +++ b/internal/infrastructure/ceph/rbd_test.go @@ -15,11 +15,13 @@ import ( "github.com/cespare/xxhash/v2" "github.com/cybozu-go/fin/internal/diffgenerator" + "github.com/cybozu-go/fin/internal/model" "github.com/cybozu-go/fin/internal/pkg/csumio" "github.com/cybozu-go/fin/internal/pkg/zeroreader" "github.com/cybozu-go/fin/test/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + gomock "go.uber.org/mock/gomock" "golang.org/x/sys/unix" ) @@ -41,6 +43,121 @@ func TestMain(m *testing.M) { os.Exit(code) } +func mockedRBDRepository(command Command) *RBDRepository { + return &RBDRepository{ + command: command, + } +} + +func TestLockAdd_success(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + m := NewMockCommand(ctrl) + m.EXPECT().execute("rbd", "-p", "pool", "lock", "add", "image", "lockID").Return([]byte{}, []byte{}, nil) + + rbd := mockedRBDRepository(m) + err := rbd.LockAdd("pool", "image", "lockID") + require.NoError(t, err) +} + +func TestLockAdd_failed(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + m := NewMockCommand(ctrl) + m.EXPECT().execute(gomock.Any()).Return([]byte{}, []byte{}, fmt.Errorf("error")) + + rbd := mockedRBDRepository(m) + err := rbd.LockAdd("pool", "image", "lockID") + require.Error(t, err) +} + +func TestLockRm_success(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + m := NewMockCommand(ctrl) + m.EXPECT(). + execute("rbd", "-p", "pool", "lock", "rm", "image", "lockID", "client.12345"). 
+		Return([]byte{}, []byte{}, nil)
+
+	rbd := mockedRBDRepository(m)
+	lock := &model.RBDLock{
+		LockID: "lockID",
+		Locker: "client.12345",
+	}
+	err := rbd.LockRm("pool", "image", lock)
+	require.NoError(t, err)
+}
+
+func TestLockRm_failed(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+	m := NewMockCommand(ctrl)
+	m.EXPECT().execute(gomock.Any()).Return([]byte{}, []byte{}, fmt.Errorf("error"))
+
+	rbd := mockedRBDRepository(m)
+	lock := &model.RBDLock{
+		LockID: "lockID",
+		Locker: "client.12345",
+	}
+	err := rbd.LockRm("pool", "image", lock)
+	require.Error(t, err)
+}
+
+func TestLockLs_many(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	m := NewMockCommand(ctrl)
+	m.EXPECT().
+		execute("rbd", "-p", "pool", "--format", "json", "lock", "ls", "image").
+		Return([]byte(`
+	[
+		{"id": "HOGE","locker": "client.12345","address": "192.168.0.1:0/12345"},
+		{"id": "FOO","locker": "client.67890","address": "192.168.0.2:0/67890"}
+	]
+	`), []byte{}, nil)
+
+	rbd := mockedRBDRepository(m)
+	locks, err := rbd.LockLs("pool", "image")
+	require.NoError(t, err)
+	require.Len(t, locks, 2)
+	require.Equal(t, "HOGE", locks[0].LockID)
+	require.Equal(t, "client.12345", locks[0].Locker)
+	require.Equal(t, "192.168.0.1:0/12345", locks[0].Address)
+	require.Equal(t, "FOO", locks[1].LockID)
+	require.Equal(t, "client.67890", locks[1].Locker)
+	require.Equal(t, "192.168.0.2:0/67890", locks[1].Address)
+}
+
+func TestLockLs_empty(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	m := NewMockCommand(ctrl)
+	m.EXPECT().
+		execute("rbd", "-p", "pool", "--format", "json", "lock", "ls", "image").
+		Return([]byte(`[]`), []byte{}, nil)
+
+	rbd := mockedRBDRepository(m)
+	locks, err := rbd.LockLs("pool", "image")
+	require.NoError(t, err)
+	require.Len(t, locks, 0)
+}
+
+func TestLockLs_failed(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	m := NewMockCommand(ctrl)
+	m.EXPECT().execute(gomock.Any()).Return([]byte{}, []byte{}, fmt.Errorf("error"))
+
+	rbd := mockedRBDRepository(m)
+	_, err := rbd.LockLs("pool", "image")
+	require.Error(t, err)
+}
+
 func prepareTestdataChecksums() (func(), error) {
 	testDataDir := "testdata"
 	checksumFiles := []struct {
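One gomock detail the `_failed` tests above lean on: `execute` takes a single variadic parameter, and when exactly one `gomock.Any()` is supplied for a variadic slot, gomock matches it against the entire argument list, whatever its length. A minimal sketch, assuming the same package and imports as rbd_test.go (this test is hypothetical, not part of the change):

```go
func TestMockCommand_variadicAny(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	m := NewMockCommand(ctrl)
	// A single gomock.Any() matches calls with any number of arguments.
	m.EXPECT().
		execute(gomock.Any()).
		Return([]byte{}, []byte{}, nil).
		Times(2)

	_, _, _ = m.execute("rbd", "-p", "pool", "lock", "ls", "image") // six args: matches
	_, _, _ = m.execute("rbd")                                      // one arg: also matches
}
```

This is why the failure-path tests can stub any `rbd` invocation without repeating the full command line.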
diff --git a/internal/infrastructure/fake/rbd2.go b/internal/infrastructure/fake/rbd2.go
index 09ab6ff1..b39ae3a5 100644
--- a/internal/infrastructure/fake/rbd2.go
+++ b/internal/infrastructure/fake/rbd2.go
@@ -24,6 +24,12 @@ type writtenHistory struct {
 
 type RBDRepository2 struct {
 	r *rand.ChaCha8
+
+	// If set, all methods return this error.
+	err error
+	// key: pool/image
+	locks map[string][]*model.RBDLock
+
 	// In this fake, divideSize is used as the basis size for calculating
 	// which areas on the volume have been rewritten by which snapshots or are unused.
 	// The divideSize is a specific idea for fake. It's not related to sector or block size.
@@ -36,6 +42,7 @@ type RBDRepository2 struct {
 
 var _ model.RBDRepository = &RBDRepository2{}
 var _ model.RBDSnapshotRepository = &RBDRepository2{}
+var _ model.RBDImageLocker = &RBDRepository2{}
 
 func NewRBDRepository2(poolName, imageName string) *RBDRepository2 {
 	s := make([]byte, 32)
@@ -48,6 +55,7 @@ func NewRBDRepository2(poolName, imageName string) *RBDRepository2 {
 
 	return &RBDRepository2{
 		r:          rand.NewChaCha8(seed),
+		locks:      make(map[string][]*model.RBDLock),
 		divideSize: 1024,
 		poolName:   poolName,
 		imageName:  imageName,
@@ -57,6 +65,10 @@ func NewRBDRepository2(poolName, imageName string) *RBDRepository2 {
 	}
 }
 
+func (r *RBDRepository2) SetError(err error) {
+	r.err = err
+}
+
 func (r *RBDRepository2) CreateSnapshot(poolName, imageName, snapName string) error {
 	if poolName != r.poolName || imageName != r.imageName {
 		return errors.New("invalid pool or image")
@@ -135,6 +147,61 @@ func (r *RBDRepository2) RemoveSnapshot(poolName, imageName, snapName string) er
 	return nil
 }
 
+func (r *RBDRepository2) LockAdd(pool, image, lockID string) error {
+	if r.err != nil {
+		return r.err
+	}
+
+	key := pool + "/" + image
+	if r.locks[key] == nil {
+		r.locks[key] = []*model.RBDLock{}
+	}
+
+	if len(r.locks[key]) > 0 {
+		return fmt.Errorf("image %s is already locked", key)
+	}
+
+	r.locks[key] = append(r.locks[key], &model.RBDLock{
+		LockID: lockID,
+		Locker: fmt.Sprintf("client.%d", rand.Int64()), // ignore collision for test
+		Address: fmt.Sprintf("%d.%d.%d.%d:%d/%d",
+			rand.Int32N(256), rand.Int32N(256), rand.Int32N(256), rand.Int32N(256),
+			rand.Int32N(65536), rand.Int64()),
+	})
+
+	return nil
+}
+
+func (r *RBDRepository2) LockRm(pool, image string, lock *model.RBDLock) error {
+	if r.err != nil {
+		return r.err
+	}
+
+	key := pool + "/" + image
+	for _, l := range r.locks[key] {
+		if l.LockID == lock.LockID && l.Locker == lock.Locker {
+			r.locks[key] = slices.DeleteFunc(r.locks[key], func(lockItem *model.RBDLock) bool {
+				return lockItem.LockID == lock.LockID && lockItem.Locker == lock.Locker
+			})
+			return nil
+		}
+	}
+
+	return fmt.Errorf("lock not found: %s", lock.LockID)
+}
+
+func (r *RBDRepository2) LockLs(pool, image string) ([]*model.RBDLock, error) {
+	if r.err != nil {
+		return nil, r.err
+	}
+
+	key := pool + "/" + image
+	if locks, ok := r.locks[key]; ok {
+		return locks, nil
+	}
+	return []*model.RBDLock{}, nil
+}
+
 func (r *RBDRepository2) ExportDiff(input *model.ExportDiffInput) (io.ReadCloser, error) {
 	if input.PoolName != r.poolName {
 		return nil, fmt.Errorf("pool name mismatch: expected %s, got %s", r.poolName, input.PoolName)
diff --git a/internal/infrastructure/fake/rbd2_test.go b/internal/infrastructure/fake/rbd2_test.go
index bcae1806..035c95f4 100644
--- a/internal/infrastructure/fake/rbd2_test.go
+++ b/internal/infrastructure/fake/rbd2_test.go
@@ -98,6 +98,43 @@ type testVolume struct {
 	data []byte
 }
 
+func TestRBDRepository2_ImageLocker(t *testing.T) {
+	rbdRepo := NewRBDRepository2(poolName, imageName)
+
+	// checking adding a lock succeeds
+	err := rbdRepo.LockAdd("pool", "image1", "lock1")
+	require.NoError(t, err)
+
+	// checking adding the same lock fails
+	err = rbdRepo.LockAdd("pool", "image1", "lock1")
+	require.Error(t, err)
+
+	// checking adding a different lock fails
+	err = rbdRepo.LockAdd("pool", "image1", "lock2")
+	require.Error(t, err)
+
+	// checking adding another lock succeeds
+	err = rbdRepo.LockAdd("pool", "image2", "lock1")
+	require.NoError(t, err)
+
+	// checking listing locks succeeds
+	locks1, err := rbdRepo.LockLs("pool", "image1")
+	
require.NoError(t, err) + require.Len(t, locks1, 1) + require.Equal(t, "lock1", locks1[0].LockID) + locks2, err := rbdRepo.LockLs("pool", "image2") + require.NoError(t, err) + require.Len(t, locks2, 1) + require.Equal(t, "lock1", locks2[0].LockID) + + // checking removing a lock succeeds + err = rbdRepo.LockRm("pool", "image1", locks1[0]) + require.NoError(t, err) + locks1, err = rbdRepo.LockLs("pool", "image1") + require.NoError(t, err) + require.Empty(t, locks1) +} + func TestRBDRepository2_exportDiff_random(t *testing.T) { rbdRepo := NewRBDRepository2(poolName, imageName) diff --git a/internal/model/repository.go b/internal/model/repository.go index def9decf..f3468376 100644 --- a/internal/model/repository.go +++ b/internal/model/repository.go @@ -77,6 +77,12 @@ type RBDSnapshot struct { Timestamp RBDTimeStamp `json:"timestamp"` } +type RBDLock struct { + LockID string `json:"id"` + Locker string `json:"locker"` + Address string `json:"address"` +} + type ExportDiffInput struct { PoolName string ReadOffset uint64 @@ -107,6 +113,19 @@ type RBDSnapshotRepository interface { RBDSnapshotListRepository } +// RBDImageLocker is an interface for managing locks on RBD images. +// It provides methods to add, remove, and list locks for a given image in a pool. +type RBDImageLocker interface { + // LockAdd adds a lock with the specified lockID to the given image in the pool. + LockAdd(pool, image, lockID string) error + + // LockRm removes the specified lock from the given image in the pool. + LockRm(pool, image string, lock *RBDLock) error + + // LockLs lists all locks for the given image in the pool. + LockLs(pool, image string) ([]*RBDLock, error) +} + // RBDRepository is an interface for managing RBD images and snapshots. // It provides any operations that need knowledge of RBD's internal structure. type RBDRepository interface { diff --git a/test/e2e/Makefile b/test/e2e/Makefile index 35d0bc38..480dd2dc 100644 --- a/test/e2e/Makefile +++ b/test/e2e/Makefile @@ -88,10 +88,10 @@ create-loop-dev: setup-fin-volume: $(MINIKUBE_BIN) $(KUBECTL) for node in $(MINIKUBE_PROFILE_NAME) $(MINIKUBE_PROFILE_NAME)-m02; do \ $(MINIKUBE) ssh -n $$node -- sudo dd if=/dev/zero of=fin-volume.img bs=1G seek=1 count=0; \ - $(MINIKUBE) ssh -n $$node -- sudo losetup /dev/loop1 fin-volume.img || :; \ - $(MINIKUBE) ssh -n $$node -- sudo mkfs.xfs /dev/loop1 || :; \ + $(MINIKUBE) ssh -n $$node -- sudo losetup $(FIN_VOLUME_LOOP_DEV) fin-volume.img || :; \ + $(MINIKUBE) ssh -n $$node -- sudo mkfs.xfs $(FIN_VOLUME_LOOP_DEV) || :; \ $(MINIKUBE) ssh -n $$node -- sudo mkdir -p /fin; \ - $(MINIKUBE) ssh -n $$node -- sudo mount /dev/loop1 /fin; \ + $(MINIKUBE) ssh -n $$node -- sudo mount $(FIN_VOLUME_LOOP_DEV) /fin; \ $(MINIKUBE) ssh -n $$node -- sudo chown -R 10000:10000 /fin; \ $(MINIKUBE) ssh -n $$node -- ls -ld /fin; \ done @@ -149,6 +149,7 @@ setup-components: # Disable webhooks for e2e tests $(KUBECTL) -n $(NS) set env deployment/fin-controller-manager ENABLE_WEBHOOKS=false $(KUBECTL) patch deploy -n $(NS) fin-controller-manager --type='json' -p='[{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value":"--raw-img-expansion-unit-size=10485760"}]' + $(KUBECTL) wait -n $(NS) --timeout=60s --for=condition=available deployment/fin-controller-manager .PHONY: do-test do-test: $(GINKGO) $(KUBECTL) $(MINIKUBE_BIN) @@ -157,4 +158,9 @@ do-test: $(GINKGO) $(KUBECTL) $(MINIKUBE_BIN) E2ETEST=1 \ KUBECTL=$(KUBECTL) \ MINIKUBE=$(MINIKUBE) \ - $(GINKGO) --fail-fast -v $(GINKGO_FLAGS) . 
+ $(GINKGO) --fail-fast -v $(GINKGO_FLAGS) .; \ + if [ "$$?" -ne 0 ]; then \ + echo "Controller logs"; \ + $(KUBECTL) -n rook-ceph logs deployment/fin-controller-manager; \ + exit 1; \ + fi diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go index 7ce0a808..42021c4f 100644 --- a/test/e2e/e2e_suite_test.go +++ b/test/e2e/e2e_suite_test.go @@ -74,6 +74,7 @@ var _ = Describe("Fin", func() { Context("wait environment", waitEnvironment) Context("full backup", Label("full-backup"), Ordered, fullBackupTestSuite) Context("incremental backup", Label("incremental-backup"), Ordered, incrementalBackupTestSuite) + Context("lock", Label("lock"), Label("misc"), Ordered, lockTestSuite) Context("verification", Label("verification"), Label("misc"), Ordered, verificationTestSuite) Context("delete incremental backup", Label("delete-incremental-backup"), Label("misc"), Ordered, deleteIncrementalBackupTestSuite) diff --git a/test/e2e/full_backup_test.go b/test/e2e/full_backup_test.go index 8de7edd2..a0c0aa3c 100644 --- a/test/e2e/full_backup_test.go +++ b/test/e2e/full_backup_test.go @@ -195,7 +195,8 @@ func fullBackupTestSuite() { VerifyNonExistenceOfRawImage(pvc, nodes[0]) VerifyDeletionOfJobsForBackup(ctx, k8sClient, finbackup) - VerifyDeletionOfSnapshotInFinBackup(ctx, ctrlClient, finbackup) + err = VerifyDeletionOfSnapshotInFinBackup(ctx, finbackup) + Expect(err).NotTo(HaveOccurred()) }) AfterAll(func(ctx SpecContext) { diff --git a/test/e2e/incremental_test.go b/test/e2e/incremental_test.go index be99f450..5132522b 100644 --- a/test/e2e/incremental_test.go +++ b/test/e2e/incremental_test.go @@ -240,7 +240,8 @@ func incrementalBackupTestSuite() { Expect(rawImageData).To(Equal(dataOnIncrementalBackup), "Data in raw.img does not match the expected data") VerifyDeletionOfJobsForBackup(ctx, k8sClient, finbackup1) - VerifyDeletionOfSnapshotInFinBackup(ctx, ctrlClient, finbackup1) + err = VerifyDeletionOfSnapshotInFinBackup(ctx, finbackup1) + Expect(err).NotTo(HaveOccurred()) }) // Description: diff --git a/test/e2e/lock_test.go b/test/e2e/lock_test.go new file mode 100644 index 00000000..a8454947 --- /dev/null +++ b/test/e2e/lock_test.go @@ -0,0 +1,103 @@ +package e2e + +import ( + "encoding/json" + "time" + + finv1 "github.com/cybozu-go/fin/api/v1" + "github.com/cybozu-go/fin/internal/model" + "github.com/cybozu-go/fin/test/utils" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func lockTestSuite() { + var ns *corev1.Namespace + var pvc *corev1.PersistentVolumeClaim + var fb *finv1.FinBackup + dummyLockID := "dummy-lock-id" + var poolName, imageName string + + It("should setup environment", func(ctx SpecContext) { + ns = NewNamespace(utils.GetUniqueName("ns-")) + err := CreateNamespace(ctx, k8sClient, ns) + Expect(err).NotTo(HaveOccurred()) + + pvc = CreateBackupTargetPVC(ctx, k8sClient, ns, "Filesystem", rookStorageClass, "ReadWriteOnce", "100Mi") + // Create a Pod to make filesystem to pass verification during backup + _ = CreatePodForFilesystemPVC(ctx, k8sClient, pvc) + }) + + It("should lock the volume", func(ctx SpecContext) { + // get current pvc to find pool and image name + var err error + pvc, err = k8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(ctx, pvc.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + pv, err := GetPvByPvc(ctx, k8sClient, pvc) + Expect(err).NotTo(HaveOccurred()) + poolName = pv.Spec.CSI.VolumeAttributes["pool"] + imageName = pv.Spec.CSI.VolumeAttributes["imageName"] + + // locked + _, _, err = kubectl("exec", "-n", rookNamespace, "deployment/"+finDeploymentName, "--", + "rbd", "-p", poolName, "lock", "add", imageName, dummyLockID) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should create backup and wait for the log", func(ctx SpecContext) { + var err error + fb, err = NewFinBackup(rookNamespace, utils.GetUniqueName("fb-"), pvc, nodes[0]) + Expect(err).NotTo(HaveOccurred()) + err = CreateFinBackup(ctx, ctrlClient, fb) + Expect(err).NotTo(HaveOccurred()) + + // get current fb to get UID + currentFB := &finv1.FinBackup{} + err = ctrlClient.Get(ctx, client.ObjectKeyFromObject(fb), currentFB) + Expect(err).NotTo(HaveOccurred()) + + err = WaitControllerLog(ctx, + "the volume is locked by another process.*"+string(currentFB.GetUID()), + 3*time.Minute) + Expect(err).NotTo(HaveOccurred()) + }) + + It("checks that the snapshot is not created", func(ctx SpecContext) { + snapshots, err := ListRBDSnapshots(ctx, poolName, imageName) + Expect(err).NotTo(HaveOccurred()) + Expect(snapshots).To(BeEmpty()) + }) + + It("should unlock the volume", func() { + stdout, _, err := kubectl("exec", "-n", rookNamespace, "deployment/"+finDeploymentName, "--", + "rbd", "-p", poolName, "--format", "json", "lock", "ls", imageName) + Expect(err).NotTo(HaveOccurred()) + var locks []*model.RBDLock + err = json.Unmarshal(stdout, &locks) + Expect(err).NotTo(HaveOccurred()) + Expect(locks).To(HaveLen(1)) + + // unlock + _, _, err = kubectl("exec", "-n", rookNamespace, "deployment/"+finDeploymentName, "--", + "rbd", "-p", poolName, "lock", "rm", imageName, dummyLockID, locks[0].Locker) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should resume backup creation and complete it", func(ctx SpecContext) { + _, err := WaitForFinBackupStoredToNodeAndVerified(ctx, ctrlClient, fb, 1*time.Minute) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should not exist locks after backup completion", func() { + stdout, _, err := kubectl("exec", "-n", rookNamespace, "deployment/"+finDeploymentName, "--", + "rbd", "-p", poolName, "--format", "json", "lock", "ls", imageName) + Expect(err).NotTo(HaveOccurred()) + var locks []*model.RBDLock + err = json.Unmarshal(stdout, &locks) + Expect(err).NotTo(HaveOccurred()) + Expect(locks).To(HaveLen(0)) + }) +} diff --git a/test/e2e/util_test.go 
diff --git a/test/e2e/util_test.go b/test/e2e/util_test.go
index c732efc6..dccfcb60 100644
--- a/test/e2e/util_test.go
+++ b/test/e2e/util_test.go
@@ -1,16 +1,21 @@
 package e2e
 
 import (
+	"bufio"
 	"bytes"
 	"context"
+	"encoding/json"
 	"fmt"
 	"html/template"
 	"os"
 	"os/exec"
 	"path/filepath"
+	"regexp"
 	"time"
 
 	finv1 "github.com/cybozu-go/fin/api/v1"
+	"github.com/cybozu-go/fin/internal/controller"
+	"github.com/cybozu-go/fin/internal/model"
 	"github.com/cybozu-go/fin/test/utils"
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
@@ -26,6 +31,7 @@ import (
 )
 
 const (
+	finDeploymentName = "fin-controller-manager"
 	rookNamespace    = "rook-ceph"
 	rookStorageClass = "rook-ceph-block"
 	poolName         = "rook-ceph-block-pool"
@@ -80,7 +86,7 @@ func checkDeploymentReady(namespace, name string) error {
 func waitEnvironment() {
 	It("wait for fin-controller to be ready", func() {
 		Eventually(func() error {
-			return checkDeploymentReady(rookNamespace, "fin-controller-manager")
+			return checkDeploymentReady(rookNamespace, finDeploymentName)
 		}).Should(Succeed())
 	})
 }
@@ -372,16 +378,18 @@ func DeleteFinRestore(ctx context.Context, client client.Client, finrestore *fin
 	return client.Delete(ctx, target)
 }
 
-func WaitForFinBackupStoredToNodeAndVerified(ctx context.Context, c client.Client, finbackup *finv1.FinBackup, timeout time.Duration) error {
-	return wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) {
-		fb := &finv1.FinBackup{}
-		err := c.Get(ctx, client.ObjectKeyFromObject(finbackup), fb)
+func WaitForFinBackupStoredToNodeAndVerified(ctx context.Context, c client.Client, finbackup *finv1.FinBackup, timeout time.Duration) (*finv1.FinBackup, error) {
+	GinkgoHelper()
+	currentFB := &finv1.FinBackup{}
+	err := wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) {
+		err := c.Get(ctx, client.ObjectKeyFromObject(finbackup), currentFB)
 		if err != nil {
 			return false, err
 		}
-		return fb.IsStoredToNode() && fb.IsVerifiedTrue(), nil
+		return currentFB.IsStoredToNode() && currentFB.IsVerifiedTrue(), nil
 	})
+	return currentFB, err
 }
 
 func WaitForFinRestoreReady(ctx context.Context, c client.Client, finrestore *finv1.FinRestore, timeout time.Duration) error {
@@ -455,6 +463,58 @@ func WaitForPVCDeletion(ctx context.Context, k8sClient kubernetes.Interface, pvc
 	})
 }
 
+// WaitControllerLog waits until the controller log matches the given pattern or the duration is exceeded.
+func WaitControllerLog(ctx SpecContext, pattern string, duration time.Duration) error { + GinkgoHelper() + + timeoutCtx, cancel := context.WithTimeout(ctx, duration) + defer cancel() + + matcher := regexp.MustCompile(pattern) + + command := exec.CommandContext(timeoutCtx, "kubectl", "logs", "-n", rookNamespace, "deployment/"+finDeploymentName, "-f") + stdoutPipe, err := command.StdoutPipe() + if err != nil { + panic(err) + } + err = command.Start() + if err != nil { + panic(err) + } + defer func() { + _ = command.Process.Kill() + _ = command.Wait() + }() + + // read stdout line by line until the pattern is found + scanner := bufio.NewScanner(stdoutPipe) + found := make(chan struct{}) + go func() { + for scanner.Scan() { + select { + case <-timeoutCtx.Done(): + return + default: + } + line := scanner.Text() + if matcher.MatchString(line) { + close(found) + return + } + } + if scanner.Err() != nil { + panic(scanner.Err()) + } + }() + + select { + case <-timeoutCtx.Done(): + return timeoutCtx.Err() + case <-found: + return nil + } +} + func VerifySizeOfRestorePVC(ctx context.Context, c client.Client, restore *finv1.FinRestore) { GinkgoHelper() @@ -626,8 +686,8 @@ func CreateBackup( pvc, node) Expect(err).NotTo(HaveOccurred()) Expect(CreateFinBackup(ctx, ctrlClient, backup)).NotTo(HaveOccurred()) - Expect(WaitForFinBackupStoredToNodeAndVerified(ctx, ctrlClient, backup, 1*time.Minute)). - NotTo(HaveOccurred()) + backup, err = WaitForFinBackupStoredToNodeAndVerified(ctx, ctrlClient, backup, 1*time.Minute) + Expect(err).NotTo(HaveOccurred()) return backup } @@ -679,6 +739,30 @@ func GetNodeNames(ctx context.Context, k8sClient kubernetes.Interface) ([]string return nodeNames, nil } +func GetPvByPvc(ctx context.Context, k8sClient kubernetes.Interface, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolume, error) { + GinkgoHelper() + + return k8sClient.CoreV1().PersistentVolumes().Get(ctx, pvc.Spec.VolumeName, metav1.GetOptions{}) +} + +func ListRBDSnapshots(ctx context.Context, poolName, imageName string) ([]*model.RBDSnapshot, error) { + GinkgoHelper() + + stdout, stderr, err := kubectl("exec", "-n", rookNamespace, "deploy/rook-ceph-tools", "--", + "rbd", "-p", poolName, "snap", "ls", imageName, "--format", "json") + if err != nil { + return nil, fmt.Errorf("failed to list RBD snapshots. stdout: %s, stderr: %s, err: %w", + string(stdout), string(stderr), err) + } + + var snapshots []*model.RBDSnapshot + if err := json.Unmarshal(stdout, &snapshots); err != nil { + return nil, fmt.Errorf("failed to unmarshal RBD snapshot list. err: %w", err) + } + + return snapshots, nil +} + func VerifyRawImage(pvc *corev1.PersistentVolumeClaim, node string, expected []byte) { GinkgoHelper() @@ -706,13 +790,26 @@ func VerifyDeletionOfJobsForBackup(ctx context.Context, client kubernetes.Interf Expect(err).NotTo(HaveOccurred(), "Deletion job should be deleted.") } -func VerifyDeletionOfSnapshotInFinBackup(ctx context.Context, ctrlClient client.Client, finbackup *finv1.FinBackup) { +func VerifyDeletionOfSnapshotInFinBackup(ctx context.Context, finbackup *finv1.FinBackup) error { GinkgoHelper() - rbdImage := finbackup.Annotations["fin.cybozu.io/backup-target-rbd-image"] - stdout, stderr, err := kubectl("exec", "-n", rookNamespace, "deploy/rook-ceph-tools", "--", - "rbd", "info", fmt.Sprintf("%s/%s@fin-backup-%s", poolName, rbdImage, finbackup.UID)) - Expect(err).To(HaveOccurred(), "Snapshot should be deleted. 
stdout: %s, stderr: %s", stdout, stderr) + imageName := finbackup.Annotations[controller.AnnotationBackupTargetRBDImage] + if len(imageName) == 0 { + return fmt.Errorf("finbackup %s/%s does not have %s annotation", + finbackup.Namespace, finbackup.Name, controller.AnnotationBackupTargetRBDImage) + } + snapshots, err := ListRBDSnapshots(ctx, poolName, imageName) + if err != nil { + return err + } + + expectedSnapName := fmt.Sprintf("fin-backup-%s", finbackup.UID) + for _, snapshot := range snapshots { + if snapshot.Name == expectedSnapName { + return fmt.Errorf("snapshot %s still exists", expectedSnapName) + } + } + return nil } func VerifyDeletionOfResourcesForRestore( diff --git a/versions.mk b/versions.mk index d83bcab5..8969ad1e 100644 --- a/versions.mk +++ b/versions.mk @@ -16,3 +16,4 @@ KUSTOMIZE_VERSION ?= v5.7.0 CONTROLLER_TOOLS_VERSION ?= v0.18.0 ENVTEST_VERSION ?= release-0.20 GOLANGCI_LINT_VERSION ?= v2.3.1 +MOCKGEN_VERSION ?= $(shell awk '$$1 == "go.uber.org/mock" {print $$2}' $(SELF_DIR)/go.mod)