diff --git a/cmd/csi-snapshotter/main.go b/cmd/csi-snapshotter/main.go index e4497710..22d60892 100644 --- a/cmd/csi-snapshotter/main.go +++ b/cmd/csi-snapshotter/main.go @@ -37,6 +37,7 @@ import ( snapshotscheme "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/scheme" informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + coreinformers "k8s.io/client-go/informers" ) const ( @@ -95,6 +96,7 @@ func main() { } factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod) + coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, *resyncPeriod) // Create CRD resource aeclientset, err := apiextensionsclient.NewForConfig(config) @@ -165,6 +167,7 @@ func main() { factory.Volumesnapshot().V1alpha1().VolumeSnapshots(), factory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(), factory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(), + coreFactory.Core().V1().PersistentVolumeClaims(), *createSnapshotContentRetryCount, *createSnapshotContentInterval, csiConn, @@ -177,6 +180,7 @@ func main() { // run... stopCh := make(chan struct{}) factory.Start(stopCh) + coreFactory.Start(stopCh) go ctrl.Run(threads, stopCh) // ...until SIGINT diff --git a/pkg/controller/framework_test.go b/pkg/controller/framework_test.go index 9a57ca4b..5e5959df 100644 --- a/pkg/controller/framework_test.go +++ b/pkg/controller/framework_test.go @@ -46,9 +46,11 @@ import ( "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + coreinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" kubefake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" + corelisters "k8s.io/client-go/listers/core/v1" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" @@ -720,6 +722,8 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte informerFactory = informers.NewSharedInformerFactory(clientset, NoResyncPeriodFunc()) } + coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, NoResyncPeriodFunc()) + // Construct controller csiConnection := &fakeCSIConnection{ t: t, @@ -735,6 +739,7 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshots(), informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(), informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(), + coreFactory.Core().V1().PersistentVolumeClaims(), 3, 5*time.Millisecond, csiConnection, @@ -1052,9 +1057,14 @@ func runSyncTests(t *testing.T, tests []controllerTest, snapshotClasses []*crdv1 reactor.contents[content.Name] = content } } + + pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) for _, claim := range test.initialClaims { reactor.claims[claim.Name] = claim + pvcIndexer.Add(claim) } + ctrl.pvcLister = corelisters.NewPersistentVolumeClaimLister(pvcIndexer) + for _, volume := range test.initialVolumes { reactor.volumes[volume.Name] = volume } diff --git a/pkg/controller/snapshot_controller.go b/pkg/controller/snapshot_controller.go index 22a57cd5..d462a555 100644 --- a/pkg/controller/snapshot_controller.go +++ b/pkg/controller/snapshot_controller.go @@ -470,12 +470,12 @@ func (ctrl *csiSnapshotController) isSnapshotContentBeingUsed(content *crdv1.Vol // isVolumeBeingCreatedFromSnapshot checks if an volume is being created from the snapshot. func (ctrl *csiSnapshotController) isVolumeBeingCreatedFromSnapshot(snapshot *crdv1.VolumeSnapshot) bool { - pvcList, err := ctrl.client.CoreV1().PersistentVolumeClaims(snapshot.Namespace).List(metav1.ListOptions{}) + pvcList, err := ctrl.pvcLister.PersistentVolumeClaims(snapshot.Namespace).List(labels.Everything()) if err != nil { - glog.Errorf("Failed to retrieve PVCs from the API server to check if volume snapshot %s is being used by a volume: %q", snapshotKey(snapshot), err) + glog.Errorf("Failed to retrieve PVCs from the lister to check if volume snapshot %s is being used by a volume: %q", snapshotKey(snapshot), err) return false } - for _, pvc := range pvcList.Items { + for _, pvc := range pvcList { if pvc.Spec.DataSource != nil && len(pvc.Spec.DataSource.Name) > 0 && pvc.Spec.DataSource.Name == snapshot.Name { if pvc.Spec.DataSource.Kind == snapshotKind && *(pvc.Spec.DataSource.APIGroup) == snapshotAPIGroup { if pvc.Status.Phase == v1.ClaimPending { @@ -947,9 +947,9 @@ func (ctrl *csiSnapshotController) getClaimFromVolumeSnapshot(snapshot *crdv1.Vo return nil, fmt.Errorf("the snapshot source does not have the right APIGroup. Expected empty string, Got %s", *(snapshot.Spec.Source.APIGroup)) } - pvc, err := ctrl.client.CoreV1().PersistentVolumeClaims(snapshot.Namespace).Get(pvcName, metav1.GetOptions{}) + pvc, err := ctrl.pvcLister.PersistentVolumeClaims(snapshot.Namespace).Get(pvcName) if err != nil { - return nil, fmt.Errorf("failed to retrieve PVC %s from the API server: %q", pvcName, err) + return nil, fmt.Errorf("failed to retrieve PVC %s from the lister: %q", pvcName, err) } return pvc, nil diff --git a/pkg/controller/snapshot_controller_base.go b/pkg/controller/snapshot_controller_base.go index ab171b2b..81dbeec4 100644 --- a/pkg/controller/snapshot_controller_base.go +++ b/pkg/controller/snapshot_controller_base.go @@ -30,9 +30,11 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -53,6 +55,8 @@ type csiSnapshotController struct { contentListerSynced cache.InformerSynced classLister storagelisters.VolumeSnapshotClassLister classListerSynced cache.InformerSynced + pvcLister corelisters.PersistentVolumeClaimLister + pvcListerSynced cache.InformerSynced snapshotStore cache.Store contentStore cache.Store @@ -74,6 +78,7 @@ func NewCSISnapshotController( volumeSnapshotInformer storageinformers.VolumeSnapshotInformer, volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer, volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer, + pvcInformer coreinformers.PersistentVolumeClaimInformer, createSnapshotContentRetryCount int, createSnapshotContentInterval time.Duration, conn connection.CSIConnection, @@ -104,6 +109,9 @@ func NewCSISnapshotController( contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"), } + ctrl.pvcLister = pvcInformer.Lister() + ctrl.pvcListerSynced = pvcInformer.Informer().HasSynced + volumeSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) }, @@ -139,7 +147,7 @@ func (ctrl *csiSnapshotController) Run(workers int, stopCh <-chan struct{}) { glog.Infof("Starting CSI snapshotter") defer glog.Infof("Shutting CSI snapshotter") - if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced) { + if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced, ctrl.pvcListerSynced) { glog.Errorf("Cannot sync caches") return } diff --git a/pkg/controller/snapshot_create_test.go b/pkg/controller/snapshot_create_test.go index 7ae31041..bfb925a6 100644 --- a/pkg/controller/snapshot_create_test.go +++ b/pkg/controller/snapshot_create_test.go @@ -255,7 +255,7 @@ func TestCreateSnapshotSync(t *testing.T) { initialContents: nocontents, expectedContents: nocontents, initialSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, nil, nil, nil), - expectedSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, newVolumeError("Failed to create snapshot: failed to get input parameters to create snapshot snap7-4: \"failed to retrieve PVC claim7-4 from the API server: \\\"cannot find claim claim7-4\\\"\""), nil, nil), + expectedSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, newVolumeError("Failed to create snapshot: failed to get input parameters to create snapshot snap7-4: \"failed to retrieve PVC claim7-4 from the lister: \\\"persistentvolumeclaim \\\\\\\"claim7-4\\\\\\\" not found\\\"\""), nil, nil), initialVolumes: newVolumeArray("volume7-4", "pv-uid7-4", "pv-handle7-4", "1Gi", "pvc-uid7-4", "claim7-4", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classEmpty), expectedEvents: []string{"Warning SnapshotCreationFailed"}, errors: noerrors,