Merge pull request #72 from wackxu/pvclister
add pvcLister to snapshot controller
This commit is contained in:
@@ -37,6 +37,7 @@ import (
|
||||
snapshotscheme "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/scheme"
|
||||
informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions"
|
||||
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
||||
coreinformers "k8s.io/client-go/informers"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -95,6 +96,7 @@ func main() {
|
||||
}
|
||||
|
||||
factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod)
|
||||
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, *resyncPeriod)
|
||||
|
||||
// Create CRD resource
|
||||
aeclientset, err := apiextensionsclient.NewForConfig(config)
|
||||
@@ -165,6 +167,7 @@ func main() {
|
||||
factory.Volumesnapshot().V1alpha1().VolumeSnapshots(),
|
||||
factory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(),
|
||||
factory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(),
|
||||
coreFactory.Core().V1().PersistentVolumeClaims(),
|
||||
*createSnapshotContentRetryCount,
|
||||
*createSnapshotContentInterval,
|
||||
csiConn,
|
||||
|
@@ -46,9 +46,11 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
coreinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
kubefake "k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
core "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
@@ -720,6 +722,8 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
|
||||
informerFactory = informers.NewSharedInformerFactory(clientset, NoResyncPeriodFunc())
|
||||
}
|
||||
|
||||
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, NoResyncPeriodFunc())
|
||||
|
||||
// Construct controller
|
||||
csiConnection := &fakeCSIConnection{
|
||||
t: t,
|
||||
@@ -735,6 +739,7 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
|
||||
informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshots(),
|
||||
informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(),
|
||||
informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(),
|
||||
coreFactory.Core().V1().PersistentVolumeClaims(),
|
||||
3,
|
||||
5*time.Millisecond,
|
||||
csiConnection,
|
||||
@@ -1052,9 +1057,14 @@ func runSyncTests(t *testing.T, tests []controllerTest, snapshotClasses []*crdv1
|
||||
reactor.contents[content.Name] = content
|
||||
}
|
||||
}
|
||||
|
||||
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
|
||||
for _, claim := range test.initialClaims {
|
||||
reactor.claims[claim.Name] = claim
|
||||
pvcIndexer.Add(claim)
|
||||
}
|
||||
ctrl.pvcLister = corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
|
||||
|
||||
for _, volume := range test.initialVolumes {
|
||||
reactor.volumes[volume.Name] = volume
|
||||
}
|
||||
|
@@ -947,9 +947,9 @@ func (ctrl *csiSnapshotController) getClaimFromVolumeSnapshot(snapshot *crdv1.Vo
|
||||
return nil, fmt.Errorf("the snapshot source does not have the right APIGroup. Expected empty string, Got %s", *(snapshot.Spec.Source.APIGroup))
|
||||
}
|
||||
|
||||
pvc, err := ctrl.client.CoreV1().PersistentVolumeClaims(snapshot.Namespace).Get(pvcName, metav1.GetOptions{})
|
||||
pvc, err := ctrl.pvcLister.PersistentVolumeClaims(snapshot.Namespace).Get(pvcName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to retrieve PVC %s from the API server: %q", pvcName, err)
|
||||
return nil, fmt.Errorf("failed to retrieve PVC %s from the lister: %q", pvcName, err)
|
||||
}
|
||||
|
||||
return pvc, nil
|
||||
|
@@ -30,9 +30,11 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
@@ -53,6 +55,8 @@ type csiSnapshotController struct {
|
||||
contentListerSynced cache.InformerSynced
|
||||
classLister storagelisters.VolumeSnapshotClassLister
|
||||
classListerSynced cache.InformerSynced
|
||||
pvcLister corelisters.PersistentVolumeClaimLister
|
||||
pvcListerSynced cache.InformerSynced
|
||||
|
||||
snapshotStore cache.Store
|
||||
contentStore cache.Store
|
||||
@@ -74,6 +78,7 @@ func NewCSISnapshotController(
|
||||
volumeSnapshotInformer storageinformers.VolumeSnapshotInformer,
|
||||
volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer,
|
||||
volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer,
|
||||
pvcInformer coreinformers.PersistentVolumeClaimInformer,
|
||||
createSnapshotContentRetryCount int,
|
||||
createSnapshotContentInterval time.Duration,
|
||||
conn connection.CSIConnection,
|
||||
@@ -104,6 +109,9 @@ func NewCSISnapshotController(
|
||||
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
|
||||
}
|
||||
|
||||
ctrl.pvcLister = pvcInformer.Lister()
|
||||
ctrl.pvcListerSynced = pvcInformer.Informer().HasSynced
|
||||
|
||||
volumeSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) },
|
||||
@@ -139,7 +147,7 @@ func (ctrl *csiSnapshotController) Run(workers int, stopCh <-chan struct{}) {
|
||||
glog.Infof("Starting CSI snapshotter")
|
||||
defer glog.Infof("Shutting CSI snapshotter")
|
||||
|
||||
if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced) {
|
||||
if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced, ctrl.pvcListerSynced) {
|
||||
glog.Errorf("Cannot sync caches")
|
||||
return
|
||||
}
|
||||
|
@@ -255,7 +255,7 @@ func TestCreateSnapshotSync(t *testing.T) {
|
||||
initialContents: nocontents,
|
||||
expectedContents: nocontents,
|
||||
initialSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, nil, nil, nil),
|
||||
expectedSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, newVolumeError("Failed to create snapshot: failed to get input parameters to create snapshot snap7-4: \"failed to retrieve PVC claim7-4 from the API server: \\\"cannot find claim claim7-4\\\"\""), nil, nil),
|
||||
expectedSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, newVolumeError("Failed to create snapshot: failed to get input parameters to create snapshot snap7-4: \"failed to retrieve PVC claim7-4 from the lister: \\\"persistentvolumeclaim \\\\\\\"claim7-4\\\\\\\" not found\\\"\""), nil, nil),
|
||||
initialVolumes: newVolumeArray("volume7-4", "pv-uid7-4", "pv-handle7-4", "1Gi", "pvc-uid7-4", "claim7-4", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classEmpty),
|
||||
expectedEvents: []string{"Warning SnapshotCreationFailed"},
|
||||
errors: noerrors,
|
||||
|
Reference in New Issue
Block a user