diff --git a/cmd/snapshot-controller/main.go b/cmd/snapshot-controller/main.go index 72d71b2b..6848d928 100644 --- a/cmd/snapshot-controller/main.go +++ b/cmd/snapshot-controller/main.go @@ -72,6 +72,7 @@ var ( retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.") enableDistributedSnapshotting = flag.Bool("enable-distributed-snapshotting", false, "Enables each node to handle snapshotting for the local volumes created on that node") preventVolumeModeConversion = flag.Bool("prevent-volume-mode-conversion", false, "Prevents an unauthorised user from modifying the volume mode when creating a PVC from an existing VolumeSnapshot.") + enableVolumeGroupSnapshots = flag.Bool("enable-volume-group-snapshots", false, "Enables the volume group snapshot feature, allowing the user to create snapshots of groups of volumes.") retryCRDIntervalMax = flag.Duration("retry-crd-interval-max", 5*time.Second, "Maximum retry interval to wait for CRDs to appear. 
The default is 5 seconds.") ) @@ -193,14 +194,20 @@ func main() { factory.Snapshot().V1().VolumeSnapshots(), factory.Snapshot().V1().VolumeSnapshotContents(), factory.Snapshot().V1().VolumeSnapshotClasses(), + factory.Groupsnapshot().V1alpha1().VolumeGroupSnapshots(), + factory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotContents(), + factory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotClasses(), coreFactory.Core().V1().PersistentVolumeClaims(), nodeInformer, metricsManager, *resyncPeriod, workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), + workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), + workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), *enableDistributedSnapshotting, *preventVolumeModeConversion, + *enableVolumeGroupSnapshots, ) if err := ensureCustomResourceDefinitionsExist(snapClient); err != nil { diff --git a/pkg/common-controller/framework_test.go b/pkg/common-controller/framework_test.go index 95bdf0b4..489e66bc 100644 --- a/pkg/common-controller/framework_test.go +++ b/pkg/common-controller/framework_test.go @@ -842,12 +842,18 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte informerFactory.Snapshot().V1().VolumeSnapshots(), informerFactory.Snapshot().V1().VolumeSnapshotContents(), informerFactory.Snapshot().V1().VolumeSnapshotClasses(), + informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshots(), + informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotContents(), + informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotClasses(), coreFactory.Core().V1().PersistentVolumeClaims(), nil, metricsManager, 60*time.Second, workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute), workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute), + 
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute), + workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute), + false, false, false, ) diff --git a/pkg/common-controller/groupsnapshot_controller_helper.go b/pkg/common-controller/groupsnapshot_controller_helper.go new file mode 100644 index 00000000..3bb0433f --- /dev/null +++ b/pkg/common-controller/groupsnapshot_controller_helper.go @@ -0,0 +1,356 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package common_controller
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	klog "k8s.io/klog/v2"
+
+	crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumegroupsnapshot/v1alpha1"
+	crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
+	"github.com/kubernetes-csi/external-snapshotter/v6/pkg/utils"
+)
+
+// storeGroupSnapshotUpdate hands the given group snapshot object to
+// utils.StoreObjectUpdate for the controller's groupSnapshotStore and returns
+// that call's (bool, error) result unchanged.
+// NOTE(review): updateGroupSnapshot in this file treats a true result as "this
+// version is new and should be processed" - confirm against utils.StoreObjectUpdate.
+func (ctrl *csiSnapshotCommonController) storeGroupSnapshotUpdate(groupsnapshot interface{}) (bool, error) {
+	return utils.StoreObjectUpdate(ctrl.groupSnapshotStore, groupsnapshot, "groupsnapshot")
+}
+
+// storeGroupSnapshotContentUpdate hands the given group snapshot content object
+// to utils.StoreObjectUpdate for the controller's groupSnapshotContentStore and
+// returns that call's (bool, error) result unchanged.
+func (ctrl *csiSnapshotCommonController) storeGroupSnapshotContentUpdate(groupsnapshotcontent interface{}) (bool, error) {
+	return utils.StoreObjectUpdate(ctrl.groupSnapshotContentStore, groupsnapshotcontent, "groupsnapshotcontent")
+}
+
+// getGroupSnapshotClass is a helper function to get group snapshot class from the class name.
+// The class is read through groupSnapshotClassLister; when the lookup fails the
+// error is logged and returned together with a nil class.
+func (ctrl *csiSnapshotCommonController) getGroupSnapshotClass(className string) (*crdv1alpha1.VolumeGroupSnapshotClass, error) {
+	klog.V(5).Infof("getGroupSnapshotClass: VolumeGroupSnapshotClassName [%s]", className)
+
+	class, err := ctrl.groupSnapshotClassLister.Get(className)
+	if err != nil {
+		klog.Errorf("failed to retrieve group snapshot class %s from the informer: %q", className, err)
+		return nil, err
+	}
+
+	return class, nil
+}
+
+// updateGroupSnapshotErrorStatusWithEvent saves new groupsnapshot.Status to API
+// server and emits given event on the group snapshot. It saves the status and
+// emits the event only when the status has actually changed from the version
+// saved in API server.
+//
+// Parameters:
+//
+//   - groupSnapshot - group snapshot to update
+//   - setReadyToFalse bool - indicates whether to set the group snapshot's
+//     ReadyToUse status to false.
+//     if true, ReadyToUse will be set to false;
+//     otherwise, ReadyToUse will not be changed.
+//   - eventtype, reason, message - event to send, see EventRecorder.Event()
+//
+// When the group snapshot already carries an error with the same message,
+// nothing is written and nil is returned. Otherwise the returned error is the
+// one from the status update or from the internal cache update, if any.
+func (ctrl *csiSnapshotCommonController) updateGroupSnapshotErrorStatusWithEvent(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot, setReadyToFalse bool, eventtype, reason, message string) error {
+	klog.V(5).Infof("updateGroupSnapshotErrorStatusWithEvent[%s]", utils.GroupSnapshotKey(groupSnapshot))
+
+	// Message is a pointer and may be unset on an existing error, so guard it
+	// before dereferencing.
+	if groupSnapshot.Status != nil && groupSnapshot.Status.Error != nil &&
+		groupSnapshot.Status.Error.Message != nil && *groupSnapshot.Status.Error.Message == message {
+		klog.V(4).Infof("updateGroupSnapshotErrorStatusWithEvent[%s]: the same error %v is already set", groupSnapshot.Name, groupSnapshot.Status.Error)
+		return nil
+	}
+	groupSnapshotClone := groupSnapshot.DeepCopy()
+	if groupSnapshotClone.Status == nil {
+		groupSnapshotClone.Status = &crdv1alpha1.VolumeGroupSnapshotStatus{}
+	}
+	statusError := &crdv1.VolumeSnapshotError{
+		Time: &metav1.Time{
+			Time: time.Now(),
+		},
+		Message: &message,
+	}
+	groupSnapshotClone.Status.Error = statusError
+	// Only update ReadyToUse in VolumeGroupSnapshot's Status to false if setReadyToFalse is true.
+	if setReadyToFalse {
+		ready := false
+		groupSnapshotClone.Status.ReadyToUse = &ready
+	}
+	newSnapshot, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshots(groupSnapshotClone.Namespace).UpdateStatus(context.TODO(), groupSnapshotClone, metav1.UpdateOptions{})
+
+	// Emit the event even if the status update fails so that user can see the error.
+	// On failure the object returned by the client is not a usable event target
+	// (it is nil or empty depending on the client implementation), so the event
+	// is recorded against the group snapshot that was passed in.
+	eventTarget := groupSnapshot
+	if err == nil {
+		eventTarget = newSnapshot
+	}
+	ctrl.eventRecorder.Event(eventTarget, eventtype, reason, message)
+
+	if err != nil {
+		klog.V(4).Infof("updating VolumeGroupSnapshot[%s] error status failed %v", utils.GroupSnapshotKey(groupSnapshot), err)
+		return err
+	}
+
+	_, err = ctrl.storeGroupSnapshotUpdate(newSnapshot)
+	if err != nil {
+		klog.V(4).Infof("updating VolumeGroupSnapshot[%s] error status: cannot update internal cache %v", utils.GroupSnapshotKey(groupSnapshot), err)
+		return err
+	}
+
+	return nil
+}
+
+// SetDefaultGroupSnapshotClass is a helper function to figure out the default
+// group snapshot class.
+// For pre-provisioned case, it's a no-op.
+// For dynamic provisioning, it gets the default GroupSnapshotClasses in the
+// system if there is any (could be multiple), and finds the one with the same
+// CSI Driver as a PV from which a group snapshot will be taken.
+func (ctrl *csiSnapshotCommonController) SetDefaultGroupSnapshotClass(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) (*crdv1alpha1.VolumeGroupSnapshotClass, *crdv1alpha1.VolumeGroupSnapshot, error) {
+	klog.V(5).Infof("SetDefaultGroupSnapshotClass for group snapshot [%s]", groupSnapshot.Name)
+
+	if groupSnapshot.Spec.Source.VolumeGroupSnapshotContentName != nil {
+		// don't return error for pre-provisioned group snapshots
+		klog.V(5).Infof("Don't need to find GroupSnapshotClass for pre-provisioned group snapshot [%s]", groupSnapshot.Name)
+		return nil, groupSnapshot, nil
+	}
+
+	// Find default group snapshot class if available
+	list, err := ctrl.groupSnapshotClassLister.List(labels.Everything())
+	if err != nil {
+		return nil, groupSnapshot, err
+	}
+
+	pvDriver, err := ctrl.pvDriverFromGroupSnapshot(groupSnapshot)
+	if err != nil {
+		klog.Errorf("failed to get pv csi driver from group snapshot %s/%s: %q", groupSnapshot.Namespace, groupSnapshot.Name, err)
+		return nil, groupSnapshot, err
+	}
+
+	// Keep only the classes annotated as default whose driver matches the
+	// driver of the volumes being snapshotted.
+	var defaultClasses []*crdv1alpha1.VolumeGroupSnapshotClass
+	for _, class := range list {
+		if utils.IsDefaultAnnotation(class.ObjectMeta) && pvDriver == class.Driver {
+			defaultClasses = append(defaultClasses, class)
+			klog.V(5).Infof("get defaultGroupClass added: %s, driver: %s", class.Name, pvDriver)
+		}
+	}
+	if len(defaultClasses) == 0 {
+		return nil, groupSnapshot, fmt.Errorf("cannot find default group snapshot class")
+	}
+	if len(defaultClasses) > 1 {
+		klog.V(4).Infof("get DefaultClass %d defaults found", len(defaultClasses))
+		return nil, groupSnapshot, fmt.Errorf("%d default snapshot classes were found", len(defaultClasses))
+	}
+	klog.V(5).Infof("setDefaultSnapshotClass [%s]: default VolumeSnapshotClassName [%s]", groupSnapshot.Name, defaultClasses[0].Name)
+	groupSnapshotClone := groupSnapshot.DeepCopy()
+	groupSnapshotClone.Spec.VolumeGroupSnapshotClassName = &(defaultClasses[0].Name)
+	newGroupSnapshot, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshots(groupSnapshotClone.Namespace).Update(context.TODO(), groupSnapshotClone, metav1.UpdateOptions{})
+	if err != nil {
+		// The update did not go through, so the object returned by the client
+		// must not be cached or handed back as if it were the new version.
+		// Like every other error path here, return the original group snapshot.
+		klog.V(4).Infof("updating VolumeGroupSnapshot[%s] default class failed %v", utils.GroupSnapshotKey(groupSnapshot), err)
+		return nil, groupSnapshot, err
+	}
+	_, updateErr := ctrl.storeGroupSnapshotUpdate(newGroupSnapshot)
+	if updateErr != nil {
+		// We will get an "snapshot update" event soon, this is not a big error
+		klog.V(4).Infof("setDefaultSnapshotClass [%s]: cannot update internal cache: %v", utils.GroupSnapshotKey(groupSnapshot), updateErr)
+	}
+
+	return defaultClasses[0], newGroupSnapshot, nil
+}
+
+// pvDriverFromGroupSnapshot is a helper function to get the CSI driver name from the targeted PersistentVolume.
+// It looks up the PVCs selected by the group snapshot and their corresponding PVs
+// (see getVolumesFromVolumeGroupSnapshot, which verifies the PVC/PV binding) and
+// returns the CSI driver of the first PV in that list.
+// For a non-CSI volume, it returns an error immediately as it's not supported.
+func (ctrl *csiSnapshotCommonController) pvDriverFromGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) (string, error) {
+	pvs, err := ctrl.getVolumesFromVolumeGroupSnapshot(groupSnapshot)
+	if err != nil {
+		return "", err
+	}
+	// Guard the index below even though the lookup is expected to fail earlier
+	// when no PVC matches.
+	if len(pvs) == 0 {
+		return "", fmt.Errorf("no volumes found for group snapshot %s/%s", groupSnapshot.Namespace, groupSnapshot.Name)
+	}
+	// Take any volume to get the driver
+	if pvs[0].Spec.PersistentVolumeSource.CSI == nil {
+		return "", fmt.Errorf("snapshotting non-CSI volumes is not supported, snapshot:%s/%s", groupSnapshot.Namespace, groupSnapshot.Name)
+	}
+	return pvs[0].Spec.PersistentVolumeSource.CSI.Driver, nil
+}
+
+// getVolumesFromVolumeGroupSnapshot returns the list of PersistentVolume from a VolumeGroupSnapshot.
+func (ctrl *csiSnapshotCommonController) getVolumesFromVolumeGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) ([]*v1.PersistentVolume, error) {
+	pvcs, err := ctrl.getClaimsFromVolumeGroupSnapshot(groupSnapshot)
+	if err != nil {
+		return nil, err
+	}
+
+	pvReturnList := make([]*v1.PersistentVolume, 0, len(pvcs))
+	for i := range pvcs {
+		// Index into the slice instead of taking the address of a range
+		// variable, so the pointer refers to the list element itself.
+		pvc := &pvcs[i]
+		if pvc.Status.Phase != v1.ClaimBound {
+			return nil, fmt.Errorf("the PVC %s is not yet bound to a PV, will not attempt to take a group snapshot", pvc.Name)
+		}
+
+		pvName := pvc.Spec.VolumeName
+		pv, err := ctrl.client.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
+		if err != nil {
+			return nil, fmt.Errorf("failed to retrieve PV %s from the API server: %q", pvName, err)
+		}
+
+		// Verify binding between PV/PVC is still valid
+		if !ctrl.isVolumeBoundToClaim(pv, pvc) {
+			klog.Warningf("binding between PV %s and PVC %s is broken", pvName, pvc.Name)
+			return nil, fmt.Errorf("claim in dataSource not bound or invalid")
+		}
+		pvReturnList = append(pvReturnList, pv)
+		klog.V(5).Infof("getVolumeFromVolumeGroupSnapshot: group snapshot [%s] PV name [%s]", groupSnapshot.Name, pvName)
+	}
+
+	return pvReturnList, nil
+}
+
+// getClaimsFromVolumeGroupSnapshot is a helper function to get a list of PVCs from VolumeGroupSnapshot.
+// The PVCs are listed from the API server in the group snapshot's namespace,
+// filtered by the matchLabels of Spec.Source.Selector. An error is returned
+// when the selector uses matchExpressions, when the list call fails, or when
+// no PVC matches.
+func (ctrl *csiSnapshotCommonController) getClaimsFromVolumeGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) ([]v1.PersistentVolumeClaim, error) {
+	labelSelector := groupSnapshot.Spec.Source.Selector
+
+	// LabelSelector.String() is the generated debug representation
+	// ("&LabelSelector{...}"), not label-selector syntax, so it cannot be used
+	// as ListOptions.LabelSelector. Build the query from matchLabels instead.
+	// matchExpressions are rejected, not silently dropped, because
+	// ignoring them could select more PVCs than the user asked for.
+	// NOTE(review): assumes Selector is set for dynamically provisioned group
+	// snapshots - confirm against the API type and validation.
+	if len(labelSelector.MatchExpressions) > 0 {
+		return nil, fmt.Errorf("label selector %s for group snapshot uses matchExpressions, which are not supported", labelSelector.String())
+	}
+	selector := labels.Set(labelSelector.MatchLabels).String()
+
+	// Get PVC that has group snapshot label applied.
+	pvcList, err := ctrl.client.CoreV1().PersistentVolumeClaims(groupSnapshot.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector})
+	if err != nil {
+		return nil, fmt.Errorf("failed to list PVCs with label selector %s: %q", selector, err)
+	}
+	if len(pvcList.Items) == 0 {
+		return nil, fmt.Errorf("label selector %s for group snapshot not applied to any PVC", selector)
+	}
+	return pvcList.Items, nil
+}
+
+// updateGroupSnapshot runs in worker thread and handles "groupsnapshot added",
+// "groupsnapshot updated" and "periodic sync" events.
+// It returns the error from syncGroupSnapshot, if any; a failure to update the
+// internal cache is only logged.
+func (ctrl *csiSnapshotCommonController) updateGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) error {
+	// Store the new group snapshot version in the cache and do not process it
+	// if this is an old version.
+	klog.V(5).Infof("updateGroupSnapshot %q", utils.GroupSnapshotKey(groupSnapshot))
+	newGroupSnapshot, err := ctrl.storeGroupSnapshotUpdate(groupSnapshot)
+	if err != nil {
+		klog.Errorf("%v", err)
+	}
+	if !newGroupSnapshot {
+		return nil
+	}
+
+	err = ctrl.syncGroupSnapshot(groupSnapshot)
+	if err != nil {
+		if errors.IsConflict(err) {
+			// Version conflict error happens quite often and the controller
+			// recovers from it easily.
+			klog.V(3).Infof("could not sync snapshot %q: %+v", utils.GroupSnapshotKey(groupSnapshot), err)
+		} else {
+			klog.Errorf("could not sync snapshot %q: %+v", utils.GroupSnapshotKey(groupSnapshot), err)
+		}
+		return err
+	}
+	return nil
+}
+
+// deleteGroupSnapshot runs in worker thread and handles "groupsnapshot deleted" event.
+func (ctrl *csiSnapshotCommonController) deleteGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) {
+	// Remove the object from the group snapshot cache (not the VolumeSnapshot
+	// cache); syncGroupSnapshotByKey looks deleted group snapshots up in
+	// groupSnapshotStore. The Delete error is ignored on purpose: there is
+	// nothing further to do if the entry could not be removed.
+	_ = ctrl.groupSnapshotStore.Delete(groupSnapshot)
+	klog.V(4).Infof("snapshot %q deleted", utils.GroupSnapshotKey(groupSnapshot))
+
+	groupSnapshotContentName := ""
+	if groupSnapshot.Status != nil && groupSnapshot.Status.BoundVolumeGroupSnapshotContentName != nil {
+		groupSnapshotContentName = *groupSnapshot.Status.BoundVolumeGroupSnapshotContentName
+	}
+	if groupSnapshotContentName == "" {
+		klog.V(5).Infof("deleteGroupSnapshot[%q]: group snapshot content not bound", utils.GroupSnapshotKey(groupSnapshot))
+		return
+	}
+
+	// sync the content when its group snapshot is deleted. Explicitly sync'ing
+	// the content here in response to group snapshot deletion prevents the content
+	// from waiting until the next sync period for its release.
+	klog.V(5).Infof("deleteGroupSnapshot[%q]: scheduling sync of group snapshot content %s", utils.GroupSnapshotKey(groupSnapshot), groupSnapshotContentName)
+	ctrl.groupSnapshotContentQueue.Add(groupSnapshotContentName)
+}
+
+// syncGroupSnapshot is the main controller method to decide what to do with a
+// group snapshot. It's invoked by appropriate cache.Controller callbacks when
+// a group snapshot is created, updated or periodically synced. We do not
+// differentiate between these events.
+// For easier readability, it is split into syncUnreadyGroupSnapshot and syncReadyGroupSnapshot
+func (ctrl *csiSnapshotCommonController) syncGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) error {
+	klog.V(5).Infof("synchronizing VolumeGroupSnapshot[%s]", utils.GroupSnapshotKey(groupSnapshot))
+
+	klog.V(5).Infof("syncGroupSnapshot [%s]: check if we should remove finalizer on group snapshot PVC source and remove it if we can", utils.GroupSnapshotKey(groupSnapshot))
+
+	/*
+		TODO:
+		- Check and remove finalizer if needed.
+		- Check and set invalid group snapshot label, if needed.
+		- Process if deletion timestamp is set.
+		- Check and add group snapshot finalizers.
+	*/
+
+	// Need to build or update groupSnapshot.Status in following cases:
+	// 1) groupSnapshot.Status is nil
+	// 2) groupSnapshot.Status.ReadyToUse is false
+	// 3) groupSnapshot.Status.BoundVolumeGroupSnapshotContentName is not set
+	//
+	// TODO: the unready path is not wired up yet, so this branch is empty and
+	// every group snapshot currently falls through to syncReadyGroupSnapshot.
+	if !utils.IsGroupSnapshotReady(groupSnapshot) || !utils.IsBoundVolumeGroupSnapshotContentNameSet(groupSnapshot) {
+		//return ctrl.syncUnreadyGroupSnapshot(groupSnapshot)
+	}
+	return ctrl.syncReadyGroupSnapshot(groupSnapshot)
+}
+
+// syncReadyGroupSnapshot checks the group snapshot which has been bound to group
+// snapshot content successfully before.
+// If there is any problem with the binding (e.g., group snapshot points to a
+// non-existent group snapshot content), update the group snapshot status and emit event.
+// It returns an error when the group snapshot has no bound content name, or
+// when updating the error status fails.
+func (ctrl *csiSnapshotCommonController) syncReadyGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) error {
+	if !utils.IsBoundVolumeGroupSnapshotContentNameSet(groupSnapshot) {
+		return fmt.Errorf("group snapshot %s is not bound to a group snapshot content", utils.GroupSnapshotKey(groupSnapshot))
+	}
+	content, err := ctrl.getGroupSnapshotContentFromStore(*groupSnapshot.Status.BoundVolumeGroupSnapshotContentName)
+	if err != nil {
+		// A store lookup failure is not retried (nil is returned, so the key
+		// is not requeued); log it so the failure is at least visible.
+		klog.Errorf("syncReadyGroupSnapshot[%s]: failed to get group snapshot content from store: %v", utils.GroupSnapshotKey(groupSnapshot), err)
+		return nil
+	}
+	if content == nil {
+		// this meant there is no matching group snapshot content in cache found
+		// update status of the group snapshot and return
+		return ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, true, v1.EventTypeWarning, "GroupSnapshotContentMissing", "VolumeGroupSnapshotContent is missing")
+	}
+	klog.V(5).Infof("syncReadyGroupSnapshot[%s]: VolumeGroupSnapshotContent %q found", utils.GroupSnapshotKey(groupSnapshot), content.Name)
+	// check binding from group snapshot content side to make sure the binding is still valid
+	if !utils.IsVolumeGroupSnapshotRefSet(groupSnapshot, content) {
+		// group snapshot is bound but group snapshot content is not pointing to the group snapshot
+		return ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, true, v1.EventTypeWarning, "GroupSnapshotMisbound", "VolumeGroupSnapshotContent is not bound to the VolumeGroupSnapshot correctly")
+	}
+
+	// everything is verified, return
+	return nil
+}
+
+// getGroupSnapshotContentFromStore tries to find a VolumeGroupSnapshotContent from group
+// snapshot content cache store by name.
+// Note that if no VolumeGroupSnapshotContent exists in the cache store and no error
+// encountered, it returns (nil, nil)
+func (ctrl *csiSnapshotCommonController) getGroupSnapshotContentFromStore(contentName string) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
+	obj, exist, err := ctrl.groupSnapshotContentStore.GetByKey(contentName)
+	if err != nil {
+		// should never reach here based on implementation at:
+		// https://github.com/kubernetes/client-go/blob/master/tools/cache/store.go#L226
+		return nil, err
+	}
+	if !exist {
+		// not able to find a matching content
+		return nil, nil
+	}
+	content, ok := obj.(*crdv1alpha1.VolumeGroupSnapshotContent)
+	if !ok {
+		return nil, fmt.Errorf("expected VolumeGroupSnapshotContent, got %+v", obj)
+	}
+	return content, nil
+}
diff --git a/pkg/common-controller/snapshot_controller_base.go b/pkg/common-controller/snapshot_controller_base.go index c53fd737..551ec9fb 100644 --- a/pkg/common-controller/snapshot_controller_base.go +++ b/pkg/common-controller/snapshot_controller_base.go @@ -20,10 +20,13 @@ import ( "fmt" "time" + crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumegroupsnapshot/v1alpha1" crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1" clientset "github.com/kubernetes-csi/external-snapshotter/client/v6/clientset/versioned" - storageinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumesnapshot/v1" - storagelisters 
"github.com/kubernetes-csi/external-snapshotter/client/v6/listers/volumesnapshot/v1" + groupsnapshotinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumegroupsnapshot/v1alpha1" + snapshotinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumesnapshot/v1" + groupsnapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v6/listers/volumegroupsnapshot/v1alpha1" + snapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v6/listers/volumesnapshot/v1" "github.com/kubernetes-csi/external-snapshotter/v6/pkg/metrics" "github.com/kubernetes-csi/external-snapshotter/v6/pkg/utils" @@ -43,25 +46,35 @@ import ( ) type csiSnapshotCommonController struct { - clientset clientset.Interface - client kubernetes.Interface - eventRecorder record.EventRecorder - snapshotQueue workqueue.RateLimitingInterface - contentQueue workqueue.RateLimitingInterface + clientset clientset.Interface + client kubernetes.Interface + eventRecorder record.EventRecorder + snapshotQueue workqueue.RateLimitingInterface + contentQueue workqueue.RateLimitingInterface + groupSnapshotQueue workqueue.RateLimitingInterface + groupSnapshotContentQueue workqueue.RateLimitingInterface - snapshotLister storagelisters.VolumeSnapshotLister - snapshotListerSynced cache.InformerSynced - contentLister storagelisters.VolumeSnapshotContentLister - contentListerSynced cache.InformerSynced - classLister storagelisters.VolumeSnapshotClassLister - classListerSynced cache.InformerSynced - pvcLister corelisters.PersistentVolumeClaimLister - pvcListerSynced cache.InformerSynced - nodeLister corelisters.NodeLister - nodeListerSynced cache.InformerSynced + snapshotLister snapshotlisters.VolumeSnapshotLister + snapshotListerSynced cache.InformerSynced + contentLister snapshotlisters.VolumeSnapshotContentLister + contentListerSynced cache.InformerSynced + classLister snapshotlisters.VolumeSnapshotClassLister + 
classListerSynced cache.InformerSynced + pvcLister corelisters.PersistentVolumeClaimLister + pvcListerSynced cache.InformerSynced + nodeLister corelisters.NodeLister + nodeListerSynced cache.InformerSynced + groupSnapshotLister groupsnapshotlisters.VolumeGroupSnapshotLister + groupSnapshotListerSynced cache.InformerSynced + groupSnapshotContentLister groupsnapshotlisters.VolumeGroupSnapshotContentLister + groupSnapshotContentListerSynced cache.InformerSynced + groupSnapshotClassLister groupsnapshotlisters.VolumeGroupSnapshotClassLister + groupSnapshotClassListerSynced cache.InformerSynced - snapshotStore cache.Store - contentStore cache.Store + snapshotStore cache.Store + contentStore cache.Store + groupSnapshotStore cache.Store + groupSnapshotContentStore cache.Store metricsManager metrics.MetricsManager @@ -69,23 +82,30 @@ type csiSnapshotCommonController struct { enableDistributedSnapshotting bool preventVolumeModeConversion bool + enableVolumeGroupSnapshots bool } // NewCSISnapshotController returns a new *csiSnapshotCommonController func NewCSISnapshotCommonController( clientset clientset.Interface, client kubernetes.Interface, - volumeSnapshotInformer storageinformers.VolumeSnapshotInformer, - volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer, - volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer, + volumeSnapshotInformer snapshotinformers.VolumeSnapshotInformer, + volumeSnapshotContentInformer snapshotinformers.VolumeSnapshotContentInformer, + volumeSnapshotClassInformer snapshotinformers.VolumeSnapshotClassInformer, + volumeGroupSnapshotInformer groupsnapshotinformers.VolumeGroupSnapshotInformer, + volumeGroupSnapshotContentInformer groupsnapshotinformers.VolumeGroupSnapshotContentInformer, + volumeGroupSnapshotClassInformer groupsnapshotinformers.VolumeGroupSnapshotClassInformer, pvcInformer coreinformers.PersistentVolumeClaimInformer, nodeInformer coreinformers.NodeInformer, metricsManager 
metrics.MetricsManager, resyncPeriod time.Duration, snapshotRateLimiter workqueue.RateLimiter, contentRateLimiter workqueue.RateLimiter, + groupSnapshotRateLimiter workqueue.RateLimiter, + groupSnapshotContentRateLimiter workqueue.RateLimiter, enableDistributedSnapshotting bool, preventVolumeModeConversion bool, + enableVolumeGroupSnapshots bool, ) *csiSnapshotCommonController { broadcaster := record.NewBroadcaster() broadcaster.StartLogging(klog.Infof) @@ -142,12 +162,49 @@ func NewCSISnapshotCommonController( ctrl.preventVolumeModeConversion = preventVolumeModeConversion + ctrl.enableVolumeGroupSnapshots = enableVolumeGroupSnapshots + + if enableVolumeGroupSnapshots { + ctrl.groupSnapshotQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotRateLimiter, "snapshot-controller-group-snapshot") + ctrl.groupSnapshotContentQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotContentRateLimiter, "snapshot-controller-group-content") + + volumeGroupSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotWork(obj) }, + UpdateFunc: func(oldObj, newObj interface{}) { ctrl.enqueueGroupSnapshotWork(newObj) }, + DeleteFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotWork(obj) }, + }, + ctrl.resyncPeriod, + ) + ctrl.groupSnapshotLister = volumeGroupSnapshotInformer.Lister() + ctrl.groupSnapshotListerSynced = volumeGroupSnapshotInformer.Informer().HasSynced + + volumeGroupSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotContentWork(obj) }, + UpdateFunc: func(oldObj, newObj interface{}) { ctrl.enqueueGroupSnapshotContentWork(newObj) }, + DeleteFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotContentWork(obj) }, + }, + ctrl.resyncPeriod, + ) + ctrl.groupSnapshotContentLister = volumeGroupSnapshotContentInformer.Lister() + 
ctrl.groupSnapshotContentListerSynced = volumeGroupSnapshotContentInformer.Informer().HasSynced + + ctrl.groupSnapshotClassLister = volumeGroupSnapshotClassInformer.Lister() + ctrl.groupSnapshotClassListerSynced = volumeGroupSnapshotClassInformer.Informer().HasSynced + + } + return ctrl } func (ctrl *csiSnapshotCommonController) Run(workers int, stopCh <-chan struct{}) { defer ctrl.snapshotQueue.ShutDown() defer ctrl.contentQueue.ShutDown() + if ctrl.enableVolumeGroupSnapshots { + defer ctrl.groupSnapshotQueue.ShutDown() + defer ctrl.groupSnapshotContentQueue.ShutDown() + } klog.Infof("Starting snapshot controller") defer klog.Infof("Shutting snapshot controller") @@ -156,17 +213,24 @@ func (ctrl *csiSnapshotCommonController) Run(workers int, stopCh <-chan struct{} if ctrl.enableDistributedSnapshotting { informersSynced = append(informersSynced, ctrl.nodeListerSynced) } + if ctrl.enableVolumeGroupSnapshots { + informersSynced = append(informersSynced, []cache.InformerSynced{ctrl.groupSnapshotListerSynced, ctrl.groupSnapshotContentListerSynced, ctrl.groupSnapshotClassListerSynced}...) + } if !cache.WaitForCacheSync(stopCh, informersSynced...) { klog.Errorf("Cannot sync caches") return } - ctrl.initializeCaches(ctrl.snapshotLister, ctrl.contentLister) + ctrl.initializeCaches() for i := 0; i < workers; i++ { go wait.Until(ctrl.snapshotWorker, 0, stopCh) go wait.Until(ctrl.contentWorker, 0, stopCh) + if ctrl.enableVolumeGroupSnapshots { + go wait.Until(ctrl.groupSnapshotWorker, 0, stopCh) + go wait.Until(ctrl.groupSnapshotContentWorker, 0, stopCh) + } } <-stopCh @@ -481,8 +545,8 @@ func (ctrl *csiSnapshotCommonController) deleteContent(content *crdv1.VolumeSnap // initializeCaches fills all controller caches with initial data from etcd in // order to have the caches already filled when first addSnapshot/addContent to // perform initial synchronization of the controller. 
-func (ctrl *csiSnapshotCommonController) initializeCaches(snapshotLister storagelisters.VolumeSnapshotLister, contentLister storagelisters.VolumeSnapshotContentLister) {
-	snapshotList, err := snapshotLister.List(labels.Everything())
+func (ctrl *csiSnapshotCommonController) initializeCaches() {
+	snapshotList, err := ctrl.snapshotLister.List(labels.Everything())
 	if err != nil {
 		klog.Errorf("CSISnapshotController can't initialize caches: %v", err)
 		return
@@ -494,7 +558,7 @@ func (ctrl *csiSnapshotCommonController) initializeCaches(snapshotLister storage
 		}
 	}
 
-	contentList, err := contentLister.List(labels.Everything())
+	contentList, err := ctrl.contentLister.List(labels.Everything())
 	if err != nil {
 		klog.Errorf("CSISnapshotController can't initialize caches: %v", err)
 		return
@@ -506,5 +570,195 @@ func (ctrl *csiSnapshotCommonController) initializeCaches(snapshotLister storage
 		}
 	}
 
+	if ctrl.enableVolumeGroupSnapshots {
+		// Fill the group snapshot caches from the group snapshot listers. The
+		// VolumeSnapshot/VolumeSnapshotContent listers must not be used here:
+		// they would load objects of the wrong type into the group stores.
+		groupSnapshotList, err := ctrl.groupSnapshotLister.List(labels.Everything())
+		if err != nil {
+			klog.Errorf("CSISnapshotController can't initialize caches: %v", err)
+			return
+		}
+		for _, groupSnapshot := range groupSnapshotList {
+			groupSnapshotClone := groupSnapshot.DeepCopy()
+			if _, err = ctrl.storeGroupSnapshotUpdate(groupSnapshotClone); err != nil {
+				klog.Errorf("error updating volume group snapshot cache: %v", err)
+			}
+		}
+
+		groupContentList, err := ctrl.groupSnapshotContentLister.List(labels.Everything())
+		if err != nil {
+			klog.Errorf("CSISnapshotController can't initialize caches: %v", err)
+			return
+		}
+		for _, groupContent := range groupContentList {
+			groupContentClone := groupContent.DeepCopy()
+			if _, err = ctrl.storeGroupSnapshotContentUpdate(groupContentClone); err != nil {
+				klog.Errorf("error updating volume group snapshot content cache: %v", err)
+			}
+		}
+	}
+
 	klog.V(4).Infof("controller initialized")
 }
+
+// enqueueGroupSnapshotWork adds group snapshot to given work queue.
+func (ctrl *csiSnapshotCommonController) enqueueGroupSnapshotWork(obj interface{}) {
+	// Deletion events may carry a tombstone; unwrap it to reach the real object.
+	if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
+		obj = unknown.Obj
+	}
+	if groupSnapshot, ok := obj.(*crdv1alpha1.VolumeGroupSnapshot); ok {
+		objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(groupSnapshot)
+		if err != nil {
+			klog.Errorf("failed to get key from object: %v, %v", err, groupSnapshot)
+			return
+		}
+		klog.V(5).Infof("enqueued %q for sync", objName)
+		ctrl.groupSnapshotQueue.Add(objName)
+	}
+}
+
+// enqueueGroupSnapshotContentWork adds the key of a VolumeGroupSnapshotContent to groupSnapshotContentQueue; other object types are ignored.
+func (ctrl *csiSnapshotCommonController) enqueueGroupSnapshotContentWork(obj interface{}) {
+	// Deletion events may carry a tombstone; unwrap it to reach the real object.
+	if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
+		obj = unknown.Obj
+	}
+	if content, ok := obj.(*crdv1alpha1.VolumeGroupSnapshotContent); ok {
+		objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(content)
+		if err != nil {
+			klog.Errorf("failed to get key from object: %v, %v", err, content)
+			return
+		}
+		klog.V(5).Infof("enqueued %q for sync", objName)
+		ctrl.groupSnapshotContentQueue.Add(objName)
+	}
+}
+
+// groupSnapshotWorker takes one key off groupSnapshotQueue and syncs that VolumeGroupSnapshot.
+func (ctrl *csiSnapshotCommonController) groupSnapshotWorker() {
+	keyObj, quit := ctrl.groupSnapshotQueue.Get()
+	if quit {
+		return
+	}
+	defer ctrl.groupSnapshotQueue.Done(keyObj)
+
+	if err := ctrl.syncGroupSnapshotByKey(keyObj.(string)); err != nil {
+		// Rather than wait for a full resync, re-add the key to the
+		// queue to be processed.
+		ctrl.groupSnapshotQueue.AddRateLimited(keyObj)
+		klog.V(4).Infof("Failed to sync group snapshot %q, will retry again: %v", keyObj.(string), err)
+		return
+	}
+	// No error occurred: forget this item so it does not get queued again
+	// until another change happens.
+	ctrl.groupSnapshotQueue.Forget(keyObj)
+}
+
+// groupSnapshotContentWorker takes one key off groupSnapshotContentQueue and syncs it.
+func (ctrl *csiSnapshotCommonController) groupSnapshotContentWorker() {
+	keyObj, quit := ctrl.groupSnapshotContentQueue.Get()
+	if quit {
+		return
+	}
+	defer ctrl.groupSnapshotContentQueue.Done(keyObj)
+
+	// NOTE(review): syncContentByKey looks like the VolumeSnapshotContent sync path, yet the keys
+	// here come from groupSnapshotContentQueue — confirm this is the intended handler.
+	if err := ctrl.syncContentByKey(keyObj.(string)); err != nil {
+		// Rather than wait for a full resync, re-add the key to the
+		// queue to be processed.
+		ctrl.groupSnapshotContentQueue.AddRateLimited(keyObj)
+		klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err)
+		return
+	}
+	// No error occurred: forget this item so it does not get queued again
+	// until another change happens.
+	ctrl.groupSnapshotContentQueue.Forget(keyObj)
+}
+
+// syncGroupSnapshotByKey syncs the VolumeGroupSnapshot named by a namespace/name key; a non-nil error makes the worker requeue the key.
+func (ctrl *csiSnapshotCommonController) syncGroupSnapshotByKey(key string) error {
+	klog.V(5).Infof("syncGroupSnapshotByKey[%s]", key)
+
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		klog.Errorf("error getting namespace & name of group snapshot %q to get group snapshot from informer: %v", key, err)
+		return nil
+	}
+	klog.V(5).Infof("groupSnapshotWorker: group snapshot namespace [%s] name [%s]", namespace, name)
+	groupSnapshot, err := ctrl.groupSnapshotLister.VolumeGroupSnapshots(namespace).Get(name)
+	if err == nil {
+		// The group snapshot is still in the informer cache: the event was add/update/sync.
+		newGroupSnapshot, err := ctrl.checkAndUpdateGroupSnapshotClass(groupSnapshot)
+		if err == nil || (newGroupSnapshot.ObjectMeta.DeletionTimestamp != nil && errors.IsNotFound(err)) {
+			// If the VolumeGroupSnapshotClass is not found, we still need to process an update
+			// so that syncGroupSnapshot can delete the snapshot, should it still exist in the
+			// cluster after it's been removed from the informer cache
+			if newGroupSnapshot.ObjectMeta.DeletionTimestamp != nil && errors.IsNotFound(err) {
+				klog.V(5).Infof("GroupSnapshot %q is being deleted. GroupSnapshotClass has already been removed", key)
+			}
+			klog.V(5).Infof("Updating group snapshot %q", key)
+			return ctrl.updateGroupSnapshot(newGroupSnapshot)
+		}
+		return err
+	}
+	if !errors.IsNotFound(err) {
+		klog.V(2).Infof("error getting group snapshot %q from informer: %v", key, err)
+		return err
+	}
+	// The group snapshot is not in informer cache, the event must have been "delete"
+	vgsObj, found, err := ctrl.groupSnapshotStore.GetByKey(key)
+	if err != nil {
+		klog.V(2).Infof("error getting group snapshot %q from cache: %v", key, err)
+		return nil
+	}
+	if !found {
+		// The controller already processed the delete event and dropped it from its cache.
+		klog.V(2).Infof("deletion of group snapshot %q was already processed", key)
+		return nil
+	}
+	groupSnapshot, ok := vgsObj.(*crdv1alpha1.VolumeGroupSnapshot)
+	if !ok {
+		klog.Errorf("expected vgs, got %+v", vgsObj)
+		return nil
+	}
+
+	klog.V(5).Infof("deleting group snapshot %q", key)
+	ctrl.deleteGroupSnapshot(groupSnapshot)
+
+	return nil
+}
+
+// checkAndUpdateGroupSnapshotClass gets the VolumeGroupSnapshotClass from VolumeGroupSnapshot.
+// If it is not set, gets it from default VolumeGroupSnapshotClass and sets it.
+// On error, it must return the original group snapshot, not nil, because the caller
+// syncGroupSnapshotByKey needs to check group snapshot's timestamp.
+func (ctrl *csiSnapshotCommonController) checkAndUpdateGroupSnapshotClass(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) (*crdv1alpha1.VolumeGroupSnapshot, error) {
+	var (
+		groupClass *crdv1alpha1.VolumeGroupSnapshotClass
+		err        error
+	)
+	updated := groupSnapshot
+	if name := groupSnapshot.Spec.VolumeGroupSnapshotClassName; name == nil {
+		klog.V(5).Infof("checkAndUpdateGroupSnapshotClass [%s]: SetDefaultGroupSnapshotClass", groupSnapshot.Name)
+		groupClass, updated, err = ctrl.SetDefaultGroupSnapshotClass(groupSnapshot)
+		if err != nil {
+			klog.Errorf("checkAndUpdateGroupSnapshotClass failed to setDefaultClass %v", err)
+			ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, false, v1.EventTypeWarning, "SetDefaultGroupSnapshotClassFailed", fmt.Sprintf("Failed to set default group snapshot class with error %v", err))
+			return groupSnapshot, err
+		}
+	} else {
+		klog.V(5).Infof("checkAndUpdateGroupSnapshotClass [%s]: VolumeGroupSnapshotClassName [%s]", groupSnapshot.Name, *name)
+		groupClass, err = ctrl.getGroupSnapshotClass(*name)
+		if err != nil {
+			klog.Errorf("checkAndUpdateGroupSnapshotClass failed to getGroupSnapshotClass %v", err)
+			ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, false, v1.EventTypeWarning, "GetGroupSnapshotClassFailed", fmt.Sprintf("failed to get group snapshot class with error %v", err))
+			// we need to return the original group snapshot even if the class isn't found, as it may need to be deleted
+			return updated, err
+		}
+	}
+	// For pre-provisioned group snapshots, we may not have group snapshot class
+	if groupClass != nil {
+		klog.V(5).Infof("VolumeGroupSnapshotClass [%s] Driver [%s]", groupClass.Name, groupClass.Driver)
+	}
+	return updated, nil
+}
diff --git a/pkg/utils/util.go b/pkg/utils/util.go
index 7f9d280d..2e67e20c 100644
--- a/pkg/utils/util.go
+++ b/pkg/utils/util.go
@@ -24,7 +24,6 @@ import (
 	"strings"
 	"time"
 
-	crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -33,6 +32,9 @@ import (
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	klog "k8s.io/klog/v2"
+
+	crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumegroupsnapshot/v1alpha1"
+	crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
 )
 
 var keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
@@ -475,3 +477,35 @@ func IsSnapshotReady(snapshot *crdv1.VolumeSnapshot) bool {
 func IsSnapshotCreated(snapshot *crdv1.VolumeSnapshot) bool {
 	return snapshot.Status != nil && snapshot.Status.CreationTime != nil
 }
+
+// GroupSnapshotKey returns the "namespace/name" key of the given VolumeGroupSnapshot.
+func GroupSnapshotKey(vgs *crdv1alpha1.VolumeGroupSnapshot) string {
+	return fmt.Sprintf("%s/%s", vgs.Namespace, vgs.Name)
+}
+
+// GroupSnapshotRefKey returns the "namespace/name" key built from the given object reference.
+func GroupSnapshotRefKey(vgsref *v1.ObjectReference) string {
+	return fmt.Sprintf("%s/%s", vgsref.Namespace, vgsref.Name)
+}
+
+// IsGroupSnapshotReady reports whether the group snapshot's status has ReadyToUse set to true.
+func IsGroupSnapshotReady(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) bool {
+	status := groupSnapshot.Status
+	return status != nil && status.ReadyToUse != nil && *status.ReadyToUse
+}
+
+// IsBoundVolumeGroupSnapshotContentNameSet reports whether the group snapshot's status
+// carries a non-empty BoundVolumeGroupSnapshotContentName.
+func IsBoundVolumeGroupSnapshotContentNameSet(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) bool {
+	status := groupSnapshot.Status
+	return status != nil && status.BoundVolumeGroupSnapshotContentName != nil && *status.BoundVolumeGroupSnapshotContentName != ""
+}
+
+// IsVolumeGroupSnapshotRefSet reports whether the content's VolumeGroupSnapshotRef
+// matches the given group snapshot by name, namespace and UID.
+func IsVolumeGroupSnapshotRefSet(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot, content *crdv1alpha1.VolumeGroupSnapshotContent) bool {
+	ref := content.Spec.VolumeGroupSnapshotRef
+	return ref.Name == groupSnapshot.Name &&
+		ref.Namespace == groupSnapshot.Namespace &&
+		ref.UID == groupSnapshot.UID
+}