Handle the CSI driver in VolumeSnapshotContent does not match case

In VolumeSnapshotContent, if the CSI driver does not match the plugin's,
the controller should skip this content instead of always processing it.
This PR also add a few tests related to snapshot and content static
binding.

During binding, if content specify its bound snapshot uid and it
does not match the snapshot's uid, the content object and also the
physical snapshot will be deleted. In this case, the controller will
treat the content as an orphan content because its snapshot object does
not exist (deleted) any more.
This commit is contained in:
Jing Xu
2018-09-20 15:21:18 -07:00
parent 5ed076f3db
commit c401b2331c
5 changed files with 151 additions and 59 deletions

View File

@@ -85,49 +85,49 @@ const IsDefaultSnapshotClassAnnotation = "snapshot.storage.kubernetes.io/is-defa
func (ctrl *csiSnapshotController) syncContent(content *crdv1.VolumeSnapshotContent) error {
glog.V(5).Infof("synchronizing VolumeSnapshotContent[%s]", content.Name)
// VolumeSnapshotContent is not bound to any VolumeSnapshot, this case rare and we just return err
// VolumeSnapshotContent is not bound to any VolumeSnapshot, in this case we just return err
if content.Spec.VolumeSnapshotRef == nil {
// content is not bound
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: VolumeSnapshotContent is not bound to any VolumeSnapshot", content.Name)
ctrl.eventRecorder.Event(content, v1.EventTypeWarning, "SnapshotContentNotBound", "VolumeSnapshotContent is not bound to any VolumeSnapshot")
return fmt.Errorf("volumeSnapshotContent %s is not bound to any VolumeSnapshot", content.Name)
} else {
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: content is bound to snapshot %s", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
// The VolumeSnapshotContent is reserved for a VolumeSnapshot;
// that VolumeSnapshot has not yet been bound to this VolumeSnapshotContent; the VolumeSnapshot sync will handle it.
if content.Spec.VolumeSnapshotRef.UID == "" {
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: VolumeSnapshotContent is pre-bound to VolumeSnapshot %s", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
return nil
}
// Get the VolumeSnapshot by _name_
var snapshot *crdv1.VolumeSnapshot
snapshotName := snapshotRefKey(content.Spec.VolumeSnapshotRef)
obj, found, err := ctrl.snapshotStore.GetByKey(snapshotName)
if err != nil {
return err
}
if !found {
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: snapshot %s not found", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
// Fall through with snapshot = nil
} else {
var ok bool
snapshot, ok = obj.(*crdv1.VolumeSnapshot)
if !ok {
return fmt.Errorf("cannot convert object from snapshot cache to snapshot %q!?: %#v", content.Name, obj)
}
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: snapshot %s found", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
}
if snapshot != nil && snapshot.UID != content.Spec.VolumeSnapshotRef.UID {
// The snapshot that the content was pointing to was deleted, and another
// with the same name created.
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: content %s has different UID, the old one must have been deleted", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
// Treat the volume as bound to a missing claim.
snapshot = nil
}
if snapshot == nil {
ctrl.deleteSnapshotContent(content)
}
}
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: content is bound to snapshot %s", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
// The VolumeSnapshotContent is reserved for a VolumeSnapshot;
// that VolumeSnapshot has not yet been bound to this VolumeSnapshotContent; the VolumeSnapshot sync will handle it.
if content.Spec.VolumeSnapshotRef.UID == "" {
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: VolumeSnapshotContent is pre-bound to VolumeSnapshot %s", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
return nil
}
// Get the VolumeSnapshot by _name_
var snapshot *crdv1.VolumeSnapshot
snapshotName := snapshotRefKey(content.Spec.VolumeSnapshotRef)
obj, found, err := ctrl.snapshotStore.GetByKey(snapshotName)
if err != nil {
return err
}
if !found {
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: snapshot %s not found", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
// Fall through with snapshot = nil
} else {
var ok bool
snapshot, ok = obj.(*crdv1.VolumeSnapshot)
if !ok {
return fmt.Errorf("cannot convert object from snapshot cache to snapshot %q!?: %#v", content.Name, obj)
}
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: snapshot %s found", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
}
if snapshot != nil && snapshot.UID != content.Spec.VolumeSnapshotRef.UID {
// The snapshot that the content was pointing to was deleted, and another
// with the same name created.
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: content %s has different UID, the old one must have been deleted", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
// Treat the content as bound to a missing snapshot.
snapshot = nil
}
if snapshot == nil {
ctrl.deleteSnapshotContent(content)
}
return nil
}
@@ -506,19 +506,20 @@ func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.Volum
}
// Create VolumeSnapshotContent in the database
volumeRef, err := ref.GetReference(scheme.Scheme, volume)
if err != nil {
return nil, err
}
snapshotRef, err := ref.GetReference(scheme.Scheme, snapshot)
if err != nil {
return nil, err
}
snapshotContent := &crdv1.VolumeSnapshotContent{
ObjectMeta: metav1.ObjectMeta{
Name: contentName,
},
Spec: crdv1.VolumeSnapshotContentSpec{
VolumeSnapshotRef: &v1.ObjectReference{
Kind: "VolumeSnapshot",
Namespace: snapshot.Namespace,
Name: snapshot.Name,
UID: snapshot.UID,
APIVersion: "snapshot.storage.k8s.io/v1alpha1",
},
VolumeSnapshotRef: snapshotRef,
PersistentVolumeRef: volumeRef,
VolumeSnapshotSource: crdv1.VolumeSnapshotSource{
CSI: &crdv1.CSIVolumeSnapshotSource{