add pvcLister to snapshot controller

This reverts commit 96b6f495d3.
This commit is contained in:
xushiwei
2018-11-30 17:09:46 +08:00
committed by wackxu
parent 7cd3ef6f55
commit 85504e1220
5 changed files with 29 additions and 7 deletions

View File

@@ -37,6 +37,7 @@ import (
snapshotscheme "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/scheme"
informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
coreinformers "k8s.io/client-go/informers"
)
const (
@@ -95,6 +96,7 @@ func main() {
}
factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod)
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, *resyncPeriod)
// Create CRD resource
aeclientset, err := apiextensionsclient.NewForConfig(config)
@@ -165,6 +167,7 @@ func main() {
factory.Volumesnapshot().V1alpha1().VolumeSnapshots(),
factory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(),
factory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(),
coreFactory.Core().V1().PersistentVolumeClaims(),
*createSnapshotContentRetryCount,
*createSnapshotContentInterval,
csiConn,
@@ -177,6 +180,7 @@ func main() {
// run...
stopCh := make(chan struct{})
factory.Start(stopCh)
coreFactory.Start(stopCh)
go ctrl.Run(threads, stopCh)
// ...until SIGINT

View File

@@ -46,9 +46,11 @@ import (
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
coreinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
kubefake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
corelisters "k8s.io/client-go/listers/core/v1"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
@@ -720,6 +722,8 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
informerFactory = informers.NewSharedInformerFactory(clientset, NoResyncPeriodFunc())
}
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, NoResyncPeriodFunc())
// Construct controller
csiConnection := &fakeCSIConnection{
t: t,
@@ -735,6 +739,7 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshots(),
informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(),
informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(),
coreFactory.Core().V1().PersistentVolumeClaims(),
3,
5*time.Millisecond,
csiConnection,
@@ -1052,9 +1057,14 @@ func runSyncTests(t *testing.T, tests []controllerTest, snapshotClasses []*crdv1
reactor.contents[content.Name] = content
}
}
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for _, claim := range test.initialClaims {
reactor.claims[claim.Name] = claim
pvcIndexer.Add(claim)
}
ctrl.pvcLister = corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
for _, volume := range test.initialVolumes {
reactor.volumes[volume.Name] = volume
}

View File

@@ -470,12 +470,12 @@ func (ctrl *csiSnapshotController) isSnapshotContentBeingUsed(content *crdv1.Vol
// isVolumeBeingCreatedFromSnapshot checks if an volume is being created from the snapshot.
func (ctrl *csiSnapshotController) isVolumeBeingCreatedFromSnapshot(snapshot *crdv1.VolumeSnapshot) bool {
pvcList, err := ctrl.client.CoreV1().PersistentVolumeClaims(snapshot.Namespace).List(metav1.ListOptions{})
pvcList, err := ctrl.pvcLister.PersistentVolumeClaims(snapshot.Namespace).List(labels.Everything())
if err != nil {
glog.Errorf("Failed to retrieve PVCs from the API server to check if volume snapshot %s is being used by a volume: %q", snapshotKey(snapshot), err)
glog.Errorf("Failed to retrieve PVCs from the lister to check if volume snapshot %s is being used by a volume: %q", snapshotKey(snapshot), err)
return false
}
for _, pvc := range pvcList.Items {
for _, pvc := range pvcList {
if pvc.Spec.DataSource != nil && len(pvc.Spec.DataSource.Name) > 0 && pvc.Spec.DataSource.Name == snapshot.Name {
if pvc.Spec.DataSource.Kind == snapshotKind && *(pvc.Spec.DataSource.APIGroup) == snapshotAPIGroup {
if pvc.Status.Phase == v1.ClaimPending {
@@ -947,9 +947,9 @@ func (ctrl *csiSnapshotController) getClaimFromVolumeSnapshot(snapshot *crdv1.Vo
return nil, fmt.Errorf("the snapshot source does not have the right APIGroup. Expected empty string, Got %s", *(snapshot.Spec.Source.APIGroup))
}
pvc, err := ctrl.client.CoreV1().PersistentVolumeClaims(snapshot.Namespace).Get(pvcName, metav1.GetOptions{})
pvc, err := ctrl.pvcLister.PersistentVolumeClaims(snapshot.Namespace).Get(pvcName)
if err != nil {
return nil, fmt.Errorf("failed to retrieve PVC %s from the API server: %q", pvcName, err)
return nil, fmt.Errorf("failed to retrieve PVC %s from the lister: %q", pvcName, err)
}
return pvc, nil

View File

@@ -30,9 +30,11 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
@@ -53,6 +55,8 @@ type csiSnapshotController struct {
contentListerSynced cache.InformerSynced
classLister storagelisters.VolumeSnapshotClassLister
classListerSynced cache.InformerSynced
pvcLister corelisters.PersistentVolumeClaimLister
pvcListerSynced cache.InformerSynced
snapshotStore cache.Store
contentStore cache.Store
@@ -74,6 +78,7 @@ func NewCSISnapshotController(
volumeSnapshotInformer storageinformers.VolumeSnapshotInformer,
volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer,
volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
createSnapshotContentRetryCount int,
createSnapshotContentInterval time.Duration,
conn connection.CSIConnection,
@@ -104,6 +109,9 @@ func NewCSISnapshotController(
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
}
ctrl.pvcLister = pvcInformer.Lister()
ctrl.pvcListerSynced = pvcInformer.Informer().HasSynced
volumeSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) },
@@ -139,7 +147,7 @@ func (ctrl *csiSnapshotController) Run(workers int, stopCh <-chan struct{}) {
glog.Infof("Starting CSI snapshotter")
defer glog.Infof("Shutting CSI snapshotter")
if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced) {
if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced, ctrl.pvcListerSynced) {
glog.Errorf("Cannot sync caches")
return
}

View File

@@ -255,7 +255,7 @@ func TestCreateSnapshotSync(t *testing.T) {
initialContents: nocontents,
expectedContents: nocontents,
initialSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, nil, nil, nil),
expectedSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, newVolumeError("Failed to create snapshot: failed to get input parameters to create snapshot snap7-4: \"failed to retrieve PVC claim7-4 from the API server: \\\"cannot find claim claim7-4\\\"\""), nil, nil),
expectedSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, newVolumeError("Failed to create snapshot: failed to get input parameters to create snapshot snap7-4: \"failed to retrieve PVC claim7-4 from the lister: \\\"persistentvolumeclaim \\\\\\\"claim7-4\\\\\\\" not found\\\"\""), nil, nil),
initialVolumes: newVolumeArray("volume7-4", "pv-uid7-4", "pv-handle7-4", "1Gi", "pvc-uid7-4", "claim7-4", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classEmpty),
expectedEvents: []string{"Warning SnapshotCreationFailed"},
errors: noerrors,