-Introduce volume group snapshot functionality include initializing queues and caches in snapshotter
- Introduce new flag for volume group snapshots and run worker if flag is enabled - Introduce the main controller for group snapshots in snapshot-controller
This commit is contained in:
@@ -20,10 +20,13 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumegroupsnapshot/v1alpha1"
|
||||
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
|
||||
clientset "github.com/kubernetes-csi/external-snapshotter/client/v6/clientset/versioned"
|
||||
storageinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumesnapshot/v1"
|
||||
storagelisters "github.com/kubernetes-csi/external-snapshotter/client/v6/listers/volumesnapshot/v1"
|
||||
groupsnapshotinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumegroupsnapshot/v1alpha1"
|
||||
snapshotinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumesnapshot/v1"
|
||||
groupsnapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v6/listers/volumegroupsnapshot/v1alpha1"
|
||||
snapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v6/listers/volumesnapshot/v1"
|
||||
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/metrics"
|
||||
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/utils"
|
||||
|
||||
@@ -43,25 +46,35 @@ import (
|
||||
)
|
||||
|
||||
type csiSnapshotCommonController struct {
|
||||
clientset clientset.Interface
|
||||
client kubernetes.Interface
|
||||
eventRecorder record.EventRecorder
|
||||
snapshotQueue workqueue.RateLimitingInterface
|
||||
contentQueue workqueue.RateLimitingInterface
|
||||
clientset clientset.Interface
|
||||
client kubernetes.Interface
|
||||
eventRecorder record.EventRecorder
|
||||
snapshotQueue workqueue.RateLimitingInterface
|
||||
contentQueue workqueue.RateLimitingInterface
|
||||
groupSnapshotQueue workqueue.RateLimitingInterface
|
||||
groupSnapshotContentQueue workqueue.RateLimitingInterface
|
||||
|
||||
snapshotLister storagelisters.VolumeSnapshotLister
|
||||
snapshotListerSynced cache.InformerSynced
|
||||
contentLister storagelisters.VolumeSnapshotContentLister
|
||||
contentListerSynced cache.InformerSynced
|
||||
classLister storagelisters.VolumeSnapshotClassLister
|
||||
classListerSynced cache.InformerSynced
|
||||
pvcLister corelisters.PersistentVolumeClaimLister
|
||||
pvcListerSynced cache.InformerSynced
|
||||
nodeLister corelisters.NodeLister
|
||||
nodeListerSynced cache.InformerSynced
|
||||
snapshotLister snapshotlisters.VolumeSnapshotLister
|
||||
snapshotListerSynced cache.InformerSynced
|
||||
contentLister snapshotlisters.VolumeSnapshotContentLister
|
||||
contentListerSynced cache.InformerSynced
|
||||
classLister snapshotlisters.VolumeSnapshotClassLister
|
||||
classListerSynced cache.InformerSynced
|
||||
pvcLister corelisters.PersistentVolumeClaimLister
|
||||
pvcListerSynced cache.InformerSynced
|
||||
nodeLister corelisters.NodeLister
|
||||
nodeListerSynced cache.InformerSynced
|
||||
groupSnapshotLister groupsnapshotlisters.VolumeGroupSnapshotLister
|
||||
groupSnapshotListerSynced cache.InformerSynced
|
||||
groupSnapshotContentLister groupsnapshotlisters.VolumeGroupSnapshotContentLister
|
||||
groupSnapshotContentListerSynced cache.InformerSynced
|
||||
groupSnapshotClassLister groupsnapshotlisters.VolumeGroupSnapshotClassLister
|
||||
groupSnapshotClassListerSynced cache.InformerSynced
|
||||
|
||||
snapshotStore cache.Store
|
||||
contentStore cache.Store
|
||||
snapshotStore cache.Store
|
||||
contentStore cache.Store
|
||||
groupSnapshotStore cache.Store
|
||||
groupSnapshotContentStore cache.Store
|
||||
|
||||
metricsManager metrics.MetricsManager
|
||||
|
||||
@@ -69,23 +82,30 @@ type csiSnapshotCommonController struct {
|
||||
|
||||
enableDistributedSnapshotting bool
|
||||
preventVolumeModeConversion bool
|
||||
enableVolumeGroupSnapshots bool
|
||||
}
|
||||
|
||||
// NewCSISnapshotController returns a new *csiSnapshotCommonController
|
||||
func NewCSISnapshotCommonController(
|
||||
clientset clientset.Interface,
|
||||
client kubernetes.Interface,
|
||||
volumeSnapshotInformer storageinformers.VolumeSnapshotInformer,
|
||||
volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer,
|
||||
volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer,
|
||||
volumeSnapshotInformer snapshotinformers.VolumeSnapshotInformer,
|
||||
volumeSnapshotContentInformer snapshotinformers.VolumeSnapshotContentInformer,
|
||||
volumeSnapshotClassInformer snapshotinformers.VolumeSnapshotClassInformer,
|
||||
volumeGroupSnapshotInformer groupsnapshotinformers.VolumeGroupSnapshotInformer,
|
||||
volumeGroupSnapshotContentInformer groupsnapshotinformers.VolumeGroupSnapshotContentInformer,
|
||||
volumeGroupSnapshotClassInformer groupsnapshotinformers.VolumeGroupSnapshotClassInformer,
|
||||
pvcInformer coreinformers.PersistentVolumeClaimInformer,
|
||||
nodeInformer coreinformers.NodeInformer,
|
||||
metricsManager metrics.MetricsManager,
|
||||
resyncPeriod time.Duration,
|
||||
snapshotRateLimiter workqueue.RateLimiter,
|
||||
contentRateLimiter workqueue.RateLimiter,
|
||||
groupSnapshotRateLimiter workqueue.RateLimiter,
|
||||
groupSnapshotContentRateLimiter workqueue.RateLimiter,
|
||||
enableDistributedSnapshotting bool,
|
||||
preventVolumeModeConversion bool,
|
||||
enableVolumeGroupSnapshots bool,
|
||||
) *csiSnapshotCommonController {
|
||||
broadcaster := record.NewBroadcaster()
|
||||
broadcaster.StartLogging(klog.Infof)
|
||||
@@ -142,12 +162,49 @@ func NewCSISnapshotCommonController(
|
||||
|
||||
ctrl.preventVolumeModeConversion = preventVolumeModeConversion
|
||||
|
||||
ctrl.enableVolumeGroupSnapshots = enableVolumeGroupSnapshots
|
||||
|
||||
if enableVolumeGroupSnapshots {
|
||||
ctrl.groupSnapshotQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotRateLimiter, "snapshot-controller-group-snapshot")
|
||||
ctrl.groupSnapshotContentQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotContentRateLimiter, "snapshot-controller-group-content")
|
||||
|
||||
volumeGroupSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotWork(obj) },
|
||||
UpdateFunc: func(oldObj, newObj interface{}) { ctrl.enqueueGroupSnapshotWork(newObj) },
|
||||
DeleteFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotWork(obj) },
|
||||
},
|
||||
ctrl.resyncPeriod,
|
||||
)
|
||||
ctrl.groupSnapshotLister = volumeGroupSnapshotInformer.Lister()
|
||||
ctrl.groupSnapshotListerSynced = volumeGroupSnapshotInformer.Informer().HasSynced
|
||||
|
||||
volumeGroupSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotContentWork(obj) },
|
||||
UpdateFunc: func(oldObj, newObj interface{}) { ctrl.enqueueGroupSnapshotContentWork(newObj) },
|
||||
DeleteFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotContentWork(obj) },
|
||||
},
|
||||
ctrl.resyncPeriod,
|
||||
)
|
||||
ctrl.groupSnapshotContentLister = volumeGroupSnapshotContentInformer.Lister()
|
||||
ctrl.groupSnapshotContentListerSynced = volumeGroupSnapshotContentInformer.Informer().HasSynced
|
||||
|
||||
ctrl.groupSnapshotClassLister = volumeGroupSnapshotClassInformer.Lister()
|
||||
ctrl.groupSnapshotClassListerSynced = volumeGroupSnapshotClassInformer.Informer().HasSynced
|
||||
|
||||
}
|
||||
|
||||
return ctrl
|
||||
}
|
||||
|
||||
func (ctrl *csiSnapshotCommonController) Run(workers int, stopCh <-chan struct{}) {
|
||||
defer ctrl.snapshotQueue.ShutDown()
|
||||
defer ctrl.contentQueue.ShutDown()
|
||||
if ctrl.enableVolumeGroupSnapshots {
|
||||
defer ctrl.groupSnapshotQueue.ShutDown()
|
||||
defer ctrl.groupSnapshotContentQueue.ShutDown()
|
||||
}
|
||||
|
||||
klog.Infof("Starting snapshot controller")
|
||||
defer klog.Infof("Shutting snapshot controller")
|
||||
@@ -156,17 +213,24 @@ func (ctrl *csiSnapshotCommonController) Run(workers int, stopCh <-chan struct{}
|
||||
if ctrl.enableDistributedSnapshotting {
|
||||
informersSynced = append(informersSynced, ctrl.nodeListerSynced)
|
||||
}
|
||||
if ctrl.enableVolumeGroupSnapshots {
|
||||
informersSynced = append(informersSynced, []cache.InformerSynced{ctrl.groupSnapshotListerSynced, ctrl.groupSnapshotContentListerSynced, ctrl.groupSnapshotClassListerSynced}...)
|
||||
}
|
||||
|
||||
if !cache.WaitForCacheSync(stopCh, informersSynced...) {
|
||||
klog.Errorf("Cannot sync caches")
|
||||
return
|
||||
}
|
||||
|
||||
ctrl.initializeCaches(ctrl.snapshotLister, ctrl.contentLister)
|
||||
ctrl.initializeCaches()
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(ctrl.snapshotWorker, 0, stopCh)
|
||||
go wait.Until(ctrl.contentWorker, 0, stopCh)
|
||||
if ctrl.enableVolumeGroupSnapshots {
|
||||
go wait.Until(ctrl.groupSnapshotWorker, 0, stopCh)
|
||||
go wait.Until(ctrl.groupSnapshotContentWorker, 0, stopCh)
|
||||
}
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
@@ -481,8 +545,8 @@ func (ctrl *csiSnapshotCommonController) deleteContent(content *crdv1.VolumeSnap
|
||||
// initializeCaches fills all controller caches with initial data from etcd in
|
||||
// order to have the caches already filled when first addSnapshot/addContent to
|
||||
// perform initial synchronization of the controller.
|
||||
func (ctrl *csiSnapshotCommonController) initializeCaches(snapshotLister storagelisters.VolumeSnapshotLister, contentLister storagelisters.VolumeSnapshotContentLister) {
|
||||
snapshotList, err := snapshotLister.List(labels.Everything())
|
||||
func (ctrl *csiSnapshotCommonController) initializeCaches() {
|
||||
snapshotList, err := ctrl.snapshotLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
klog.Errorf("CSISnapshotController can't initialize caches: %v", err)
|
||||
return
|
||||
@@ -494,7 +558,7 @@ func (ctrl *csiSnapshotCommonController) initializeCaches(snapshotLister storage
|
||||
}
|
||||
}
|
||||
|
||||
contentList, err := contentLister.List(labels.Everything())
|
||||
contentList, err := ctrl.contentLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
klog.Errorf("CSISnapshotController can't initialize caches: %v", err)
|
||||
return
|
||||
@@ -506,5 +570,195 @@ func (ctrl *csiSnapshotCommonController) initializeCaches(snapshotLister storage
|
||||
}
|
||||
}
|
||||
|
||||
if ctrl.enableVolumeGroupSnapshots {
|
||||
groupSnapshotList, err := ctrl.snapshotLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
klog.Errorf("CSISnapshotController can't initialize caches: %v", err)
|
||||
return
|
||||
}
|
||||
for _, groupSnapshot := range groupSnapshotList {
|
||||
groupSnapshotClone := groupSnapshot.DeepCopy()
|
||||
if _, err = ctrl.storeGroupSnapshotUpdate(groupSnapshotClone); err != nil {
|
||||
klog.Errorf("error updating volume group snapshot cache: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
groupContentList, err := ctrl.contentLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
klog.Errorf("CSISnapshotController can't initialize caches: %v", err)
|
||||
return
|
||||
}
|
||||
for _, groupContent := range groupContentList {
|
||||
groupContentClone := groupContent.DeepCopy()
|
||||
if _, err = ctrl.storeGroupSnapshotContentUpdate(groupContentClone); err != nil {
|
||||
klog.Errorf("error updating volume group snapshot content cache: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
klog.V(4).Infof("controller initialized")
|
||||
}
|
||||
|
||||
// enqueueGroupSnapshotWork adds group snapshot to given work queue.
|
||||
func (ctrl *csiSnapshotCommonController) enqueueGroupSnapshotWork(obj interface{}) {
|
||||
// Beware of "xxx deleted" events
|
||||
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
|
||||
obj = unknown.Obj
|
||||
}
|
||||
if groupSnapshot, ok := obj.(*crdv1alpha1.VolumeGroupSnapshot); ok {
|
||||
objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(groupSnapshot)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get key from object: %v, %v", err, groupSnapshot)
|
||||
return
|
||||
}
|
||||
klog.V(5).Infof("enqueued %q for sync", objName)
|
||||
ctrl.groupSnapshotQueue.Add(objName)
|
||||
}
|
||||
}
|
||||
|
||||
// enqueueGroupSnapshotContentWork adds group snapshot content to given work queue.
|
||||
func (ctrl *csiSnapshotCommonController) enqueueGroupSnapshotContentWork(obj interface{}) {
|
||||
// Beware of "xxx deleted" events
|
||||
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
|
||||
obj = unknown.Obj
|
||||
}
|
||||
if content, ok := obj.(*crdv1alpha1.VolumeGroupSnapshotContent); ok {
|
||||
objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(content)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get key from object: %v, %v", err, content)
|
||||
return
|
||||
}
|
||||
klog.V(5).Infof("enqueued %q for sync", objName)
|
||||
ctrl.groupSnapshotContentQueue.Add(objName)
|
||||
}
|
||||
}
|
||||
|
||||
// groupSnapshotWorker is the main worker for VolumeGroupSnapshots.
|
||||
func (ctrl *csiSnapshotCommonController) groupSnapshotWorker() {
|
||||
keyObj, quit := ctrl.groupSnapshotQueue.Get()
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
defer ctrl.groupSnapshotQueue.Done(keyObj)
|
||||
|
||||
if err := ctrl.syncGroupSnapshotByKey(keyObj.(string)); err != nil {
|
||||
// Rather than wait for a full resync, re-add the key to the
|
||||
// queue to be processed.
|
||||
ctrl.groupSnapshotQueue.AddRateLimited(keyObj)
|
||||
klog.V(4).Infof("Failed to sync group snapshot %q, will retry again: %v", keyObj.(string), err)
|
||||
} else {
|
||||
// Finally, if no error occurs we forget this item so it does not
|
||||
// get queued again until another change happens.
|
||||
ctrl.groupSnapshotQueue.Forget(keyObj)
|
||||
}
|
||||
}
|
||||
|
||||
// groupSnapshotContentWorker is the main worker for VolumeGroupSnapshotContent.
|
||||
func (ctrl *csiSnapshotCommonController) groupSnapshotContentWorker() {
|
||||
keyObj, quit := ctrl.groupSnapshotContentQueue.Get()
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
defer ctrl.groupSnapshotContentQueue.Done(keyObj)
|
||||
|
||||
if err := ctrl.syncContentByKey(keyObj.(string)); err != nil {
|
||||
// Rather than wait for a full resync, re-add the key to the
|
||||
// queue to be processed.
|
||||
ctrl.groupSnapshotContentQueue.AddRateLimited(keyObj)
|
||||
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err)
|
||||
} else {
|
||||
// Finally, if no error occurs we Forget this item so it does not
|
||||
// get queued again until another change happens.
|
||||
ctrl.groupSnapshotContentQueue.Forget(keyObj)
|
||||
}
|
||||
}
|
||||
|
||||
// syncGroupSnapshotByKey processes a VolumeGroupSnapshot request.
|
||||
func (ctrl *csiSnapshotCommonController) syncGroupSnapshotByKey(key string) error {
|
||||
klog.V(5).Infof("syncGroupSnapshotByKey[%s]", key)
|
||||
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
klog.V(5).Infof("groupSnapshotWorker: group snapshot namespace [%s] name [%s]", namespace, name)
|
||||
if err != nil {
|
||||
klog.Errorf("error getting namespace & name of group snapshot %q to get group snapshot from informer: %v", key, err)
|
||||
return nil
|
||||
}
|
||||
groupSnapshot, err := ctrl.groupSnapshotLister.VolumeGroupSnapshots(namespace).Get(name)
|
||||
if err == nil {
|
||||
// The volume group snapshot still exists in informer cache, the event must have
|
||||
// been add/update/sync
|
||||
newGroupSnapshot, err := ctrl.checkAndUpdateGroupSnapshotClass(groupSnapshot)
|
||||
if err == nil || (newGroupSnapshot.ObjectMeta.DeletionTimestamp != nil && errors.IsNotFound(err)) {
|
||||
// If the VolumeSnapshotClass is not found, we still need to process an update
|
||||
// so that syncGroupSnapshot can delete the snapshot, should it still exist in the
|
||||
// cluster after it's been removed from the informer cache
|
||||
if newGroupSnapshot.ObjectMeta.DeletionTimestamp != nil && errors.IsNotFound(err) {
|
||||
klog.V(5).Infof("GroupSnapshot %q is being deleted. GroupSnapshotClass has already been removed", key)
|
||||
}
|
||||
klog.V(5).Infof("Updating group snapshot %q", key)
|
||||
return ctrl.updateGroupSnapshot(newGroupSnapshot)
|
||||
}
|
||||
return err
|
||||
}
|
||||
if err != nil && !errors.IsNotFound(err) {
|
||||
klog.V(2).Infof("error getting group snapshot %q from informer: %v", key, err)
|
||||
return err
|
||||
}
|
||||
// The group snapshot is not in informer cache, the event must have been "delete"
|
||||
vgsObj, found, err := ctrl.groupSnapshotStore.GetByKey(key)
|
||||
if err != nil {
|
||||
klog.V(2).Infof("error getting group snapshot %q from cache: %v", key, err)
|
||||
return nil
|
||||
}
|
||||
if !found {
|
||||
// The controller has already processed the delete event and
|
||||
// deleted the group snapshot from its cache
|
||||
klog.V(2).Infof("deletion of group snapshot %q was already processed", key)
|
||||
return nil
|
||||
}
|
||||
groupSnapshot, ok := vgsObj.(*crdv1alpha1.VolumeGroupSnapshot)
|
||||
if !ok {
|
||||
klog.Errorf("expected vgs, got %+v", vgsObj)
|
||||
return nil
|
||||
}
|
||||
|
||||
klog.V(5).Infof("deleting group snapshot %q", key)
|
||||
ctrl.deleteGroupSnapshot(groupSnapshot)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkAndUpdateGroupSnapshotClass gets the VolumeGroupSnapshotClass from VolumeGroupSnapshot.
|
||||
// If it is not set, gets it from default VolumeGroupSnapshotClass and sets it.
|
||||
// On error, it must return the original group snapshot, not nil, because the caller
|
||||
// syncGroupSnapshotByKey needs to check group snapshot's timestamp.
|
||||
func (ctrl *csiSnapshotCommonController) checkAndUpdateGroupSnapshotClass(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) (*crdv1alpha1.VolumeGroupSnapshot, error) {
|
||||
className := groupSnapshot.Spec.VolumeGroupSnapshotClassName
|
||||
var class *crdv1alpha1.VolumeGroupSnapshotClass
|
||||
var err error
|
||||
newGroupSnapshot := groupSnapshot
|
||||
if className != nil {
|
||||
klog.V(5).Infof("checkAndUpdateGroupSnapshotClass [%s]: VolumeGroupSnapshotClassName [%s]", groupSnapshot.Name, *className)
|
||||
class, err = ctrl.getGroupSnapshotClass(*className)
|
||||
if err != nil {
|
||||
klog.Errorf("checkAndUpdateGroupSnapshotClass failed to getGroupSnapshotClass %v", err)
|
||||
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, false, v1.EventTypeWarning, "GetGroupSnapshotClassFailed", fmt.Sprintf("failed to get group snapshot class with error %v", err))
|
||||
// we need to return the original group snapshot even if the class isn't found, as it may need to be deleted
|
||||
return newGroupSnapshot, err
|
||||
}
|
||||
} else {
|
||||
klog.V(5).Infof("checkAndUpdateGroupSnapshotClass [%s]: SetDefaultGroupSnapshotClass", groupSnapshot.Name)
|
||||
class, newGroupSnapshot, err = ctrl.SetDefaultGroupSnapshotClass(groupSnapshot)
|
||||
if err != nil {
|
||||
klog.Errorf("checkAndUpdateGroupSnapshotClass failed to setDefaultClass %v", err)
|
||||
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, false, v1.EventTypeWarning, "SetDefaultGroupSnapshotClassFailed", fmt.Sprintf("Failed to set default group snapshot class with error %v", err))
|
||||
return groupSnapshot, err
|
||||
}
|
||||
}
|
||||
|
||||
// For pre-provisioned group snapshots, we may not have group snapshot class
|
||||
if class != nil {
|
||||
klog.V(5).Infof("VolumeGroupSnapshotClass [%s] Driver [%s]", class.Name, class.Driver)
|
||||
}
|
||||
return newGroupSnapshot, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user