Add Delete Volume Finalizer

This PR adds Finalizer on the snapshot source PVC to prevent it
from being deleted when a snapshot is being created from it.
This commit is contained in:
Xing Yang
2018-10-17 10:32:40 -07:00
parent ac33959738
commit 603855fc1b
5 changed files with 366 additions and 6 deletions

View File

@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"reflect"
sysruntime "runtime"
"strconv"
"strings"
"sync"
@@ -53,6 +54,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/slice"
)
// This is a unit test framework for snapshot controller.
@@ -110,7 +112,8 @@ type controllerTest struct {
// List of expected CSI list snapshot calls
expectedListCalls []listCall
// Function to call as the test.
test testCall
test testCall
expectSuccess bool
}
type testCall func(ctrl *csiSnapshotController, reactor *snapshotReactor, test controllerTest) error
@@ -177,6 +180,11 @@ func withContentFinalizer(content *crdv1.VolumeSnapshotContent) *crdv1.VolumeSna
return content
}
func withPVCFinalizer(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
pvc.ObjectMeta.Finalizers = append(pvc.ObjectMeta.Finalizers, PVCFinalizer)
return pvc
}
// React is a callback called by fake kubeClient from the controller.
// In other words, every snapshot/content change performed by the controller ends
// here.
@@ -331,6 +339,32 @@ func (r *snapshotReactor) React(action core.Action) (handled bool, ret runtime.O
klog.V(4).Infof("GetClaim: claim %s not found", name)
return true, nil, fmt.Errorf("cannot find claim %s", name)
case action.Matches("update", "persistentvolumeclaims"):
obj := action.(core.UpdateAction).GetObject()
claim := obj.(*v1.PersistentVolumeClaim)
// Check and bump object version
storedClaim, found := r.claims[claim.Name]
if found {
storedVer, _ := strconv.Atoi(storedClaim.ResourceVersion)
requestedVer, _ := strconv.Atoi(claim.ResourceVersion)
if storedVer != requestedVer {
return true, obj, errVersionConflict
}
// Don't modify the existing object
claim = claim.DeepCopy()
claim.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("cannot update claim %s: claim not found", claim.Name)
}
// Store the updated object to appropriate places.
r.claims[claim.Name] = claim
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
klog.V(4).Infof("saved updated claim %s", claim.Name)
return true, claim, nil
case action.Matches("get", "storageclasses"):
name := action.(core.GetAction).GetName()
storageClass, found := r.storageClasses[name]
@@ -550,6 +584,9 @@ func (r *snapshotReactor) syncAll() {
for _, v := range r.contents {
r.changedObjects = append(r.changedObjects, v)
}
for _, pvc := range r.claims {
r.changedObjects = append(r.changedObjects, pvc)
}
r.changedSinceLastSync = 0
}
@@ -699,6 +736,7 @@ func newSnapshotReactor(kubeClient *kubefake.Clientset, client *fake.Clientset,
client.AddReactor("delete", "volumesnapshotcontents", reactor.React)
client.AddReactor("delete", "volumesnapshots", reactor.React)
kubeClient.AddReactor("get", "persistentvolumeclaims", reactor.React)
kubeClient.AddReactor("update", "persistentvolumeclaims", reactor.React)
kubeClient.AddReactor("get", "persistentvolumes", reactor.React)
kubeClient.AddReactor("get", "storageclasses", reactor.React)
kubeClient.AddReactor("get", "secrets", reactor.React)
@@ -746,6 +784,7 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
ctrl.contentListerSynced = alwaysReady
ctrl.snapshotListerSynced = alwaysReady
ctrl.classListerSynced = alwaysReady
ctrl.pvcListerSynced = alwaysReady
return ctrl, nil
}
@@ -845,7 +884,7 @@ func newSnapshotArray(name, className, boundToContent, snapshotUID, claimName st
}
// newClaim returns a new claim with given attributes
func newClaim(name, claimUID, capacity, boundToVolume string, phase v1.PersistentVolumeClaimPhase, class *string) *v1.PersistentVolumeClaim {
func newClaim(name, claimUID, capacity, boundToVolume string, phase v1.PersistentVolumeClaimPhase, class *string, bFinalizer bool) *v1.PersistentVolumeClaim {
claim := v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@@ -877,6 +916,9 @@ func newClaim(name, claimUID, capacity, boundToVolume string, phase v1.Persisten
claim.Status.Capacity = claim.Spec.Resources.Requests
}
if bFinalizer {
return withPVCFinalizer(&claim)
}
return &claim
}
@@ -884,7 +926,15 @@ func newClaim(name, claimUID, capacity, boundToVolume string, phase v1.Persisten
// newClaim() with the same parameters.
func newClaimArray(name, claimUID, capacity, boundToVolume string, phase v1.PersistentVolumeClaimPhase, class *string) []*v1.PersistentVolumeClaim {
return []*v1.PersistentVolumeClaim{
newClaim(name, claimUID, capacity, boundToVolume, phase, class),
newClaim(name, claimUID, capacity, boundToVolume, phase, class, false),
}
}
// newClaimArrayFinalizer returns array with a single claim that would be returned by
// newClaim() with the same parameters plus finalizer.
func newClaimArrayFinalizer(name, claimUID, capacity, boundToVolume string, phase v1.PersistentVolumeClaimPhase, class *string) []*v1.PersistentVolumeClaim {
return []*v1.PersistentVolumeClaim{
newClaim(name, claimUID, capacity, boundToVolume, phase, class, true),
}
}
@@ -961,6 +1011,14 @@ func testSyncContent(ctrl *csiSnapshotController, reactor *snapshotReactor, test
return ctrl.syncContent(test.initialContents[0])
}
func testAddPVCFinalizer(ctrl *csiSnapshotController, reactor *snapshotReactor, test controllerTest) error {
return ctrl.ensureSnapshotSourceFinalizer(test.initialSnapshots[0])
}
func testRemovePVCFinalizer(ctrl *csiSnapshotController, reactor *snapshotReactor, test controllerTest) error {
return ctrl.checkandRemoveSnapshotSourceFinalizer(test.initialSnapshots[0])
}
var (
classEmpty string
classGold = "gold"
@@ -1097,6 +1155,113 @@ func runSyncTests(t *testing.T, tests []controllerTest, snapshotClasses []*crdv1
}
}
// This tests ensureSnapshotSourceFinalizer and checkandRemoveSnapshotSourceFinalizer
func runPVCFinalizerTests(t *testing.T, tests []controllerTest, snapshotClasses []*crdv1.VolumeSnapshotClass) {
snapshotscheme.AddToScheme(scheme.Scheme)
for _, test := range tests {
klog.V(4).Infof("starting test %q", test.name)
// Initialize the controller
kubeClient := &kubefake.Clientset{}
client := &fake.Clientset{}
ctrl, err := newTestController(kubeClient, client, nil, t, test)
if err != nil {
t.Fatalf("Test %q construct persistent content failed: %v", test.name, err)
}
reactor := newSnapshotReactor(kubeClient, client, ctrl, nil, nil, test.errors)
for _, snapshot := range test.initialSnapshots {
ctrl.snapshotStore.Add(snapshot)
reactor.snapshots[snapshot.Name] = snapshot
}
for _, content := range test.initialContents {
if ctrl.isDriverMatch(test.initialContents[0]) {
ctrl.contentStore.Add(content)
reactor.contents[content.Name] = content
}
}
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for _, claim := range test.initialClaims {
reactor.claims[claim.Name] = claim
pvcIndexer.Add(claim)
}
ctrl.pvcLister = corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
for _, volume := range test.initialVolumes {
reactor.volumes[volume.Name] = volume
}
for _, storageClass := range test.initialStorageClasses {
reactor.storageClasses[storageClass.Name] = storageClass
}
for _, secret := range test.initialSecrets {
reactor.secrets[secret.Name] = secret
}
// Inject classes into controller via a custom lister.
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for _, class := range snapshotClasses {
indexer.Add(class)
}
ctrl.classLister = storagelisters.NewVolumeSnapshotClassLister(indexer)
// Run the tested functions
err = test.test(ctrl, reactor, test)
if err != nil {
t.Errorf("Test %q failed: %v", test.name, err)
}
// Verify PVCFinalizer tests results
evaluatePVCFinalizerTests(ctrl, reactor, test, t)
}
}
// Evaluate PVCFinalizer tests results
func evaluatePVCFinalizerTests(ctrl *csiSnapshotController, reactor *snapshotReactor, test controllerTest, t *testing.T) {
// Evaluate results
bHasPVCFinalizer := false
name := sysruntime.FuncForPC(reflect.ValueOf(test.test).Pointer()).Name()
index := strings.LastIndex(name, ".")
if index == -1 {
t.Errorf("Test %q: failed to test finalizer - invalid test call name [%s]", test.name, name)
return
}
names := []rune(name)
funcName := string(names[index+1 : len(name)])
klog.V(4).Infof("test %q: PVCFinalizer test func name: [%s]", test.name, funcName)
if funcName == "testAddPVCFinalizer" {
for _, pvc := range reactor.claims {
if test.initialClaims[0].Name == pvc.Name {
if !slice.ContainsString(test.initialClaims[0].ObjectMeta.Finalizers, PVCFinalizer, nil) && slice.ContainsString(pvc.ObjectMeta.Finalizers, PVCFinalizer, nil) {
klog.V(4).Infof("test %q succeeded. PVCFinalizer is added to PVC %s", test.name, pvc.Name)
bHasPVCFinalizer = true
}
break
}
}
if test.expectSuccess && !bHasPVCFinalizer {
t.Errorf("Test %q: failed to add finalizer to PVC %s", test.name, test.initialClaims[0].Name)
}
}
bHasPVCFinalizer = true
if funcName == "testRemovePVCFinalizer" {
for _, pvc := range reactor.claims {
if test.initialClaims[0].Name == pvc.Name {
if slice.ContainsString(test.initialClaims[0].ObjectMeta.Finalizers, PVCFinalizer, nil) && !slice.ContainsString(pvc.ObjectMeta.Finalizers, PVCFinalizer, nil) {
klog.V(4).Infof("test %q succeeded. PVCFinalizer is removed from PVC %s", test.name, pvc.Name)
bHasPVCFinalizer = false
}
break
}
}
if test.expectSuccess && bHasPVCFinalizer {
t.Errorf("Test %q: failed to remove finalizer from PVC %s", test.name, test.initialClaims[0].Name)
}
}
}
func getSize(size int64) *resource.Quantity {
return resource.NewQuantity(size, resource.BinarySI)
}