Merge pull request #261 from xing-yang/create_timeout
Add AnnVolumeSnapshotBeingCreated
This commit is contained in:
@@ -30,10 +30,11 @@ func TestSyncContent(t *testing.T) {
|
||||
|
||||
tests := []controllerTest{
|
||||
{
|
||||
name: "1-1: Basic content update ready to use",
|
||||
initialContents: newContentArrayWithReadyToUse("content1-1", "snapuid1-1", "snap1-1", "sid1-1", defaultClass, "", "volume-handle-1-1", retainPolicy, nil, &defaultSize, &False, true),
|
||||
expectedContents: newContentArrayWithReadyToUse("content1-1", "snapuid1-1", "snap1-1", "sid1-1", defaultClass, "", "volume-handle-1-1", retainPolicy, nil, &defaultSize, &True, true),
|
||||
expectedEvents: noevents,
|
||||
name: "1-1: Basic content update ready to use",
|
||||
initialContents: newContentArrayWithReadyToUse("content1-1", "snapuid1-1", "snap1-1", "sid1-1", defaultClass, "", "volume-handle-1-1", retainPolicy, nil, &defaultSize, &False, true),
|
||||
expectedContents: withContentAnnotations(newContentArrayWithReadyToUse("content1-1", "snapuid1-1", "snap1-1", "sid1-1", defaultClass, "", "volume-handle-1-1", retainPolicy, nil, &defaultSize, &True, true),
|
||||
map[string]string{}),
|
||||
expectedEvents: noevents,
|
||||
expectedCreateCalls: []createCall{
|
||||
{
|
||||
volumeHandle: "volume-handle-1-1",
|
||||
@@ -52,8 +53,9 @@ func TestSyncContent(t *testing.T) {
|
||||
name: "1-2: Basic sync content create snapshot",
|
||||
initialContents: withContentStatus(newContentArray("content1-2", "snapuid1-2", "snap1-2", "sid1-2", defaultClass, "", "volume-handle-1-2", retainPolicy, nil, &defaultSize, true),
|
||||
nil),
|
||||
expectedContents: withContentStatus(newContentArray("content1-2", "snapuid1-2", "snap1-2", "sid1-2", defaultClass, "", "volume-handle-1-2", retainPolicy, nil, &defaultSize, true),
|
||||
expectedContents: withContentAnnotations(withContentStatus(newContentArray("content1-2", "snapuid1-2", "snap1-2", "sid1-2", defaultClass, "", "volume-handle-1-2", retainPolicy, nil, &defaultSize, true),
|
||||
&crdv1.VolumeSnapshotContentStatus{SnapshotHandle: toStringPointer("snapuid1-2"), RestoreSize: &defaultSize, ReadyToUse: &True}),
|
||||
map[string]string{}),
|
||||
expectedEvents: noevents,
|
||||
expectedCreateCalls: []createCall{
|
||||
{
|
||||
@@ -161,7 +163,7 @@ func TestSyncContent(t *testing.T) {
|
||||
SnapshotHandle: toStringPointer("sid1-6"),
|
||||
RestoreSize: &defaultSize,
|
||||
ReadyToUse: &False,
|
||||
Error: newSnapshotError("Failed to check and update snapshot content: failed to get input parameters to create snapshot content1-6: \"failed to retrieve snapshot class bad-class from the informer: \\\"volumesnapshotclass.snapshot.storage.k8s.io \\\\\\\"bad-class\\\\\\\" not found\\\"\""),
|
||||
Error: newSnapshotError("Failed to check and update snapshot content: failed to get input parameters to create snapshot for content content1-6: \"failed to retrieve snapshot class bad-class from the informer: \\\"volumesnapshotclass.snapshot.storage.k8s.io \\\\\\\"bad-class\\\\\\\" not found\\\"\""),
|
||||
}),
|
||||
expectedEvents: []string{"Warning SnapshotContentCheckandUpdateFailed"},
|
||||
expectedCreateCalls: []createCall{
|
||||
|
@@ -23,6 +23,8 @@ import (
|
||||
|
||||
crdv1 "github.com/kubernetes-csi/external-snapshotter/v2/pkg/apis/volumesnapshot/v1beta1"
|
||||
"github.com/kubernetes-csi/external-snapshotter/v2/pkg/utils"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog"
|
||||
@@ -81,6 +83,11 @@ func (ctrl *csiSnapshotSideCarController) syncContent(content *crdv1.VolumeSnaps
|
||||
// or ListSnapshots CSI methods over and over again for
|
||||
// performance reasons.
|
||||
if content.Status != nil && content.Status.ReadyToUse != nil && *content.Status.ReadyToUse == true {
|
||||
// Try to remove AnnVolumeSnapshotBeingCreated if it is not removed yet for some reason
|
||||
err := ctrl.removeAnnVolumeSnapshotBeingCreated(content)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to remove VolumeSnapshotBeingCreated annotation from the content %s: %q", content.Name, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
ctrl.checkandUpdateContentStatus(content)
|
||||
@@ -126,10 +133,10 @@ func (ctrl *csiSnapshotSideCarController) createSnapshot(content *crdv1.VolumeSn
|
||||
klog.V(5).Infof("createSnapshot for content [%s]: started", content.Name)
|
||||
opName := fmt.Sprintf("create-%s", content.Name)
|
||||
ctrl.scheduleOperation(opName, func() error {
|
||||
contentObj, err := ctrl.createSnapshotOperation(content)
|
||||
contentObj, err := ctrl.createSnapshotWrapper(content)
|
||||
if err != nil {
|
||||
ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err))
|
||||
klog.Errorf("createSnapshot [%s]: error occurred in createSnapshotOperation: %v", opName, err)
|
||||
klog.Errorf("createSnapshot [%s]: error occurred in createSnapshotWrapper: %v", opName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -276,75 +283,80 @@ func (ctrl *csiSnapshotSideCarController) checkandUpdateContentStatusOperation(c
|
||||
}
|
||||
driverName = content.Spec.Driver
|
||||
snapshotID = *content.Spec.Source.SnapshotHandle
|
||||
} else {
|
||||
class, snapshotterCredentials, err := ctrl.getCSISnapshotInput(content)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get input parameters to create snapshot %s: %q", content.Name, err)
|
||||
|
||||
klog.V(5).Infof("checkandUpdateContentStatusOperation: driver %s, snapshotId %s, creationTime %v, size %d, readyToUse %t", driverName, snapshotID, creationTime, size, readyToUse)
|
||||
|
||||
if creationTime.IsZero() {
|
||||
creationTime = time.Now()
|
||||
}
|
||||
|
||||
driverName, snapshotID, creationTime, size, readyToUse, err = ctrl.handler.CreateSnapshot(content, class.Parameters, snapshotterCredentials)
|
||||
updatedContent, err := ctrl.updateSnapshotContentStatus(content, snapshotID, readyToUse, creationTime.UnixNano(), size)
|
||||
if err != nil {
|
||||
klog.Errorf("checkandUpdateContentStatusOperation: failed to call create snapshot to check whether the snapshot is ready to use %q", err)
|
||||
return nil, err
|
||||
}
|
||||
return updatedContent, nil
|
||||
} else {
|
||||
return ctrl.createSnapshotWrapper(content)
|
||||
}
|
||||
klog.V(5).Infof("checkandUpdateContentStatusOperation: driver %s, snapshotId %s, creationTime %v, size %d, readyToUse %t", driverName, snapshotID, creationTime, size, readyToUse)
|
||||
|
||||
if creationTime.IsZero() {
|
||||
creationTime = time.Now()
|
||||
}
|
||||
|
||||
updateContent, err := ctrl.updateSnapshotContentStatus(content, snapshotID, readyToUse, creationTime.UnixNano(), size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return updateContent, nil
|
||||
}
|
||||
|
||||
// The function goes through the whole snapshot creation process.
|
||||
// 1. Trigger the snapshot through csi storage provider.
|
||||
// 2. Update VolumeSnapshot status with creationtimestamp information
|
||||
// 3. Create the VolumeSnapshotContent object with the snapshot id information.
|
||||
// 4. Bind the VolumeSnapshot and VolumeSnapshotContent object
|
||||
func (ctrl *csiSnapshotSideCarController) createSnapshotOperation(content *crdv1.VolumeSnapshotContent) (*crdv1.VolumeSnapshotContent, error) {
|
||||
klog.Infof("createSnapshotOperation: Creating snapshot for content %s through the plugin ...", content.Name)
|
||||
|
||||
// content.Status will be created for the first time after a snapshot
|
||||
// is created by the CSI driver. If content.Status is not nil,
|
||||
// we should update content status without creating snapshot again.
|
||||
if content.Status != nil && content.Status.Error != nil && content.Status.Error.Message != nil && !isControllerUpdateFailError(content.Status.Error) {
|
||||
klog.V(4).Infof("error is already set in snapshot, do not retry to create: %s", *content.Status.Error.Message)
|
||||
return content, nil
|
||||
}
|
||||
// This is a wrapper function for the snapshot creation process.
|
||||
func (ctrl *csiSnapshotSideCarController) createSnapshotWrapper(content *crdv1.VolumeSnapshotContent) (*crdv1.VolumeSnapshotContent, error) {
|
||||
klog.Infof("createSnapshotWrapper: Creating snapshot for content %s through the plugin ...", content.Name)
|
||||
|
||||
class, snapshotterCredentials, err := ctrl.getCSISnapshotInput(content)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get input parameters to create snapshot for content %s: %q", content.Name, err)
|
||||
}
|
||||
|
||||
// NOTE(xyang): handle create timeout
|
||||
// Add an annotation to indicate the snapshot creation request has been
|
||||
// sent to the storage system and the controller is waiting for a response.
|
||||
// The annotation will be removed after the storage system has responded with
|
||||
// success or permanent failure. If the request times out, annotation will
|
||||
// remain on the content to avoid potential leaking of a snapshot resource on
|
||||
// the storage system.
|
||||
err = ctrl.setAnnVolumeSnapshotBeingCreated(content)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to add VolumeSnapshotBeingCreated annotation on the content %s: %q", content.Name, err)
|
||||
}
|
||||
|
||||
driverName, snapshotID, creationTime, size, readyToUse, err := ctrl.handler.CreateSnapshot(content, class.Parameters, snapshotterCredentials)
|
||||
if err != nil {
|
||||
// NOTE(xyang): handle create timeout
|
||||
// If it is a final error, remove annotation to indicate
|
||||
// storage system has responded with an error
|
||||
klog.Infof("createSnapshotWrapper: CreateSnapshot for content %s returned error: %v", content.Name, err)
|
||||
if isCSIFinalError(err) {
|
||||
err = ctrl.removeAnnVolumeSnapshotBeingCreated(content)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to remove VolumeSnapshotBeingCreated annotation from the content %s: %q", content.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("failed to take snapshot of the volume, %s: %q", *content.Spec.Source.VolumeHandle, err)
|
||||
}
|
||||
if driverName != class.Driver {
|
||||
return nil, fmt.Errorf("failed to take snapshot of the volume, %s: driver name %s returned from the driver is different from driver %s in snapshot class", *content.Spec.Source.VolumeHandle, driverName, class.Driver)
|
||||
}
|
||||
|
||||
klog.V(5).Infof("Created snapshot: driver %s, snapshotId %s, creationTime %v, size %d, readyToUse %t", driverName, snapshotID, creationTime, size, readyToUse)
|
||||
|
||||
timestamp := creationTime.UnixNano()
|
||||
newContent, err := ctrl.updateSnapshotContentStatus(content, snapshotID, readyToUse, timestamp, size)
|
||||
if creationTime.IsZero() {
|
||||
creationTime = time.Now()
|
||||
}
|
||||
|
||||
newContent, err := ctrl.updateSnapshotContentStatus(content, snapshotID, readyToUse, creationTime.UnixNano(), size)
|
||||
if err != nil {
|
||||
strerr := fmt.Sprintf("error updating volume snapshot content status for snapshot %s: %v.", content.Name, err)
|
||||
klog.Error(strerr)
|
||||
klog.Errorf("error updating status for volume snapshot content %s: %v.", content.Name, err)
|
||||
return nil, fmt.Errorf("error updating status for volume snapshot content %s: %v.", content.Name, err)
|
||||
} else {
|
||||
content = newContent
|
||||
}
|
||||
|
||||
// Update content in the cache store
|
||||
_, err = ctrl.storeContentUpdate(content)
|
||||
// NOTE(xyang): handle create timeout
|
||||
// Remove annotation to indicate storage system has successfully
|
||||
// cut the snapshot
|
||||
err = ctrl.removeAnnVolumeSnapshotBeingCreated(content)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to update content store %v", err)
|
||||
return nil, fmt.Errorf("failed to remove VolumeSnapshotBeingCreated annotation on the content %s: %q", content.Name, err)
|
||||
}
|
||||
|
||||
return content, nil
|
||||
@@ -564,9 +576,99 @@ func (ctrl *csiSnapshotSideCarController) shouldDelete(content *crdv1.VolumeSnap
|
||||
if content.Spec.Source.SnapshotHandle != nil && content.Spec.VolumeSnapshotRef.UID == "" {
|
||||
return true
|
||||
}
|
||||
// 2) shouldDelete returns true if AnnVolumeSnapshotBeingDeleted annotation is set
|
||||
|
||||
// NOTE(xyang): Handle create snapshot timeout
|
||||
// 2) shouldDelete returns false if AnnVolumeSnapshotBeingCreated
|
||||
// annotation is set. This indicates a CreateSnapshot CSI RPC has
|
||||
// not responded with success or failure.
|
||||
// We need to keep waiting for a response from the CSI driver.
|
||||
if metav1.HasAnnotation(content.ObjectMeta, utils.AnnVolumeSnapshotBeingCreated) {
|
||||
return false
|
||||
}
|
||||
|
||||
// 3) shouldDelete returns true if AnnVolumeSnapshotBeingDeleted annotation is set
|
||||
if metav1.HasAnnotation(content.ObjectMeta, utils.AnnVolumeSnapshotBeingDeleted) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// setAnnVolumeSnapshotBeingCreated sets VolumeSnapshotBeingCreated annotation
|
||||
// on VolumeSnapshotContent
|
||||
// If set, it indicates snapshot is being created
|
||||
func (ctrl *csiSnapshotSideCarController) setAnnVolumeSnapshotBeingCreated(content *crdv1.VolumeSnapshotContent) error {
|
||||
if metav1.HasAnnotation(content.ObjectMeta, utils.AnnVolumeSnapshotBeingCreated) {
|
||||
// the annotation already exists, return directly
|
||||
return nil
|
||||
}
|
||||
|
||||
// Set AnnVolumeSnapshotBeingCreated
|
||||
klog.V(5).Infof("setAnnVolumeSnapshotBeingCreated: set annotation [%s:yes] on content [%s].", utils.AnnVolumeSnapshotBeingCreated, content.Name)
|
||||
contentClone := content.DeepCopy()
|
||||
metav1.SetMetaDataAnnotation(&contentClone.ObjectMeta, utils.AnnVolumeSnapshotBeingCreated, "yes")
|
||||
|
||||
updatedContent, err := ctrl.clientset.SnapshotV1beta1().VolumeSnapshotContents().Update(contentClone)
|
||||
if err != nil {
|
||||
return newControllerUpdateError(content.Name, err.Error())
|
||||
}
|
||||
// update content if update is successful
|
||||
content = updatedContent
|
||||
|
||||
_, err = ctrl.storeContentUpdate(content)
|
||||
if err != nil {
|
||||
klog.V(4).Infof("setAnnVolumeSnapshotBeingCreated for content [%s]: cannot update internal cache %v", content.Name, err)
|
||||
}
|
||||
klog.V(5).Infof("setAnnVolumeSnapshotBeingCreated: volume snapshot content %+v", content)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// removeAnnVolumeSnapshotBeingCreated removes the VolumeSnapshotBeingCreated
|
||||
// annotation from a content if there exists one.
|
||||
func (ctrl csiSnapshotSideCarController) removeAnnVolumeSnapshotBeingCreated(content *crdv1.VolumeSnapshotContent) error {
|
||||
if !metav1.HasAnnotation(content.ObjectMeta, utils.AnnVolumeSnapshotBeingCreated) {
|
||||
// the annotation does not exist, return directly
|
||||
return nil
|
||||
}
|
||||
contentClone := content.DeepCopy()
|
||||
delete(contentClone.ObjectMeta.Annotations, utils.AnnVolumeSnapshotBeingCreated)
|
||||
|
||||
updatedContent, err := ctrl.clientset.SnapshotV1beta1().VolumeSnapshotContents().Update(contentClone)
|
||||
if err != nil {
|
||||
return newControllerUpdateError(content.Name, err.Error())
|
||||
}
|
||||
// update content if update is successful
|
||||
content = updatedContent
|
||||
|
||||
klog.V(5).Infof("Removed VolumeSnapshotBeingCreated annotation from volume snapshot content %s", content.Name)
|
||||
_, err = ctrl.storeContentUpdate(content)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to update content store %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// This function checks if the error is final
|
||||
func isCSIFinalError(err error) bool {
|
||||
// Sources:
|
||||
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
|
||||
// https://github.com/container-storage-interface/spec/blob/master/spec.md
|
||||
st, ok := status.FromError(err)
|
||||
if !ok {
|
||||
// This is not gRPC error. The operation must have failed before gRPC
|
||||
// method was called, otherwise we would get gRPC error.
|
||||
// We don't know if any previous CreateSnapshot is in progress, be on the safe side.
|
||||
return false
|
||||
}
|
||||
switch st.Code() {
|
||||
case codes.Canceled, // gRPC: Client Application cancelled the request
|
||||
codes.DeadlineExceeded, // gRPC: Timeout
|
||||
codes.Unavailable, // gRPC: Server shutting down, TCP connection broken - previous CreateSnapshot() may be still in progress.
|
||||
codes.ResourceExhausted, // gRPC: Server temporarily out of resources - previous CreateSnapshot() may be still in progress.
|
||||
codes.Aborted: // CSI: Operation pending for Snapshot
|
||||
return false
|
||||
}
|
||||
// All other errors mean that creating snapshot either did not
|
||||
// even start or failed. It is for sure not in progress.
|
||||
return true
|
||||
}
|
||||
|
@@ -78,6 +78,19 @@ const (
|
||||
// backing the snapshot content.
|
||||
AnnVolumeSnapshotBeingDeleted = "snapshot.storage.kubernetes.io/volumesnapshot-being-deleted"
|
||||
|
||||
// AnnVolumeSnapshotBeingCreated annotation applies to VolumeSnapshotContents.
|
||||
// If it is set, it indicates that the csi-snapshotter
|
||||
// sidecar has sent the create snapshot request to the storage system and
|
||||
// is waiting for a response of success or failure.
|
||||
// This annotation will be removed once the driver's CreateSnapshot
|
||||
// CSI function returns success or a final error (determined by isCSIFinalError()).
|
||||
// If the create snapshot request fails with a non-final error such as timeout,
|
||||
// retry will happen and the annotation will remain.
|
||||
// This only applies to dynamic provisioning of snapshots because
|
||||
// the create snapshot CSI method will not be called for pre-provisioned
|
||||
// snapshots.
|
||||
AnnVolumeSnapshotBeingCreated = "snapshot.storage.kubernetes.io/volumesnapshot-being-created"
|
||||
|
||||
// Annotation for secret name and namespace will be added to the content
|
||||
// and used at snapshot content deletion time.
|
||||
AnnDeletionSecretRefName = "snapshot.storage.kubernetes.io/deletion-secret-name"
|
||||
|
Reference in New Issue
Block a user