Re-queue SnapshotContents that are readyToUse: false

SnapshotContents with readyToUse: false should be periodically requeued
with exp. backoff until the CSI driver confirms the snapshot is ready.
This commit is contained in:
Jan Safranek
2023-11-10 12:17:18 +01:00
parent f9e125b994
commit 8a29bf5b21
4 changed files with 174 additions and 45 deletions

View File

@@ -18,9 +18,10 @@ package sidecar_controller
import (
"fmt"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/group_snapshotter"
"time"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/group_snapshotter"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
@@ -205,6 +206,7 @@ func (ctrl *csiSnapshotSideCarController) enqueueContentWork(obj interface{}) {
return
}
klog.V(5).Infof("enqueued %q for sync", objName)
ctrl.contentQueue.Add(objName)
}
}
@@ -223,11 +225,15 @@ func (ctrl *csiSnapshotSideCarController) processNextItem() bool {
}
defer ctrl.contentQueue.Done(keyObj)
if err := ctrl.syncContentByKey(keyObj.(string)); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
ctrl.contentQueue.AddRateLimited(keyObj)
requeue, err := ctrl.syncContentByKey(keyObj.(string))
if err != nil {
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err)
// Always requeue on error to be able to call functions like "return false, doSomething()" where doSomething
// does not need to worry about re-queueing.
requeue = true
}
if requeue {
ctrl.contentQueue.AddRateLimited(keyObj)
return true
}
@@ -237,30 +243,32 @@ func (ctrl *csiSnapshotSideCarController) processNextItem() bool {
return true
}
func (ctrl *csiSnapshotSideCarController) syncContentByKey(key string) error {
// syncContentByKey syncs a single content. It returns true if the controller should
// requeue the item again. On error, content is always requeued.
func (ctrl *csiSnapshotSideCarController) syncContentByKey(key string) (requeue bool, err error) {
klog.V(5).Infof("syncContentByKey[%s]", key)
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.V(4).Infof("error getting name of snapshotContent %q to get snapshotContent from informer: %v", key, err)
return nil
return false, nil
}
content, err := ctrl.contentLister.Get(name)
// The content still exists in informer cache, the event must have
// been add/update/sync
if err == nil {
if ctrl.isDriverMatch(content) {
err = ctrl.updateContentInInformerCache(content)
requeue, err = ctrl.updateContentInInformerCache(content)
}
if err != nil {
// If error occurs we add this item back to the queue
return err
return true, err
}
return nil
return requeue, nil
}
if !errors.IsNotFound(err) {
klog.V(2).Infof("error getting content %q from informer: %v", key, err)
return nil
return false, nil
}
// The content is not in informer cache, the event must have been
@@ -268,21 +276,21 @@ func (ctrl *csiSnapshotSideCarController) syncContentByKey(key string) error {
contentObj, found, err := ctrl.contentStore.GetByKey(key)
if err != nil {
klog.V(2).Infof("error getting content %q from cache: %v", key, err)
return nil
return false, nil
}
if !found {
// The controller has already processed the delete event and
// deleted the content from its cache
klog.V(2).Infof("deletion of content %q was already processed", key)
return nil
return false, nil
}
content, ok := contentObj.(*crdv1.VolumeSnapshotContent)
if !ok {
klog.Errorf("expected content, got %+v", content)
return nil
return false, nil
}
ctrl.deleteContentInCacheStore(content)
return nil
return false, nil
}
// isDriverMatch verifies whether the driver specified in VolumeSnapshotContent
@@ -331,7 +339,7 @@ func (ctrl *csiSnapshotSideCarController) isDriverMatch(object interface{}) bool
// updateContentInInformerCache runs in worker thread and handles "content added",
// "content updated" and "periodic sync" events.
func (ctrl *csiSnapshotSideCarController) updateContentInInformerCache(content *crdv1.VolumeSnapshotContent) error {
func (ctrl *csiSnapshotSideCarController) updateContentInInformerCache(content *crdv1.VolumeSnapshotContent) (requeue bool, err error) {
// Store the new content version in the cache and do not process it if this is
// an old version.
new, err := ctrl.storeContentUpdate(content)
@@ -339,9 +347,9 @@ func (ctrl *csiSnapshotSideCarController) updateContentInInformerCache(content *
klog.Errorf("%v", err)
}
if !new {
return nil
return false, nil
}
err = ctrl.syncContent(content)
requeue, err = ctrl.syncContent(content)
if err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
@@ -350,9 +358,9 @@ func (ctrl *csiSnapshotSideCarController) updateContentInInformerCache(content *
} else {
klog.Errorf("could not sync content %q: %+v", content.Name, err)
}
return err
return requeue, err
}
return nil
return requeue, nil
}
// deleteContent runs in worker thread and handles "content deleted" event.