Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,12 @@ clean-test:
rm -f $(TEST_XFS_IMG); \
fi

.PHONY: mock
mock: mockgen
$(MOCKGEN) -source=internal/infrastructure/ceph/command.go -destination=internal/infrastructure/ceph/command_mock.go -package=ceph

.PHONY: test
test: manifests generate fmt vet envtest ## Run tests.
test: manifests generate fmt vet envtest mock ## Run tests.
$(MAKE) prepare-test
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" \
TEST_BLOCK_DEV=$(TEST_BLOCK_DEV) \
Expand Down Expand Up @@ -197,7 +201,8 @@ KUBECTL ?= kubectl
KUSTOMIZE ?= $(LOCALBIN)/kustomize-$(KUSTOMIZE_VERSION)
CONTROLLER_GEN ?= $(LOCALBIN)/controller-gen-$(CONTROLLER_TOOLS_VERSION)
ENVTEST ?= $(LOCALBIN)/setup-envtest-$(ENVTEST_VERSION)
GOLANGCI_LINT = $(LOCALBIN)/golangci-lint-$(GOLANGCI_LINT_VERSION)
GOLANGCI_LINT ?= $(LOCALBIN)/golangci-lint-$(GOLANGCI_LINT_VERSION)
MOCKGEN ?= $(LOCALBIN)/mockgen-$(MOCKGEN_VERSION)

.PHONY: kustomize
kustomize: $(KUSTOMIZE) ## Download kustomize locally if necessary.
Expand All @@ -217,7 +222,12 @@ $(ENVTEST): $(LOCALBIN)
.PHONY: golangci-lint
golangci-lint: $(GOLANGCI_LINT) ## Download golangci-lint locally if necessary.
$(GOLANGCI_LINT): $(LOCALBIN)
$(call go-install-tool,$(GOLANGCI_LINT),github.com/golangci/golangci-lint/v2/cmd/golangci-lint,${GOLANGCI_LINT_VERSION})
$(call go-install-tool,$(GOLANGCI_LINT),github.com/golangci/golangci-lint/v2/cmd/golangci-lint,$(GOLANGCI_LINT_VERSION))

.PHONY: mockgen
mockgen: $(MOCKGEN) ## Download mockgen locally if necessary.
$(MOCKGEN): $(LOCALBIN)
$(call go-install-tool,$(MOCKGEN),go.uber.org/mock/mockgen,$(MOCKGEN_VERSION))

# go-install-tool will 'go install' any package with custom target and name of binary, if it doesn't exist
# $1 - target path with name of binary (ideally with version)
Expand Down
5 changes: 3 additions & 2 deletions cmd/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func controllerMain(args []string) error {
// Register custom metrics
finmetrics.Register()

snapRepo := ceph.NewRBDRepository()
rbdRepo := ceph.NewRBDRepository()
maxPartSize, err := resource.ParseQuantity(os.Getenv("MAX_PART_SIZE"))
if err != nil {
return fmt.Errorf("failed to parse MAX_PART_SIZE environment variable: %w", err)
Expand All @@ -173,7 +173,8 @@ func controllerMain(args []string) error {
os.Getenv("POD_NAMESPACE"),
os.Getenv("POD_IMAGE"),
&maxPartSize,
snapRepo,
rbdRepo,
rbdRepo,
rawImgExpansionUnitSize,
)
if err = finBackupReconciler.SetupWithManager(mgr); err != nil {
Expand Down
15 changes: 8 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ module github.com/cybozu-go/fin
go 1.24

require (
github.com/cespare/xxhash/v2 v2.3.0
github.com/google/uuid v1.6.0
github.com/mattn/go-sqlite3 v1.14.28
github.com/onsi/ginkgo/v2 v2.23.4
github.com/onsi/gomega v1.37.0
github.com/prometheus/client_golang v1.22.0
github.com/spf13/cobra v1.9.1
github.com/stretchr/testify v1.10.0
golang.org/x/sys v0.32.0
go.uber.org/mock v0.6.0
golang.org/x/sys v0.35.0
k8s.io/api v0.32.7
k8s.io/apimachinery v0.32.7
k8s.io/client-go v0.32.7
Expand All @@ -22,7 +24,6 @@ require (

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch/v5 v5.9.11 // indirect
Expand Down Expand Up @@ -59,13 +60,13 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/oauth2 v0.27.0 // indirect
golang.org/x/sync v0.12.0 // indirect
golang.org/x/term v0.30.0 // indirect
golang.org/x/text v0.23.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/term v0.34.0 // indirect
golang.org/x/text v0.28.0 // indirect
golang.org/x/time v0.9.0 // indirect
golang.org/x/tools v0.31.0 // indirect
golang.org/x/tools v0.36.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
Expand Down
26 changes: 14 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y=
go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
Expand All @@ -140,34 +142,34 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M=
golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y=
golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4=
golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.31.0 h1:0EedkvKDbh+qistFTd0Bcwe/YLh4vHwWEkiI0toFIBU=
golang.org/x/tools v0.31.0/go.mod h1:naFTU+Cev749tSJRXJlna0T3WxKvb1kWEx15xA4SdmQ=
golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg=
golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
101 changes: 95 additions & 6 deletions internal/controller/finbackup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (
labelComponentDeletionJob = "deletion-job"

// Annotations
annotationBackupTargetRBDImage = "fin.cybozu.io/backup-target-rbd-image"
AnnotationBackupTargetRBDImage = "fin.cybozu.io/backup-target-rbd-image"
annotationDiffFrom = "fin.cybozu.io/diff-from"
annotationFinBackupName = "fin.cybozu.io/finbackup-name"
annotationFinBackupNamespace = "fin.cybozu.io/finbackup-namespace"
Expand All @@ -68,6 +68,7 @@ const (
var (
errNonRetryableReconcile = errors.New("non retryable reconciliation error; " +
"reconciliation must not keep going nor be retried")
errVolumeLockedByAnother = errors.New("the volume is locked by another process")
)

// FinBackupReconciler reconciles a FinBackup object
Expand All @@ -78,6 +79,7 @@ type FinBackupReconciler struct {
podImage string
maxPartSize *resource.Quantity
snapRepo model.RBDSnapshotRepository
imageLocker model.RBDImageLocker
rawImgExpansionUnitSize uint64
}

Expand All @@ -88,6 +90,7 @@ func NewFinBackupReconciler(
podImage string,
maxPartSize *resource.Quantity,
snapRepo model.RBDSnapshotRepository,
imageLocker model.RBDImageLocker,
rawImgExpansionUnitSize uint64,
) *FinBackupReconciler {
return &FinBackupReconciler{
Expand All @@ -97,6 +100,7 @@ func NewFinBackupReconciler(
podImage: podImage,
maxPartSize: maxPartSize,
snapRepo: snapRepo,
imageLocker: imageLocker,
rawImgExpansionUnitSize: rawImgExpansionUnitSize,
}
}
Expand Down Expand Up @@ -386,7 +390,7 @@ func (r *FinBackupReconciler) createSnapshot(ctx context.Context, backup *finv1.
if annotations == nil {
annotations = map[string]string{}
}
annotations[annotationBackupTargetRBDImage] = rbdImage
annotations[AnnotationBackupTargetRBDImage] = rbdImage
annotations[annotationRBDPool] = rbdPool
backup.SetAnnotations(annotations)

Expand All @@ -396,8 +400,13 @@ func (r *FinBackupReconciler) createSnapshot(ctx context.Context, backup *finv1.
return ctrl.Result{}, err
}

snap, err := r.createSnapshotIfNeeded(rbdPool, rbdImage, snapshotName(backup))
snap, err := r.createSnapshotIfNeeded(rbdPool, rbdImage, snapshotName(backup), lockID(backup))
if err != nil {
if errors.Is(err, errVolumeLockedByAnother) {
logger.Info("the volume is locked by another process", "uid", string(backup.GetUID()))
// FIXME: The following "requeue after" is temporary code.
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
logger.Error(err, "failed to create or get snapshot")
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -571,12 +580,20 @@ func (r *FinBackupReconciler) reconcileDelete(
return ctrl.Result{}, nil
}

func (r *FinBackupReconciler) createSnapshotIfNeeded(rbdPool, rbdImage, snapName string) (*model.RBDSnapshot, error) {
func (r *FinBackupReconciler) createSnapshotIfNeeded(rbdPool, rbdImage, snapName, lockID string) (*model.RBDSnapshot, error) {
snap, err := findSnapshot(r.snapRepo, rbdPool, rbdImage, snapName)
if err != nil {
if !errors.Is(err, model.ErrNotFound) {
return nil, fmt.Errorf("failed to get snapshot: %w", err)
}

lockSuccess, err := r.lockVolume(rbdPool, rbdImage, lockID)
if err != nil {
return nil, fmt.Errorf("failed to lock image: %w", err)
}
if !lockSuccess {
return nil, errVolumeLockedByAnother
}
err = r.snapRepo.CreateSnapshot(rbdPool, rbdImage, snapName)
if err != nil {
return nil, fmt.Errorf("failed to create snapshot: %w", err)
Expand All @@ -586,6 +603,10 @@ func (r *FinBackupReconciler) createSnapshotIfNeeded(rbdPool, rbdImage, snapName
return nil, fmt.Errorf("failed to get snapshot after creation: %w", err)
}
}
if err := r.unlockVolume(rbdPool, rbdImage, lockID); err != nil {
return nil, fmt.Errorf("failed to unlock image: %w", err)
}

return snap, nil
}

Expand Down Expand Up @@ -613,6 +634,70 @@ func (r *FinBackupReconciler) removeSnapshot(ctx context.Context, backup *finv1.
return nil
}

// lockVolume adds a lock to the specified RBD volume if the lock is not already held.
// It returns true if the lock is held by this caller, false if another lock is held or an error occurs.
func (r *FinBackupReconciler) lockVolume(
poolName, imageName, lockID string,
) (bool, error) {
// Add a lock.
if errAdd := r.imageLocker.LockAdd(poolName, imageName, lockID); errAdd != nil {
locks, errLs := r.imageLocker.LockLs(poolName, imageName)
if errLs != nil {
return false, fmt.Errorf("failed to add a lock and list locks on volume %s/%s: %w", poolName, imageName, errors.Join(errAdd, errLs))
}

switch len(locks) {
case 0:
// It may have been unlocked after the lock failed, but since other causes are also possible, an error is returned.
return false, fmt.Errorf("failed to add a lock to the volume %s/%s: %w", poolName, imageName, errAdd)

case 1:
if locks[0].LockID == lockID {
// Already locked by this FB.
return true, nil
}
// Locked by another process.
return false, nil

default:
// Multiple locks found; unexpected state.
return false, fmt.Errorf("multiple locks found on volume %s/%s after failed lock attempt(%v)", poolName, imageName, locks)
}
}

// Locked
return true, nil
}

// unlockVolume removes the specified lock from the RBD volume if the lock is held.
// No action is taken if the lock is not found.
func (r *FinBackupReconciler) unlockVolume(
poolName, imageName, lockID string,
) error {
// List up locks to check if the lock is held.
locks, err := r.imageLocker.LockLs(poolName, imageName)
if err != nil {
return fmt.Errorf("failed to list locks of the volume %s/%s: %w", poolName, imageName, err)
}

if len(locks) >= 2 {
return fmt.Errorf("multiple locks found on volume %s/%s when unlocking (%v)", poolName, imageName, locks)
}

for _, lock := range locks {
if lock.LockID == lockID {
// Unlock
if err := r.imageLocker.LockRm(poolName, imageName, lock); err != nil {
return fmt.Errorf("failed to remove the lock from the volume %s/%s: %w", poolName, imageName, err)
}
return nil
}
}

// Already unlocked.
return nil
}

func (r *FinBackupReconciler) getRBDPoolAndImageFromPVC(
ctx context.Context,
pvc *corev1.PersistentVolumeClaim,
Expand All @@ -637,7 +722,7 @@ func (r *FinBackupReconciler) getRBDPoolAndImageFromPVC(

func (r *FinBackupReconciler) getRBDPoolAndImage(ctx context.Context, backup *finv1.FinBackup) (string, string, error) {
rbdPool := backup.GetAnnotations()[annotationRBDPool]
rbdImage := backup.GetAnnotations()[annotationBackupTargetRBDImage]
rbdImage := backup.GetAnnotations()[AnnotationBackupTargetRBDImage]
if rbdPool != "" && rbdImage != "" {
return rbdPool, rbdImage, nil
}
Expand Down Expand Up @@ -733,6 +818,10 @@ func cleanupJobName(backup *finv1.FinBackup) string {
return "fin-cleanup-" + string(backup.GetUID())
}

func lockID(backup *finv1.FinBackup) string {
return string(backup.GetUID())
}

func (r *FinBackupReconciler) createOrUpdateBackupJob(
ctx context.Context, backup *finv1.FinBackup, diffFrom string,
backupTargetPVCUID string, maxPartSize *resource.Quantity,
Expand Down Expand Up @@ -794,7 +883,7 @@ func (r *FinBackupReconciler) createOrUpdateBackupJob(
},
{
Name: "RBD_IMAGE_NAME",
Value: backup.GetAnnotations()[annotationBackupTargetRBDImage],
Value: backup.GetAnnotations()[AnnotationBackupTargetRBDImage],
},
{
Name: "BACKUP_SNAPSHOT_ID",
Expand Down
Loading