diff --git a/cmd/csi-snapshotter/main.go b/cmd/csi-snapshotter/main.go index e4497710..99f1fa51 100644 --- a/cmd/csi-snapshotter/main.go +++ b/cmd/csi-snapshotter/main.go @@ -37,6 +37,7 @@ import ( snapshotscheme "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/scheme" informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + coreinformers "k8s.io/client-go/informers" ) const ( @@ -95,6 +96,7 @@ func main() { } factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod) + coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, *resyncPeriod) // Create CRD resource aeclientset, err := apiextensionsclient.NewForConfig(config) @@ -165,6 +167,7 @@ func main() { factory.Volumesnapshot().V1alpha1().VolumeSnapshots(), factory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(), factory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(), + coreFactory.Core().V1().PersistentVolumeClaims(), *createSnapshotContentRetryCount, *createSnapshotContentInterval, csiConn, diff --git a/pkg/controller/framework_test.go b/pkg/controller/framework_test.go index 6ad80058..39115cd6 100644 --- a/pkg/controller/framework_test.go +++ b/pkg/controller/framework_test.go @@ -46,9 +46,11 @@ import ( "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + coreinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" kubefake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" + corelisters "k8s.io/client-go/listers/core/v1" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" @@ -710,6 +712,8 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte informerFactory = informers.NewSharedInformerFactory(clientset, NoResyncPeriodFunc()) } + coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, NoResyncPeriodFunc()) + // Construct controller csiConnection := &fakeCSIConnection{ t: t, @@ -725,6 +729,7 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshots(), informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(), informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(), + coreFactory.Core().V1().PersistentVolumeClaims(), 3, 5*time.Millisecond, csiConnection, @@ -1039,9 +1044,14 @@ func runSyncTests(t *testing.T, tests []controllerTest, snapshotClasses []*crdv1 reactor.contents[content.Name] = content } } + + pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) for _, claim := range test.initialClaims { reactor.claims[claim.Name] = claim + pvcIndexer.Add(claim) } + ctrl.pvcLister = corelisters.NewPersistentVolumeClaimLister(pvcIndexer) + for _, volume := range test.initialVolumes { reactor.volumes[volume.Name] = volume } diff --git a/pkg/controller/snapshot_controller.go b/pkg/controller/snapshot_controller.go index 89d9e302..8bb60dd5 100644 --- a/pkg/controller/snapshot_controller.go +++ b/pkg/controller/snapshot_controller.go @@ -867,9 +867,9 @@ func (ctrl *csiSnapshotController) getClaimFromVolumeSnapshot(snapshot *crdv1.Vo return nil, fmt.Errorf("the snapshot source does not have the right APIGroup. Expected empty string, Got %s", *(snapshot.Spec.Source.APIGroup)) } - pvc, err := ctrl.client.CoreV1().PersistentVolumeClaims(snapshot.Namespace).Get(pvcName, metav1.GetOptions{}) + pvc, err := ctrl.pvcLister.PersistentVolumeClaims(snapshot.Namespace).Get(pvcName) if err != nil { - return nil, fmt.Errorf("failed to retrieve PVC %s from the API server: %q", pvcName, err) + return nil, fmt.Errorf("failed to retrieve PVC %s from the lister: %q", pvcName, err) } return pvc, nil diff --git a/pkg/controller/snapshot_controller_base.go b/pkg/controller/snapshot_controller_base.go index ab171b2b..81dbeec4 100644 --- a/pkg/controller/snapshot_controller_base.go +++ b/pkg/controller/snapshot_controller_base.go @@ -30,9 +30,11 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -53,6 +55,8 @@ type csiSnapshotController struct { contentListerSynced cache.InformerSynced classLister storagelisters.VolumeSnapshotClassLister classListerSynced cache.InformerSynced + pvcLister corelisters.PersistentVolumeClaimLister + pvcListerSynced cache.InformerSynced snapshotStore cache.Store contentStore cache.Store @@ -74,6 +78,7 @@ func NewCSISnapshotController( volumeSnapshotInformer storageinformers.VolumeSnapshotInformer, volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer, volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer, + pvcInformer coreinformers.PersistentVolumeClaimInformer, createSnapshotContentRetryCount int, createSnapshotContentInterval time.Duration, conn connection.CSIConnection, @@ -104,6 +109,9 @@ func NewCSISnapshotController( contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"), } + ctrl.pvcLister = pvcInformer.Lister() + ctrl.pvcListerSynced = pvcInformer.Informer().HasSynced + volumeSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) }, @@ -139,7 +147,7 @@ func (ctrl *csiSnapshotController) Run(workers int, stopCh <-chan struct{}) { glog.Infof("Starting CSI snapshotter") defer glog.Infof("Shutting CSI snapshotter") - if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced) { + if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced, ctrl.pvcListerSynced) { glog.Errorf("Cannot sync caches") return } diff --git a/pkg/controller/snapshot_create_test.go b/pkg/controller/snapshot_create_test.go index 08d39205..7b10ec72 100644 --- a/pkg/controller/snapshot_create_test.go +++ b/pkg/controller/snapshot_create_test.go @@ -255,7 +255,7 @@ func TestCreateSnapshotSync(t *testing.T) { initialContents: nocontents, expectedContents: nocontents, initialSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, nil, nil, nil), - expectedSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, newVolumeError("Failed to create snapshot: failed to get input parameters to create snapshot snap7-4: \"failed to retrieve PVC claim7-4 from the API server: \\\"cannot find claim claim7-4\\\"\""), nil, nil), + expectedSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, newVolumeError("Failed to create snapshot: failed to get input parameters to create snapshot snap7-4: \"failed to retrieve PVC claim7-4 from the lister: \\\"persistentvolumeclaim \\\\\\\"claim7-4\\\\\\\" not found\\\"\""), nil, nil), initialVolumes: newVolumeArray("volume7-4", "pv-uid7-4", "pv-handle7-4", "1Gi", "pvc-uid7-4", "claim7-4", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classEmpty), expectedEvents: []string{"Warning SnapshotCreationFailed"}, errors: noerrors,