Merge pull request #230 from xing-yang/requeue_sidecar
Fix the requeue logic
This commit is contained in:
@@ -409,24 +409,6 @@ func (r *snapshotReactor) getChangeCount() int {
|
|||||||
return r.changedSinceLastSync
|
return r.changedSinceLastSync
|
||||||
}
|
}
|
||||||
|
|
||||||
// waitForIdle waits until all tests, controllers and other goroutines do their
|
|
||||||
// job and no new actions are registered for 10 milliseconds.
|
|
||||||
func (r *snapshotReactor) waitForIdle() {
|
|
||||||
r.ctrl.runningOperations.WaitForCompletion()
|
|
||||||
// Check every 10ms if the controller does something and stop if it's
|
|
||||||
// idle.
|
|
||||||
oldChanges := -1
|
|
||||||
for {
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
changes := r.getChangeCount()
|
|
||||||
if changes == oldChanges {
|
|
||||||
// No changes for last 10ms -> controller must be idle.
|
|
||||||
break
|
|
||||||
}
|
|
||||||
oldChanges = changes
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// waitTest waits until all tests, controllers and other goroutines do their
|
// waitTest waits until all tests, controllers and other goroutines do their
|
||||||
// job and list of current contents/snapshots is equal to list of expected
|
// job and list of current contents/snapshots is equal to list of expected
|
||||||
// contents/snapshots (with ~10 second timeout).
|
// contents/snapshots (with ~10 second timeout).
|
||||||
@@ -439,9 +421,6 @@ func (r *snapshotReactor) waitTest(test controllerTest) error {
|
|||||||
Steps: 10,
|
Steps: 10,
|
||||||
}
|
}
|
||||||
err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
|
err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
|
||||||
// Finish all operations that are in progress
|
|
||||||
r.ctrl.runningOperations.WaitForCompletion()
|
|
||||||
|
|
||||||
// Return 'true' if the reactor reached the expected state
|
// Return 'true' if the reactor reached the expected state
|
||||||
err1 := r.checkContents(test.expectedContents)
|
err1 := r.checkContents(test.expectedContents)
|
||||||
if err1 == nil {
|
if err1 == nil {
|
||||||
@@ -768,7 +747,7 @@ func runSyncContentTests(t *testing.T, tests []controllerTest, snapshotClasses [
|
|||||||
|
|
||||||
// Run the tested functions
|
// Run the tested functions
|
||||||
err = test.test(ctrl, reactor, test)
|
err = test.test(ctrl, reactor, test)
|
||||||
if err != nil {
|
if test.expectSuccess && err != nil {
|
||||||
t.Errorf("Test %q failed: %v", test.name, err)
|
t.Errorf("Test %q failed: %v", test.name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -29,8 +29,6 @@ import (
|
|||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
"k8s.io/kubernetes/pkg/util/goroutinemap"
|
|
||||||
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
|
|
||||||
"k8s.io/kubernetes/pkg/util/slice"
|
"k8s.io/kubernetes/pkg/util/slice"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -73,56 +71,26 @@ func (ctrl *csiSnapshotSideCarController) syncContent(content *crdv1.VolumeSnaps
|
|||||||
// if there is one so that API server could delete the object if there is
|
// if there is one so that API server could delete the object if there is
|
||||||
// no other finalizer.
|
// no other finalizer.
|
||||||
return ctrl.removeContentFinalizer(content)
|
return ctrl.removeContentFinalizer(content)
|
||||||
|
|
||||||
}
|
}
|
||||||
if content.Spec.Source.VolumeHandle != nil && content.Status == nil {
|
if content.Spec.Source.VolumeHandle != nil && content.Status == nil {
|
||||||
klog.V(5).Infof("syncContent: Call CreateSnapshot for content %s", content.Name)
|
klog.V(5).Infof("syncContent: Call CreateSnapshot for content %s", content.Name)
|
||||||
ctrl.createSnapshot(content)
|
return ctrl.createSnapshot(content)
|
||||||
} else {
|
|
||||||
// Skip checkandUpdateContentStatus() if ReadyToUse is
|
|
||||||
// already true. We don't want to keep calling CreateSnapshot
|
|
||||||
// or ListSnapshots CSI methods over and over again for
|
|
||||||
// performance reasons.
|
|
||||||
if content.Status != nil && content.Status.ReadyToUse != nil && *content.Status.ReadyToUse == true {
|
|
||||||
// Try to remove AnnVolumeSnapshotBeingCreated if it is not removed yet for some reason
|
|
||||||
err := ctrl.removeAnnVolumeSnapshotBeingCreated(content)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to remove VolumeSnapshotBeingCreated annotation from the content %s: %q", content.Name, err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
ctrl.checkandUpdateContentStatus(content)
|
|
||||||
}
|
}
|
||||||
|
// Skip checkandUpdateContentStatus() if ReadyToUse is
|
||||||
return nil
|
// already true. We don't want to keep calling CreateSnapshot
|
||||||
|
// or ListSnapshots CSI methods over and over again for
|
||||||
|
// performance reasons.
|
||||||
|
if content.Status != nil && content.Status.ReadyToUse != nil && *content.Status.ReadyToUse == true {
|
||||||
|
// Try to remove AnnVolumeSnapshotBeingCreated if it is not removed yet for some reason
|
||||||
|
return ctrl.removeAnnVolumeSnapshotBeingCreated(content)
|
||||||
|
}
|
||||||
|
return ctrl.checkandUpdateContentStatus(content)
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteCSISnapshot starts delete action.
|
// deleteCSISnapshot starts delete action.
|
||||||
func (ctrl *csiSnapshotSideCarController) deleteCSISnapshot(content *crdv1.VolumeSnapshotContent) error {
|
func (ctrl *csiSnapshotSideCarController) deleteCSISnapshot(content *crdv1.VolumeSnapshotContent) error {
|
||||||
operationName := fmt.Sprintf("delete-%s", content.Name)
|
klog.V(5).Infof("Deleting snapshot for content: %s", content.Name)
|
||||||
klog.V(5).Infof("schedule to delete snapshot, operation named %s", operationName)
|
return ctrl.deleteCSISnapshotOperation(content)
|
||||||
ctrl.scheduleOperation(operationName, func() error {
|
|
||||||
return ctrl.deleteCSISnapshotOperation(content)
|
|
||||||
})
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// scheduleOperation starts given asynchronous operation on given snapshot. It
|
|
||||||
// makes sure there is no running operation with the same operationName
|
|
||||||
func (ctrl *csiSnapshotSideCarController) scheduleOperation(operationName string, operation func() error) {
|
|
||||||
klog.V(5).Infof("scheduleOperation[%s]", operationName)
|
|
||||||
|
|
||||||
err := ctrl.runningOperations.Run(operationName, operation)
|
|
||||||
if err != nil {
|
|
||||||
switch {
|
|
||||||
case goroutinemap.IsAlreadyExists(err):
|
|
||||||
klog.V(4).Infof("operation %q is already running, skipping", operationName)
|
|
||||||
case exponentialbackoff.IsExponentialBackoff(err):
|
|
||||||
klog.V(4).Infof("operation %q postponed due to exponential backoff", operationName)
|
|
||||||
default:
|
|
||||||
klog.Errorf("error scheduling operation %q: %v", operationName, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ctrl *csiSnapshotSideCarController) storeContentUpdate(content interface{}) (bool, error) {
|
func (ctrl *csiSnapshotSideCarController) storeContentUpdate(content interface{}) (bool, error) {
|
||||||
@@ -130,44 +98,38 @@ func (ctrl *csiSnapshotSideCarController) storeContentUpdate(content interface{}
|
|||||||
}
|
}
|
||||||
|
|
||||||
// createSnapshot starts new asynchronous operation to create snapshot
|
// createSnapshot starts new asynchronous operation to create snapshot
|
||||||
func (ctrl *csiSnapshotSideCarController) createSnapshot(content *crdv1.VolumeSnapshotContent) {
|
func (ctrl *csiSnapshotSideCarController) createSnapshot(content *crdv1.VolumeSnapshotContent) error {
|
||||||
klog.V(5).Infof("createSnapshot for content [%s]: started", content.Name)
|
klog.V(5).Infof("createSnapshot for content [%s]: started", content.Name)
|
||||||
opName := fmt.Sprintf("create-%s", content.Name)
|
contentObj, err := ctrl.createSnapshotWrapper(content)
|
||||||
ctrl.scheduleOperation(opName, func() error {
|
if err != nil {
|
||||||
contentObj, err := ctrl.createSnapshotWrapper(content)
|
ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err))
|
||||||
if err != nil {
|
klog.Errorf("createSnapshot for content [%s]: error occurred in createSnapshotWrapper: %v", content.Name, err)
|
||||||
ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err))
|
return err
|
||||||
klog.Errorf("createSnapshot [%s]: error occurred in createSnapshotWrapper: %v", opName, err)
|
}
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, updateErr := ctrl.storeContentUpdate(contentObj)
|
_, updateErr := ctrl.storeContentUpdate(contentObj)
|
||||||
if updateErr != nil {
|
if updateErr != nil {
|
||||||
// We will get an "snapshot update" event soon, this is not a big error
|
// We will get an "snapshot update" event soon, this is not a big error
|
||||||
klog.V(4).Infof("createSnapshot [%s]: cannot update internal content cache: %v", content.Name, updateErr)
|
klog.V(4).Infof("createSnapshot for content [%s]: cannot update internal content cache: %v", content.Name, updateErr)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ctrl *csiSnapshotSideCarController) checkandUpdateContentStatus(content *crdv1.VolumeSnapshotContent) {
|
func (ctrl *csiSnapshotSideCarController) checkandUpdateContentStatus(content *crdv1.VolumeSnapshotContent) error {
|
||||||
klog.V(5).Infof("checkandUpdateContentStatus[%s] started", content.Name)
|
klog.V(5).Infof("checkandUpdateContentStatus[%s] started", content.Name)
|
||||||
opName := fmt.Sprintf("check-%s", content.Name)
|
contentObj, err := ctrl.checkandUpdateContentStatusOperation(content)
|
||||||
ctrl.scheduleOperation(opName, func() error {
|
if err != nil {
|
||||||
contentObj, err := ctrl.checkandUpdateContentStatusOperation(content)
|
ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotContentCheckandUpdateFailed", fmt.Sprintf("Failed to check and update snapshot content: %v", err))
|
||||||
if err != nil {
|
klog.Errorf("checkandUpdateContentStatus [%s]: error occurred %v", content.Name, err)
|
||||||
ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotContentCheckandUpdateFailed", fmt.Sprintf("Failed to check and update snapshot content: %v", err))
|
return err
|
||||||
klog.Errorf("checkandUpdateContentStatus [%s]: error occurred %v", content.Name, err)
|
}
|
||||||
return err
|
_, updateErr := ctrl.storeContentUpdate(contentObj)
|
||||||
}
|
if updateErr != nil {
|
||||||
_, updateErr := ctrl.storeContentUpdate(contentObj)
|
// We will get an "snapshot update" event soon, this is not a big error
|
||||||
if updateErr != nil {
|
klog.V(4).Infof("checkandUpdateContentStatus [%s]: cannot update internal cache: %v", content.Name, updateErr)
|
||||||
// We will get an "snapshot update" event soon, this is not a big error
|
}
|
||||||
klog.V(4).Infof("checkandUpdateContentStatus [%s]: cannot update internal cache: %v", content.Name, updateErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateContentStatusWithEvent saves new content.Status to API server and emits
|
// updateContentStatusWithEvent saves new content.Status to API server and emits
|
||||||
@@ -384,8 +346,8 @@ func (ctrl *csiSnapshotSideCarController) deleteCSISnapshotOperation(content *cr
|
|||||||
ctrl.eventRecorder.Event(content, v1.EventTypeWarning, "SnapshotDeleteError", "Failed to clear content status")
|
ctrl.eventRecorder.Event(content, v1.EventTypeWarning, "SnapshotDeleteError", "Failed to clear content status")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// update local cache
|
// trigger syncContent
|
||||||
ctrl.updateContentInCacheStore(newContent)
|
ctrl.updateContentInInformerCache(newContent)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -37,7 +37,6 @@ import (
|
|||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
"k8s.io/kubernetes/pkg/util/goroutinemap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type csiSnapshotSideCarController struct {
|
type csiSnapshotSideCarController struct {
|
||||||
@@ -55,8 +54,6 @@ type csiSnapshotSideCarController struct {
|
|||||||
contentStore cache.Store
|
contentStore cache.Store
|
||||||
|
|
||||||
handler Handler
|
handler Handler
|
||||||
// Map of scheduled/running operations.
|
|
||||||
runningOperations goroutinemap.GoRoutineMap
|
|
||||||
|
|
||||||
resyncPeriod time.Duration
|
resyncPeriod time.Duration
|
||||||
}
|
}
|
||||||
@@ -81,15 +78,14 @@ func NewCSISnapshotSideCarController(
|
|||||||
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-snapshotter %s", driverName)})
|
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-snapshotter %s", driverName)})
|
||||||
|
|
||||||
ctrl := &csiSnapshotSideCarController{
|
ctrl := &csiSnapshotSideCarController{
|
||||||
clientset: clientset,
|
clientset: clientset,
|
||||||
client: client,
|
client: client,
|
||||||
driverName: driverName,
|
driverName: driverName,
|
||||||
eventRecorder: eventRecorder,
|
eventRecorder: eventRecorder,
|
||||||
handler: NewCSIHandler(snapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength),
|
handler: NewCSIHandler(snapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength),
|
||||||
runningOperations: goroutinemap.NewGoRoutineMap(true),
|
resyncPeriod: resyncPeriod,
|
||||||
resyncPeriod: resyncPeriod,
|
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
|
||||||
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
|
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
|
||||||
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
volumeSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod(
|
volumeSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||||
@@ -149,62 +145,77 @@ func (ctrl *csiSnapshotSideCarController) enqueueContentWork(obj interface{}) {
|
|||||||
// contentWorker processes items from contentQueue. It must run only once,
|
// contentWorker processes items from contentQueue. It must run only once,
|
||||||
// syncContent is not assured to be reentrant.
|
// syncContent is not assured to be reentrant.
|
||||||
func (ctrl *csiSnapshotSideCarController) contentWorker() {
|
func (ctrl *csiSnapshotSideCarController) contentWorker() {
|
||||||
workFunc := func() bool {
|
for ctrl.processNextItem() {
|
||||||
keyObj, quit := ctrl.contentQueue.Get()
|
}
|
||||||
if quit {
|
}
|
||||||
return true
|
|
||||||
}
|
|
||||||
defer ctrl.contentQueue.Done(keyObj)
|
|
||||||
key := keyObj.(string)
|
|
||||||
klog.V(5).Infof("contentWorker[%s]", key)
|
|
||||||
|
|
||||||
_, name, err := cache.SplitMetaNamespaceKey(key)
|
func (ctrl *csiSnapshotSideCarController) processNextItem() bool {
|
||||||
if err != nil {
|
keyObj, quit := ctrl.contentQueue.Get()
|
||||||
klog.V(4).Infof("error getting name of snapshotContent %q to get snapshotContent from informer: %v", key, err)
|
if quit {
|
||||||
return false
|
|
||||||
}
|
|
||||||
content, err := ctrl.contentLister.Get(name)
|
|
||||||
// The content still exists in informer cache, the event must have
|
|
||||||
// been add/update/sync
|
|
||||||
if err == nil {
|
|
||||||
if ctrl.isDriverMatch(content) {
|
|
||||||
ctrl.updateContentInCacheStore(content)
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if !errors.IsNotFound(err) {
|
|
||||||
klog.V(2).Infof("error getting content %q from informer: %v", key, err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// The content is not in informer cache, the event must have been
|
|
||||||
// "delete"
|
|
||||||
contentObj, found, err := ctrl.contentStore.GetByKey(key)
|
|
||||||
if err != nil {
|
|
||||||
klog.V(2).Infof("error getting content %q from cache: %v", key, err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
// The controller has already processed the delete event and
|
|
||||||
// deleted the content from its cache
|
|
||||||
klog.V(2).Infof("deletion of content %q was already processed", key)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
content, ok := contentObj.(*crdv1.VolumeSnapshotContent)
|
|
||||||
if !ok {
|
|
||||||
klog.Errorf("expected content, got %+v", content)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
ctrl.deleteContentInCacheStore(content)
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
defer ctrl.contentQueue.Done(keyObj)
|
||||||
|
|
||||||
for {
|
if err := ctrl.syncContentByKey(keyObj.(string)); err != nil {
|
||||||
if quit := workFunc(); quit {
|
// Rather than wait for a full resync, re-add the key to the
|
||||||
klog.Infof("content worker queue shutting down")
|
// queue to be processed.
|
||||||
return
|
ctrl.contentQueue.AddRateLimited(keyObj)
|
||||||
}
|
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err)
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Finally, if no error occurs we Forget this item so it does not
|
||||||
|
// get queued again until another change happens.
|
||||||
|
ctrl.contentQueue.Forget(keyObj)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ctrl *csiSnapshotSideCarController) syncContentByKey(key string) error {
|
||||||
|
klog.V(5).Infof("syncContentByKey[%s]", key)
|
||||||
|
|
||||||
|
_, name, err := cache.SplitMetaNamespaceKey(key)
|
||||||
|
if err != nil {
|
||||||
|
klog.V(4).Infof("error getting name of snapshotContent %q to get snapshotContent from informer: %v", key, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
content, err := ctrl.contentLister.Get(name)
|
||||||
|
// The content still exists in informer cache, the event must have
|
||||||
|
// been add/update/sync
|
||||||
|
if err == nil {
|
||||||
|
if ctrl.isDriverMatch(content) {
|
||||||
|
err = ctrl.updateContentInInformerCache(content)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
// If error occurs we add this item back to the queue
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if !errors.IsNotFound(err) {
|
||||||
|
klog.V(2).Infof("error getting content %q from informer: %v", key, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// The content is not in informer cache, the event must have been
|
||||||
|
// "delete"
|
||||||
|
contentObj, found, err := ctrl.contentStore.GetByKey(key)
|
||||||
|
if err != nil {
|
||||||
|
klog.V(2).Infof("error getting content %q from cache: %v", key, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
// The controller has already processed the delete event and
|
||||||
|
// deleted the content from its cache
|
||||||
|
klog.V(2).Infof("deletion of content %q was already processed", key)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
content, ok := contentObj.(*crdv1.VolumeSnapshotContent)
|
||||||
|
if !ok {
|
||||||
|
klog.Errorf("expected content, got %+v", content)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
ctrl.deleteContentInCacheStore(content)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// verify whether the driver specified in VolumeSnapshotContent matches the controller's driver name
|
// verify whether the driver specified in VolumeSnapshotContent matches the controller's driver name
|
||||||
@@ -228,9 +239,9 @@ func (ctrl *csiSnapshotSideCarController) isDriverMatch(content *crdv1.VolumeSna
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateContent runs in worker thread and handles "content added",
|
// updateContentInInformerCache runs in worker thread and handles "content added",
|
||||||
// "content updated" and "periodic sync" events.
|
// "content updated" and "periodic sync" events.
|
||||||
func (ctrl *csiSnapshotSideCarController) updateContentInCacheStore(content *crdv1.VolumeSnapshotContent) {
|
func (ctrl *csiSnapshotSideCarController) updateContentInInformerCache(content *crdv1.VolumeSnapshotContent) error {
|
||||||
// Store the new content version in the cache and do not process it if this is
|
// Store the new content version in the cache and do not process it if this is
|
||||||
// an old version.
|
// an old version.
|
||||||
new, err := ctrl.storeContentUpdate(content)
|
new, err := ctrl.storeContentUpdate(content)
|
||||||
@@ -238,7 +249,7 @@ func (ctrl *csiSnapshotSideCarController) updateContentInCacheStore(content *crd
|
|||||||
klog.Errorf("%v", err)
|
klog.Errorf("%v", err)
|
||||||
}
|
}
|
||||||
if !new {
|
if !new {
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
err = ctrl.syncContent(content)
|
err = ctrl.syncContent(content)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -249,7 +260,9 @@ func (ctrl *csiSnapshotSideCarController) updateContentInCacheStore(content *crd
|
|||||||
} else {
|
} else {
|
||||||
klog.Errorf("could not sync content %q: %+v", content.Name, err)
|
klog.Errorf("could not sync content %q: %+v", content.Name, err)
|
||||||
}
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteContent runs in worker thread and handles "content deleted" event.
|
// deleteContent runs in worker thread and handles "content deleted" event.
|
||||||
|
@@ -172,6 +172,7 @@ func TestDeleteSync(t *testing.T) {
|
|||||||
},
|
},
|
||||||
expectedListCalls: []listCall{{"sid1-1", map[string]string{}, true, time.Now(), 1, nil}},
|
expectedListCalls: []listCall{{"sid1-1", map[string]string{}, true, time.Now(), 1, nil}},
|
||||||
expectedDeleteCalls: []deleteCall{{"sid1-1", nil, nil}},
|
expectedDeleteCalls: []deleteCall{{"sid1-1", nil, nil}},
|
||||||
|
expectSuccess: true,
|
||||||
test: testSyncContent,
|
test: testSyncContent,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -194,6 +195,7 @@ func TestDeleteSync(t *testing.T) {
|
|||||||
},
|
},
|
||||||
expectedListCalls: []listCall{{"sid1-2", map[string]string{}, true, time.Now(), 1, nil}},
|
expectedListCalls: []listCall{{"sid1-2", map[string]string{}, true, time.Now(), 1, nil}},
|
||||||
expectedDeleteCalls: []deleteCall{{"sid1-2", nil, nil}},
|
expectedDeleteCalls: []deleteCall{{"sid1-2", nil, nil}},
|
||||||
|
expectSuccess: true,
|
||||||
test: testSyncContent,
|
test: testSyncContent,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -223,8 +225,12 @@ func TestDeleteSync(t *testing.T) {
|
|||||||
initialContents: newContentArrayWithDeletionTimestamp("content1-1", "snapuid1-1", "snap1-1", "sid1-1", "invalid", "", "snap1-4-volumehandle", deletionPolicy, nil, nil, true, &timeNowMetav1),
|
initialContents: newContentArrayWithDeletionTimestamp("content1-1", "snapuid1-1", "snap1-1", "sid1-1", "invalid", "", "snap1-4-volumehandle", deletionPolicy, nil, nil, true, &timeNowMetav1),
|
||||||
expectedContents: newContentArrayWithDeletionTimestamp("content1-1", "snapuid1-1", "snap1-1", "sid1-1", "invalid", "", "snap1-4-volumehandle", deletionPolicy, nil, nil, true, &timeNowMetav1),
|
expectedContents: newContentArrayWithDeletionTimestamp("content1-1", "snapuid1-1", "snap1-1", "sid1-1", "invalid", "", "snap1-4-volumehandle", deletionPolicy, nil, nil, true, &timeNowMetav1),
|
||||||
expectedEvents: noevents,
|
expectedEvents: noevents,
|
||||||
errors: noerrors,
|
errors: []reactorError{
|
||||||
test: testSyncContent,
|
// Inject error to the first client.VolumesnapshotV1beta1().VolumeSnapshotContents().Delete call.
|
||||||
|
// All other calls will succeed.
|
||||||
|
{"get", "secrets", errors.New("mock get invalid secret error")},
|
||||||
|
},
|
||||||
|
test: testSyncContent,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "1-5 - csi driver delete snapshot returns error, bound finalizer should remain",
|
name: "1-5 - csi driver delete snapshot returns error, bound finalizer should remain",
|
||||||
|
Reference in New Issue
Block a user