Merge pull request #75 from wackxu/pvclister

add pvcLister to snapshot controller
This commit is contained in:
Kubernetes Prow Robot
2019-01-02 07:17:24 -08:00
committed by GitHub
5 changed files with 29 additions and 7 deletions

View File

@@ -37,6 +37,7 @@ import (
snapshotscheme "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/scheme" snapshotscheme "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/scheme"
informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions" informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
coreinformers "k8s.io/client-go/informers"
) )
const ( const (
@@ -95,6 +96,7 @@ func main() {
} }
factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod) factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod)
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, *resyncPeriod)
// Create CRD resource // Create CRD resource
aeclientset, err := apiextensionsclient.NewForConfig(config) aeclientset, err := apiextensionsclient.NewForConfig(config)
@@ -165,6 +167,7 @@ func main() {
factory.Volumesnapshot().V1alpha1().VolumeSnapshots(), factory.Volumesnapshot().V1alpha1().VolumeSnapshots(),
factory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(), factory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(),
factory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(), factory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(),
coreFactory.Core().V1().PersistentVolumeClaims(),
*createSnapshotContentRetryCount, *createSnapshotContentRetryCount,
*createSnapshotContentInterval, *createSnapshotContentInterval,
csiConn, csiConn,
@@ -177,6 +180,7 @@ func main() {
// run... // run...
stopCh := make(chan struct{}) stopCh := make(chan struct{})
factory.Start(stopCh) factory.Start(stopCh)
coreFactory.Start(stopCh)
go ctrl.Run(threads, stopCh) go ctrl.Run(threads, stopCh)
// ...until SIGINT // ...until SIGINT

View File

@@ -46,9 +46,11 @@ import (
"k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
coreinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
kubefake "k8s.io/client-go/kubernetes/fake" kubefake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
corelisters "k8s.io/client-go/listers/core/v1"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
@@ -720,6 +722,8 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
informerFactory = informers.NewSharedInformerFactory(clientset, NoResyncPeriodFunc()) informerFactory = informers.NewSharedInformerFactory(clientset, NoResyncPeriodFunc())
} }
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, NoResyncPeriodFunc())
// Construct controller // Construct controller
csiConnection := &fakeCSIConnection{ csiConnection := &fakeCSIConnection{
t: t, t: t,
@@ -735,6 +739,7 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshots(), informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshots(),
informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(), informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(),
informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(), informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(),
coreFactory.Core().V1().PersistentVolumeClaims(),
3, 3,
5*time.Millisecond, 5*time.Millisecond,
csiConnection, csiConnection,
@@ -1052,9 +1057,14 @@ func runSyncTests(t *testing.T, tests []controllerTest, snapshotClasses []*crdv1
reactor.contents[content.Name] = content reactor.contents[content.Name] = content
} }
} }
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for _, claim := range test.initialClaims { for _, claim := range test.initialClaims {
reactor.claims[claim.Name] = claim reactor.claims[claim.Name] = claim
pvcIndexer.Add(claim)
} }
ctrl.pvcLister = corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
for _, volume := range test.initialVolumes { for _, volume := range test.initialVolumes {
reactor.volumes[volume.Name] = volume reactor.volumes[volume.Name] = volume
} }

View File

@@ -470,12 +470,12 @@ func (ctrl *csiSnapshotController) isSnapshotContentBeingUsed(content *crdv1.Vol
// isVolumeBeingCreatedFromSnapshot checks if an volume is being created from the snapshot. // isVolumeBeingCreatedFromSnapshot checks if an volume is being created from the snapshot.
func (ctrl *csiSnapshotController) isVolumeBeingCreatedFromSnapshot(snapshot *crdv1.VolumeSnapshot) bool { func (ctrl *csiSnapshotController) isVolumeBeingCreatedFromSnapshot(snapshot *crdv1.VolumeSnapshot) bool {
pvcList, err := ctrl.client.CoreV1().PersistentVolumeClaims(snapshot.Namespace).List(metav1.ListOptions{}) pvcList, err := ctrl.pvcLister.PersistentVolumeClaims(snapshot.Namespace).List(labels.Everything())
if err != nil { if err != nil {
glog.Errorf("Failed to retrieve PVCs from the API server to check if volume snapshot %s is being used by a volume: %q", snapshotKey(snapshot), err) glog.Errorf("Failed to retrieve PVCs from the lister to check if volume snapshot %s is being used by a volume: %q", snapshotKey(snapshot), err)
return false return false
} }
for _, pvc := range pvcList.Items { for _, pvc := range pvcList {
if pvc.Spec.DataSource != nil && len(pvc.Spec.DataSource.Name) > 0 && pvc.Spec.DataSource.Name == snapshot.Name { if pvc.Spec.DataSource != nil && len(pvc.Spec.DataSource.Name) > 0 && pvc.Spec.DataSource.Name == snapshot.Name {
if pvc.Spec.DataSource.Kind == snapshotKind && *(pvc.Spec.DataSource.APIGroup) == snapshotAPIGroup { if pvc.Spec.DataSource.Kind == snapshotKind && *(pvc.Spec.DataSource.APIGroup) == snapshotAPIGroup {
if pvc.Status.Phase == v1.ClaimPending { if pvc.Status.Phase == v1.ClaimPending {
@@ -947,9 +947,9 @@ func (ctrl *csiSnapshotController) getClaimFromVolumeSnapshot(snapshot *crdv1.Vo
return nil, fmt.Errorf("the snapshot source does not have the right APIGroup. Expected empty string, Got %s", *(snapshot.Spec.Source.APIGroup)) return nil, fmt.Errorf("the snapshot source does not have the right APIGroup. Expected empty string, Got %s", *(snapshot.Spec.Source.APIGroup))
} }
pvc, err := ctrl.client.CoreV1().PersistentVolumeClaims(snapshot.Namespace).Get(pvcName, metav1.GetOptions{}) pvc, err := ctrl.pvcLister.PersistentVolumeClaims(snapshot.Namespace).Get(pvcName)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to retrieve PVC %s from the API server: %q", pvcName, err) return nil, fmt.Errorf("failed to retrieve PVC %s from the lister: %q", pvcName, err)
} }
return pvc, nil return pvc, nil

View File

@@ -30,9 +30,11 @@ import (
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
@@ -53,6 +55,8 @@ type csiSnapshotController struct {
contentListerSynced cache.InformerSynced contentListerSynced cache.InformerSynced
classLister storagelisters.VolumeSnapshotClassLister classLister storagelisters.VolumeSnapshotClassLister
classListerSynced cache.InformerSynced classListerSynced cache.InformerSynced
pvcLister corelisters.PersistentVolumeClaimLister
pvcListerSynced cache.InformerSynced
snapshotStore cache.Store snapshotStore cache.Store
contentStore cache.Store contentStore cache.Store
@@ -74,6 +78,7 @@ func NewCSISnapshotController(
volumeSnapshotInformer storageinformers.VolumeSnapshotInformer, volumeSnapshotInformer storageinformers.VolumeSnapshotInformer,
volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer, volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer,
volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer, volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
createSnapshotContentRetryCount int, createSnapshotContentRetryCount int,
createSnapshotContentInterval time.Duration, createSnapshotContentInterval time.Duration,
conn connection.CSIConnection, conn connection.CSIConnection,
@@ -104,6 +109,9 @@ func NewCSISnapshotController(
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"), contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
} }
ctrl.pvcLister = pvcInformer.Lister()
ctrl.pvcListerSynced = pvcInformer.Informer().HasSynced
volumeSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod( volumeSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) }, AddFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) },
@@ -139,7 +147,7 @@ func (ctrl *csiSnapshotController) Run(workers int, stopCh <-chan struct{}) {
glog.Infof("Starting CSI snapshotter") glog.Infof("Starting CSI snapshotter")
defer glog.Infof("Shutting CSI snapshotter") defer glog.Infof("Shutting CSI snapshotter")
if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced) { if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced, ctrl.pvcListerSynced) {
glog.Errorf("Cannot sync caches") glog.Errorf("Cannot sync caches")
return return
} }

View File

@@ -255,7 +255,7 @@ func TestCreateSnapshotSync(t *testing.T) {
initialContents: nocontents, initialContents: nocontents,
expectedContents: nocontents, expectedContents: nocontents,
initialSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, nil, nil, nil), initialSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, nil, nil, nil),
expectedSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, newVolumeError("Failed to create snapshot: failed to get input parameters to create snapshot snap7-4: \"failed to retrieve PVC claim7-4 from the API server: \\\"cannot find claim claim7-4\\\"\""), nil, nil), expectedSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, newVolumeError("Failed to create snapshot: failed to get input parameters to create snapshot snap7-4: \"failed to retrieve PVC claim7-4 from the lister: \\\"persistentvolumeclaim \\\\\\\"claim7-4\\\\\\\" not found\\\"\""), nil, nil),
initialVolumes: newVolumeArray("volume7-4", "pv-uid7-4", "pv-handle7-4", "1Gi", "pvc-uid7-4", "claim7-4", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classEmpty), initialVolumes: newVolumeArray("volume7-4", "pv-uid7-4", "pv-handle7-4", "1Gi", "pvc-uid7-4", "claim7-4", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classEmpty),
expectedEvents: []string{"Warning SnapshotCreationFailed"}, expectedEvents: []string{"Warning SnapshotCreationFailed"},
errors: noerrors, errors: noerrors,