diff --git a/cmd/csi-snapshotter/main.go b/cmd/csi-snapshotter/main.go index 23c62b5d..cd094c34 100644 --- a/cmd/csi-snapshotter/main.go +++ b/cmd/csi-snapshotter/main.go @@ -79,12 +79,13 @@ var ( kubeAPIQPS = flag.Float64("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.") kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.") - metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") - httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") - metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") - retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.") - retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.") - enableNodeDeployment = flag.Bool("node-deployment", false, "Enables deploying the sidecar controller together with a CSI driver on nodes to manage snapshots for node-local volumes.") + metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") + httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") + metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") + retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.") + retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.") + enableNodeDeployment = flag.Bool("node-deployment", false, "Enables deploying the sidecar controller together with a CSI driver on nodes to manage snapshots for node-local volumes.") + enableVolumeGroupSnapshots = flag.Bool("enable-volume-group-snapshots", false, "Enables the volume group snapshot feature, allowing the user to create snapshots of groups of volumes.") ) var ( @@ -235,6 +236,10 @@ func main() { *snapshotNameUUIDLength, *extraCreateMetadata, workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), + *enableVolumeGroupSnapshots, + snapshotContentfactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotContents(), + snapshotContentfactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotClasses(), + workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), ) run := func(context.Context) { diff --git a/pkg/sidecar-controller/framework_test.go b/pkg/sidecar-controller/framework_test.go index 79f17ed0..568c573f 100644 --- a/pkg/sidecar-controller/framework_test.go +++ b/pkg/sidecar-controller/framework_test.go @@ -572,6 +572,10 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte -1, true, workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute), + false, + informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotContents(), + informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotClasses(), + workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute), ) ctrl.eventRecorder = record.NewFakeRecorder(1000) diff --git a/pkg/sidecar-controller/groupsnapshot_helper.go b/pkg/sidecar-controller/groupsnapshot_helper.go new file mode 100644 index 00000000..37940586 --- /dev/null +++ b/pkg/sidecar-controller/groupsnapshot_helper.go @@ -0,0 +1,170 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sidecar_controller + +import ( + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/tools/cache" + klog "k8s.io/klog/v2" + + crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumegroupsnapshot/v1alpha1" + "github.com/kubernetes-csi/external-snapshotter/v6/pkg/utils" +) + +func (ctrl *csiSnapshotSideCarController) storeGroupSnapshotContentUpdate(content interface{}) (bool, error) { + return utils.StoreObjectUpdate(ctrl.groupSnapshotContentStore, content, "groupsnapshotcontent") +} + +// enqueueGroupSnapshotContentWork adds group snapshot content to given work queue. +func (ctrl *csiSnapshotSideCarController) enqueueGroupSnapshotContentWork(obj interface{}) { + // Beware of "xxx deleted" events + if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil { + obj = unknown.Obj + } + if content, ok := obj.(*crdv1alpha1.VolumeGroupSnapshotContent); ok { + objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(content) + if err != nil { + klog.Errorf("failed to get key from object: %v, %v", err, content) + return + } + klog.V(5).Infof("enqueued %q for sync", objName) + ctrl.groupSnapshotContentQueue.Add(objName) + } +} + +// groupSnapshotContentWorker processes items from groupSnapshotContentQueue. +// It must run only once, syncContent is not assured to be reentrant. +func (ctrl *csiSnapshotSideCarController) groupSnapshotContentWorker() { + keyObj, quit := ctrl.groupSnapshotContentQueue.Get() + if quit { + return + } + defer ctrl.groupSnapshotContentQueue.Done(keyObj) + + if err := ctrl.syncGroupSnapshotContentByKey(keyObj.(string)); err != nil { + // Rather than wait for a full resync, re-add the key to the + // queue to be processed. + ctrl.groupSnapshotContentQueue.AddRateLimited(keyObj) + klog.V(4).Infof("Failed to sync group snapshot content %q, will retry again: %v", keyObj.(string), err) + return + } + + // Finally, if no error occurs we forget this item so it does not + // get queued again until another change happens. + ctrl.groupSnapshotContentQueue.Forget(keyObj) + return +} + +func (ctrl *csiSnapshotSideCarController) syncGroupSnapshotContentByKey(key string) error { + klog.V(5).Infof("syncGroupSnapshotContentByKey[%s]", key) + + _, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + klog.V(4).Infof("error getting name of groupSnapshotContent %q from informer: %v", key, err) + return nil + } + content, err := ctrl.groupSnapshotContentLister.Get(name) + // The group snapshot content still exists in informer cache, the event must + // have been add/update/sync + if err == nil { + if ctrl.isDriverMatch(content) { + err = ctrl.updateGroupSnapshotContentInInformerCache(content) + } + if err != nil { + // If error occurs we add this item back to the queue + return err + } + return nil + } + if !errors.IsNotFound(err) { + klog.V(2).Infof("error getting group snapshot content %q from informer: %v", key, err) + return nil + } + + // The content is not in informer cache, the event must have been + // "delete" + contentObj, found, err := ctrl.groupSnapshotContentStore.GetByKey(key) + if err != nil { + klog.V(2).Infof("error getting group snapshot content %q from cache: %v", key, err) + return nil + } + if !found { + // The controller has already processed the delete event and + // deleted the group snapshot content from its cache + klog.V(2).Infof("deletion of group snapshot content %q was already processed", key) + return nil + } + content, ok := contentObj.(*crdv1alpha1.VolumeGroupSnapshotContent) + if !ok { + klog.Errorf("expected group snapshot content, got %+v", content) + return nil + } + ctrl.deleteGroupSnapshotContentInCacheStore(content) + return nil +} + +// updateGroupSnapshotContentInInformerCache runs in worker thread and handles +// "group snapshot content added", "group snapshot content updated" and "periodic +// sync" events. +func (ctrl *csiSnapshotSideCarController) updateGroupSnapshotContentInInformerCache(content *crdv1alpha1.VolumeGroupSnapshotContent) error { + // Store the new group snapshot content version in the cache and do not process + // it if this is an old version. + new, err := ctrl.storeGroupSnapshotContentUpdate(content) + if err != nil { + klog.Errorf("%v", err) + } + if !new { + return nil + } + err = ctrl.syncGroupSnapshotContent(content) + if err != nil { + if errors.IsConflict(err) { + // Version conflict error happens quite often and the controller + // recovers from it easily. + klog.V(3).Infof("could not sync group snapshot content %q: %+v", content.Name, err) + } else { + klog.Errorf("could not sync group snapshot content %q: %+v", content.Name, err) + } + return err + } + return nil +} + +// deleteGroupSnapshotContentInCacheStore runs in worker thread and handles "group +// snapshot content deleted" event. +func (ctrl *csiSnapshotSideCarController) deleteGroupSnapshotContentInCacheStore(content *crdv1alpha1.VolumeGroupSnapshotContent) { + _ = ctrl.groupSnapshotContentStore.Delete(content) + klog.V(4).Infof("group snapshot content %q deleted", content.Name) +} + +// syncGroupSnapshotContent deals with one key off the queue. It returns false when it's time to quit. +func (ctrl *csiSnapshotSideCarController) syncGroupSnapshotContent(content *crdv1alpha1.VolumeGroupSnapshotContent) error { + klog.V(5).Infof("synchronizing VolumeGroupSnapshotContent[%s]", content.Name) + + /* + TODO: Check if the group snapshot content should be deleted + */ + + /* + TODO: Check if a new group snapshot should be created by calling CreateGroupSnapshot + */ + + /* + TODO: Check and update group snapshot content status + */ + return nil +} diff --git a/pkg/sidecar-controller/snapshot_controller_base.go b/pkg/sidecar-controller/snapshot_controller_base.go index 60b30e7a..148c13d0 100644 --- a/pkg/sidecar-controller/snapshot_controller_base.go +++ b/pkg/sidecar-controller/snapshot_controller_base.go @@ -20,13 +20,6 @@ import ( "fmt" "time" - crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1" - clientset "github.com/kubernetes-csi/external-snapshotter/client/v6/clientset/versioned" - storageinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumesnapshot/v1" - storagelisters "github.com/kubernetes-csi/external-snapshotter/client/v6/listers/volumesnapshot/v1" - "github.com/kubernetes-csi/external-snapshotter/v6/pkg/snapshotter" - "github.com/kubernetes-csi/external-snapshotter/v6/pkg/utils" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" @@ -38,6 +31,16 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" klog "k8s.io/klog/v2" + + crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumegroupsnapshot/v1alpha1" + crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1" + clientset "github.com/kubernetes-csi/external-snapshotter/client/v6/clientset/versioned" + groupsnapshotinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumegroupsnapshot/v1alpha1" + snapshotinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumesnapshot/v1" + groupsnapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v6/listers/volumegroupsnapshot/v1alpha1" + snapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v6/listers/volumesnapshot/v1" + "github.com/kubernetes-csi/external-snapshotter/v6/pkg/snapshotter" + "github.com/kubernetes-csi/external-snapshotter/v6/pkg/utils" ) type csiSnapshotSideCarController struct { @@ -48,9 +51,9 @@ type csiSnapshotSideCarController struct { contentQueue workqueue.RateLimitingInterface extraCreateMetadata bool - contentLister storagelisters.VolumeSnapshotContentLister + contentLister snapshotlisters.VolumeSnapshotContentLister contentListerSynced cache.InformerSynced - classLister storagelisters.VolumeSnapshotClassLister + classLister snapshotlisters.VolumeSnapshotClassLister classListerSynced cache.InformerSynced contentStore cache.Store @@ -58,6 +61,14 @@ type csiSnapshotSideCarController struct { handler Handler resyncPeriod time.Duration + + enableVolumeGroupSnapshots bool + groupSnapshotContentQueue workqueue.RateLimitingInterface + groupSnapshotContentLister groupsnapshotlisters.VolumeGroupSnapshotContentLister + groupSnapshotContentListerSynced cache.InformerSynced + groupSnapshotClassLister groupsnapshotlisters.VolumeGroupSnapshotClassLister + groupSnapshotClassListerSynced cache.InformerSynced + groupSnapshotContentStore cache.Store } // NewCSISnapshotSideCarController returns a new *csiSnapshotSideCarController @@ -65,8 +76,8 @@ func NewCSISnapshotSideCarController( clientset clientset.Interface, client kubernetes.Interface, driverName string, - volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer, - volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer, + volumeSnapshotContentInformer snapshotinformers.VolumeSnapshotContentInformer, + volumeSnapshotClassInformer snapshotinformers.VolumeSnapshotClassInformer, snapshotter snapshotter.Snapshotter, timeout time.Duration, resyncPeriod time.Duration, @@ -74,6 +85,10 @@ func NewCSISnapshotSideCarController( snapshotNameUUIDLength int, extraCreateMetadata bool, contentRateLimiter workqueue.RateLimiter, + enableVolumeGroupSnapshots bool, + volumeGroupSnapshotContentInformer groupsnapshotinformers.VolumeGroupSnapshotContentInformer, + volumeGroupSnapshotClassInformer groupsnapshotinformers.VolumeGroupSnapshotClassInformer, + groupSnapshotContentRateLimiter workqueue.RateLimiter, ) *csiSnapshotSideCarController { broadcaster := record.NewBroadcaster() broadcaster.StartLogging(klog.Infof) @@ -124,6 +139,33 @@ func NewCSISnapshotSideCarController( ctrl.classLister = volumeSnapshotClassInformer.Lister() ctrl.classListerSynced = volumeSnapshotClassInformer.Informer().HasSynced + ctrl.enableVolumeGroupSnapshots = enableVolumeGroupSnapshots + if enableVolumeGroupSnapshots { + ctrl.groupSnapshotContentStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) + ctrl.groupSnapshotContentQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotContentRateLimiter, "csi-snapshotter-groupsnapshotcontent") + + volumeGroupSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotContentWork(obj) }, + UpdateFunc: func(oldObj, newObj interface{}) { + /* + TODO: Determine if we need to skip requeueing in case of CSI driver failure. + */ + ctrl.enqueueGroupSnapshotContentWork(newObj) + }, + DeleteFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotContentWork(obj) }, + }, + ctrl.resyncPeriod, + ) + + ctrl.groupSnapshotContentLister = volumeGroupSnapshotContentInformer.Lister() + ctrl.groupSnapshotClassListerSynced = volumeGroupSnapshotContentInformer.Informer().HasSynced + + ctrl.groupSnapshotClassLister = volumeGroupSnapshotClassInformer.Lister() + ctrl.groupSnapshotClassListerSynced = volumeGroupSnapshotClassInformer.Informer().HasSynced + + } + return ctrl } @@ -133,15 +175,23 @@ func (ctrl *csiSnapshotSideCarController) Run(workers int, stopCh <-chan struct{ klog.Infof("Starting CSI snapshotter") defer klog.Infof("Shutting CSI snapshotter") - if !cache.WaitForCacheSync(stopCh, ctrl.contentListerSynced, ctrl.classListerSynced) { + informersSynced := []cache.InformerSynced{ctrl.contentListerSynced, ctrl.classListerSynced} + if ctrl.enableVolumeGroupSnapshots { + informersSynced = append(informersSynced, []cache.InformerSynced{ctrl.groupSnapshotContentListerSynced, ctrl.groupSnapshotClassListerSynced}...) + } + + if !cache.WaitForCacheSync(stopCh, informersSynced...) { klog.Errorf("Cannot sync caches") return } - ctrl.initializeCaches(ctrl.contentLister) + ctrl.initializeCaches() for i := 0; i < workers; i++ { go wait.Until(ctrl.contentWorker, 0, stopCh) + if ctrl.enableVolumeGroupSnapshots { + go wait.Until(ctrl.groupSnapshotContentWorker, 0, stopCh) + } } <-stopCh @@ -240,25 +290,48 @@ func (ctrl *csiSnapshotSideCarController) syncContentByKey(key string) error { return nil } -// verify whether the driver specified in VolumeSnapshotContent matches the controller's driver name -func (ctrl *csiSnapshotSideCarController) isDriverMatch(content *crdv1.VolumeSnapshotContent) bool { - if content.Spec.Source.VolumeHandle == nil && content.Spec.Source.SnapshotHandle == nil { - // Skip this snapshot content if it does not have a valid source - return false - } - if content.Spec.Driver != ctrl.driverName { - // Skip this snapshot content if the driver does not match - return false - } - snapshotClassName := content.Spec.VolumeSnapshotClassName - if snapshotClassName != nil { - if snapshotClass, err := ctrl.classLister.Get(*snapshotClassName); err == nil { - if snapshotClass.Driver != ctrl.driverName { - return false +// isDriverMatch verifies whether the driver specified in VolumeSnapshotContent +// or VolumeGroupSnapshotContent matches the controller's driver name +func (ctrl *csiSnapshotSideCarController) isDriverMatch(object interface{}) bool { + if content, ok := object.(*crdv1.VolumeSnapshotContent); ok { + if content.Spec.Source.VolumeHandle == nil && content.Spec.Source.SnapshotHandle == nil { + // Skip this snapshot content if it does not have a valid source + return false + } + if content.Spec.Driver != ctrl.driverName { + // Skip this snapshot content if the driver does not match + return false + } + snapshotClassName := content.Spec.VolumeSnapshotClassName + if snapshotClassName != nil { + if snapshotClass, err := ctrl.classLister.Get(*snapshotClassName); err == nil { + if snapshotClass.Driver != ctrl.driverName { + return false + } } } + return true } - return true + if content, ok := object.(*crdv1alpha1.VolumeGroupSnapshotContent); ok { + if content.Spec.Source.VolumeGroupSnapshotHandle == nil && len(content.Spec.Source.PersistentVolumeNames) == 0 { + // Skip this group snapshot content if it does not have a valid source + return false + } + if content.Spec.Driver != ctrl.driverName { + // Skip this group snapshot content if the driver does not match + return false + } + groupSnapshotClassName := content.Spec.VolumeGroupSnapshotClassName + if groupSnapshotClassName != nil { + if groupSnapshotClass, err := ctrl.groupSnapshotClassLister.Get(*groupSnapshotClassName); err == nil { + if groupSnapshotClass.Driver != ctrl.driverName { + return false + } + } + } + return true + } + return false } // updateContentInInformerCache runs in worker thread and handles "content added", @@ -296,8 +369,8 @@ func (ctrl *csiSnapshotSideCarController) deleteContentInCacheStore(content *crd // initializeCaches fills all controller caches with initial data from etcd in // order to have the caches already filled when first addSnapshot/addContent to // perform initial synchronization of the controller. -func (ctrl *csiSnapshotSideCarController) initializeCaches(contentLister storagelisters.VolumeSnapshotContentLister) { - contentList, err := contentLister.List(labels.Everything()) +func (ctrl *csiSnapshotSideCarController) initializeCaches() { + contentList, err := ctrl.contentLister.List(labels.Everything()) if err != nil { klog.Errorf("CSISnapshotController can't initialize caches: %v", err) return @@ -311,5 +384,19 @@ func (ctrl *csiSnapshotSideCarController) initializeCaches(contentLister storage } } + if ctrl.enableVolumeGroupSnapshots { + groupSnapshotContentList, err := ctrl.groupSnapshotContentLister.List(labels.Everything()) + if err != nil { + klog.Errorf("CSISnapshotController can't initialize caches: %v", err) + return + } + for _, groupSnapshotContent := range groupSnapshotContentList { + groupSnapshotContentClone := groupSnapshotContent.DeepCopy() + if _, err = ctrl.storeGroupSnapshotContentUpdate(groupSnapshotContentClone); err != nil { + klog.Errorf("error updating volume group snapshot content cache: %v", err) + } + } + } + klog.V(4).Infof("controller initialized") }