From cbd5b8b7a176c81758addc72c0b8a45bb619848a Mon Sep 17 00:00:00 2001 From: xing-yang Date: Thu, 2 Jan 2020 16:52:03 +0000 Subject: [PATCH] Add requeue in sidecar --- pkg/sidecar-controller/framework_test.go | 23 +-- pkg/sidecar-controller/snapshot_controller.go | 118 +++++--------- .../snapshot_controller_base.go | 145 ++++++++++-------- .../snapshot_delete_test.go | 10 +- 4 files changed, 128 insertions(+), 168 deletions(-) diff --git a/pkg/sidecar-controller/framework_test.go b/pkg/sidecar-controller/framework_test.go index 264b989a..dc0bf7d2 100644 --- a/pkg/sidecar-controller/framework_test.go +++ b/pkg/sidecar-controller/framework_test.go @@ -409,24 +409,6 @@ func (r *snapshotReactor) getChangeCount() int { return r.changedSinceLastSync } -// waitForIdle waits until all tests, controllers and other goroutines do their -// job and no new actions are registered for 10 milliseconds. -func (r *snapshotReactor) waitForIdle() { - r.ctrl.runningOperations.WaitForCompletion() - // Check every 10ms if the controller does something and stop if it's - // idle. - oldChanges := -1 - for { - time.Sleep(10 * time.Millisecond) - changes := r.getChangeCount() - if changes == oldChanges { - // No changes for last 10ms -> controller must be idle. - break - } - oldChanges = changes - } -} - // waitTest waits until all tests, controllers and other goroutines do their // job and list of current contents/snapshots is equal to list of expected // contents/snapshots (with ~10 second timeout). @@ -439,9 +421,6 @@ func (r *snapshotReactor) waitTest(test controllerTest) error { Steps: 10, } err := wait.ExponentialBackoff(backoff, func() (done bool, err error) { - // Finish all operations that are in progress - r.ctrl.runningOperations.WaitForCompletion() - // Return 'true' if the reactor reached the expected state err1 := r.checkContents(test.expectedContents) if err1 == nil { @@ -768,7 +747,7 @@ func runSyncContentTests(t *testing.T, tests []controllerTest, snapshotClasses [ // Run the tested functions err = test.test(ctrl, reactor, test) - if err != nil { + if test.expectSuccess && err != nil { t.Errorf("Test %q failed: %v", test.name, err) } diff --git a/pkg/sidecar-controller/snapshot_controller.go b/pkg/sidecar-controller/snapshot_controller.go index dac6792f..96688f63 100644 --- a/pkg/sidecar-controller/snapshot_controller.go +++ b/pkg/sidecar-controller/snapshot_controller.go @@ -29,8 +29,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog" - "k8s.io/kubernetes/pkg/util/goroutinemap" - "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" "k8s.io/kubernetes/pkg/util/slice" ) @@ -73,56 +71,26 @@ func (ctrl *csiSnapshotSideCarController) syncContent(content *crdv1.VolumeSnaps // if there is one so that API server could delete the object if there is // no other finalizer. return ctrl.removeContentFinalizer(content) - } if content.Spec.Source.VolumeHandle != nil && content.Status == nil { klog.V(5).Infof("syncContent: Call CreateSnapshot for content %s", content.Name) - ctrl.createSnapshot(content) - } else { - // Skip checkandUpdateContentStatus() if ReadyToUse is - // already true. We don't want to keep calling CreateSnapshot - // or ListSnapshots CSI methods over and over again for - // performance reasons. - if content.Status != nil && content.Status.ReadyToUse != nil && *content.Status.ReadyToUse == true { - // Try to remove AnnVolumeSnapshotBeingCreated if it is not removed yet for some reason - err := ctrl.removeAnnVolumeSnapshotBeingCreated(content) - if err != nil { - return fmt.Errorf("failed to remove VolumeSnapshotBeingCreated annotation from the content %s: %q", content.Name, err) - } - return nil - } - ctrl.checkandUpdateContentStatus(content) + return ctrl.createSnapshot(content) } - - return nil + // Skip checkandUpdateContentStatus() if ReadyToUse is + // already true. We don't want to keep calling CreateSnapshot + // or ListSnapshots CSI methods over and over again for + // performance reasons. + if content.Status != nil && content.Status.ReadyToUse != nil && *content.Status.ReadyToUse == true { + // Try to remove AnnVolumeSnapshotBeingCreated if it is not removed yet for some reason + return ctrl.removeAnnVolumeSnapshotBeingCreated(content) + } + return ctrl.checkandUpdateContentStatus(content) } // deleteCSISnapshot starts delete action. func (ctrl *csiSnapshotSideCarController) deleteCSISnapshot(content *crdv1.VolumeSnapshotContent) error { - operationName := fmt.Sprintf("delete-%s", content.Name) - klog.V(5).Infof("schedule to delete snapshot, operation named %s", operationName) - ctrl.scheduleOperation(operationName, func() error { - return ctrl.deleteCSISnapshotOperation(content) - }) - return nil -} - -// scheduleOperation starts given asynchronous operation on given snapshot. It -// makes sure there is no running operation with the same operationName -func (ctrl *csiSnapshotSideCarController) scheduleOperation(operationName string, operation func() error) { - klog.V(5).Infof("scheduleOperation[%s]", operationName) - - err := ctrl.runningOperations.Run(operationName, operation) - if err != nil { - switch { - case goroutinemap.IsAlreadyExists(err): - klog.V(4).Infof("operation %q is already running, skipping", operationName) - case exponentialbackoff.IsExponentialBackoff(err): - klog.V(4).Infof("operation %q postponed due to exponential backoff", operationName) - default: - klog.Errorf("error scheduling operation %q: %v", operationName, err) - } - } + klog.V(5).Infof("Deleting snapshot for content: %s", content.Name) + return ctrl.deleteCSISnapshotOperation(content) } func (ctrl *csiSnapshotSideCarController) storeContentUpdate(content interface{}) (bool, error) { @@ -130,44 +98,38 @@ func (ctrl *csiSnapshotSideCarController) storeContentUpdate(content interface{} } // createSnapshot starts new asynchronous operation to create snapshot -func (ctrl *csiSnapshotSideCarController) createSnapshot(content *crdv1.VolumeSnapshotContent) { +func (ctrl *csiSnapshotSideCarController) createSnapshot(content *crdv1.VolumeSnapshotContent) error { klog.V(5).Infof("createSnapshot for content [%s]: started", content.Name) - opName := fmt.Sprintf("create-%s", content.Name) - ctrl.scheduleOperation(opName, func() error { - contentObj, err := ctrl.createSnapshotWrapper(content) - if err != nil { - ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err)) - klog.Errorf("createSnapshot [%s]: error occurred in createSnapshotWrapper: %v", opName, err) - return err - } + contentObj, err := ctrl.createSnapshotWrapper(content) + if err != nil { + ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err)) + klog.Errorf("createSnapshot for content [%s]: error occurred in createSnapshotWrapper: %v", content.Name, err) + return err + } - _, updateErr := ctrl.storeContentUpdate(contentObj) - if updateErr != nil { - // We will get an "snapshot update" event soon, this is not a big error - klog.V(4).Infof("createSnapshot [%s]: cannot update internal content cache: %v", content.Name, updateErr) - } - return nil - }) + _, updateErr := ctrl.storeContentUpdate(contentObj) + if updateErr != nil { + // We will get an "snapshot update" event soon, this is not a big error + klog.V(4).Infof("createSnapshot for content [%s]: cannot update internal content cache: %v", content.Name, updateErr) + } + return nil } -func (ctrl *csiSnapshotSideCarController) checkandUpdateContentStatus(content *crdv1.VolumeSnapshotContent) { +func (ctrl *csiSnapshotSideCarController) checkandUpdateContentStatus(content *crdv1.VolumeSnapshotContent) error { klog.V(5).Infof("checkandUpdateContentStatus[%s] started", content.Name) - opName := fmt.Sprintf("check-%s", content.Name) - ctrl.scheduleOperation(opName, func() error { - contentObj, err := ctrl.checkandUpdateContentStatusOperation(content) - if err != nil { - ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotContentCheckandUpdateFailed", fmt.Sprintf("Failed to check and update snapshot content: %v", err)) - klog.Errorf("checkandUpdateContentStatus [%s]: error occurred %v", content.Name, err) - return err - } - _, updateErr := ctrl.storeContentUpdate(contentObj) - if updateErr != nil { - // We will get an "snapshot update" event soon, this is not a big error - klog.V(4).Infof("checkandUpdateContentStatus [%s]: cannot update internal cache: %v", content.Name, updateErr) - } + contentObj, err := ctrl.checkandUpdateContentStatusOperation(content) + if err != nil { + ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotContentCheckandUpdateFailed", fmt.Sprintf("Failed to check and update snapshot content: %v", err)) + klog.Errorf("checkandUpdateContentStatus [%s]: error occurred %v", content.Name, err) + return err + } + _, updateErr := ctrl.storeContentUpdate(contentObj) + if updateErr != nil { + // We will get an "snapshot update" event soon, this is not a big error + klog.V(4).Infof("checkandUpdateContentStatus [%s]: cannot update internal cache: %v", content.Name, updateErr) + } - return nil - }) + return nil } // updateContentStatusWithEvent saves new content.Status to API server and emits @@ -384,8 +346,8 @@ func (ctrl *csiSnapshotSideCarController) deleteCSISnapshotOperation(content *cr ctrl.eventRecorder.Event(content, v1.EventTypeWarning, "SnapshotDeleteError", "Failed to clear content status") return err } - // update local cache - ctrl.updateContentInCacheStore(newContent) + // trigger syncContent + ctrl.updateContentInInformerCache(newContent) return nil } diff --git a/pkg/sidecar-controller/snapshot_controller_base.go b/pkg/sidecar-controller/snapshot_controller_base.go index 94493cd4..4ad7b9d7 100644 --- a/pkg/sidecar-controller/snapshot_controller_base.go +++ b/pkg/sidecar-controller/snapshot_controller_base.go @@ -37,7 +37,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog" - "k8s.io/kubernetes/pkg/util/goroutinemap" ) type csiSnapshotSideCarController struct { @@ -55,8 +54,6 @@ type csiSnapshotSideCarController struct { contentStore cache.Store handler Handler - // Map of scheduled/running operations. - runningOperations goroutinemap.GoRoutineMap resyncPeriod time.Duration } @@ -81,15 +78,14 @@ func NewCSISnapshotSideCarController( eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-snapshotter %s", driverName)}) ctrl := &csiSnapshotSideCarController{ - clientset: clientset, - client: client, - driverName: driverName, - eventRecorder: eventRecorder, - handler: NewCSIHandler(snapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength), - runningOperations: goroutinemap.NewGoRoutineMap(true), - resyncPeriod: resyncPeriod, - contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), - contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"), + clientset: clientset, + client: client, + driverName: driverName, + eventRecorder: eventRecorder, + handler: NewCSIHandler(snapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength), + resyncPeriod: resyncPeriod, + contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), + contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"), } volumeSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod( @@ -149,62 +145,77 @@ func (ctrl *csiSnapshotSideCarController) enqueueContentWork(obj interface{}) { // contentWorker processes items from contentQueue. It must run only once, // syncContent is not assured to be reentrant. func (ctrl *csiSnapshotSideCarController) contentWorker() { - workFunc := func() bool { - keyObj, quit := ctrl.contentQueue.Get() - if quit { - return true - } - defer ctrl.contentQueue.Done(keyObj) - key := keyObj.(string) - klog.V(5).Infof("contentWorker[%s]", key) + for ctrl.processNextItem() { + } +} - _, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - klog.V(4).Infof("error getting name of snapshotContent %q to get snapshotContent from informer: %v", key, err) - return false - } - content, err := ctrl.contentLister.Get(name) - // The content still exists in informer cache, the event must have - // been add/update/sync - if err == nil { - if ctrl.isDriverMatch(content) { - ctrl.updateContentInCacheStore(content) - } - return false - } - if !errors.IsNotFound(err) { - klog.V(2).Infof("error getting content %q from informer: %v", key, err) - return false - } - - // The content is not in informer cache, the event must have been - // "delete" - contentObj, found, err := ctrl.contentStore.GetByKey(key) - if err != nil { - klog.V(2).Infof("error getting content %q from cache: %v", key, err) - return false - } - if !found { - // The controller has already processed the delete event and - // deleted the content from its cache - klog.V(2).Infof("deletion of content %q was already processed", key) - return false - } - content, ok := contentObj.(*crdv1.VolumeSnapshotContent) - if !ok { - klog.Errorf("expected content, got %+v", content) - return false - } - ctrl.deleteContentInCacheStore(content) +func (ctrl *csiSnapshotSideCarController) processNextItem() bool { + keyObj, quit := ctrl.contentQueue.Get() + if quit { return false } + defer ctrl.contentQueue.Done(keyObj) - for { - if quit := workFunc(); quit { - klog.Infof("content worker queue shutting down") - return - } + if err := ctrl.syncContentByKey(keyObj.(string)); err != nil { + // Rather than wait for a full resync, re-add the key to the + // queue to be processed. + ctrl.contentQueue.AddRateLimited(keyObj) + klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err) + return true } + + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + ctrl.contentQueue.Forget(keyObj) + return true +} + +func (ctrl *csiSnapshotSideCarController) syncContentByKey(key string) error { + klog.V(5).Infof("syncContentByKey[%s]", key) + + _, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + klog.V(4).Infof("error getting name of snapshotContent %q to get snapshotContent from informer: %v", key, err) + return nil + } + content, err := ctrl.contentLister.Get(name) + // The content still exists in informer cache, the event must have + // been add/update/sync + if err == nil { + if ctrl.isDriverMatch(content) { + err = ctrl.updateContentInInformerCache(content) + } + if err != nil { + // If error occurs we add this item back to the queue + return err + } + return nil + } + if !errors.IsNotFound(err) { + klog.V(2).Infof("error getting content %q from informer: %v", key, err) + return nil + } + + // The content is not in informer cache, the event must have been + // "delete" + contentObj, found, err := ctrl.contentStore.GetByKey(key) + if err != nil { + klog.V(2).Infof("error getting content %q from cache: %v", key, err) + return nil + } + if !found { + // The controller has already processed the delete event and + // deleted the content from its cache + klog.V(2).Infof("deletion of content %q was already processed", key) + return nil + } + content, ok := contentObj.(*crdv1.VolumeSnapshotContent) + if !ok { + klog.Errorf("expected content, got %+v", content) + return nil + } + ctrl.deleteContentInCacheStore(content) + return nil } // verify whether the driver specified in VolumeSnapshotContent matches the controller's driver name @@ -228,9 +239,9 @@ func (ctrl *csiSnapshotSideCarController) isDriverMatch(content *crdv1.VolumeSna return true } -// updateContent runs in worker thread and handles "content added", +// updateContentInInformerCache runs in worker thread and handles "content added", // "content updated" and "periodic sync" events. -func (ctrl *csiSnapshotSideCarController) updateContentInCacheStore(content *crdv1.VolumeSnapshotContent) { +func (ctrl *csiSnapshotSideCarController) updateContentInInformerCache(content *crdv1.VolumeSnapshotContent) error { // Store the new content version in the cache and do not process it if this is // an old version. new, err := ctrl.storeContentUpdate(content) @@ -238,7 +249,7 @@ func (ctrl *csiSnapshotSideCarController) updateContentInCacheStore(content *crd klog.Errorf("%v", err) } if !new { - return + return nil } err = ctrl.syncContent(content) if err != nil { @@ -249,7 +260,9 @@ func (ctrl *csiSnapshotSideCarController) updateContentInCacheStore(content *crd } else { klog.Errorf("could not sync content %q: %+v", content.Name, err) } + return err } + return nil } // deleteContent runs in worker thread and handles "content deleted" event. diff --git a/pkg/sidecar-controller/snapshot_delete_test.go b/pkg/sidecar-controller/snapshot_delete_test.go index d5274d06..dd73ed4f 100644 --- a/pkg/sidecar-controller/snapshot_delete_test.go +++ b/pkg/sidecar-controller/snapshot_delete_test.go @@ -172,6 +172,7 @@ func TestDeleteSync(t *testing.T) { }, expectedListCalls: []listCall{{"sid1-1", map[string]string{}, true, time.Now(), 1, nil}}, expectedDeleteCalls: []deleteCall{{"sid1-1", nil, nil}}, + expectSuccess: true, test: testSyncContent, }, { @@ -194,6 +195,7 @@ func TestDeleteSync(t *testing.T) { }, expectedListCalls: []listCall{{"sid1-2", map[string]string{}, true, time.Now(), 1, nil}}, expectedDeleteCalls: []deleteCall{{"sid1-2", nil, nil}}, + expectSuccess: true, test: testSyncContent, }, { @@ -223,8 +225,12 @@ func TestDeleteSync(t *testing.T) { initialContents: newContentArrayWithDeletionTimestamp("content1-1", "snapuid1-1", "snap1-1", "sid1-1", "invalid", "", "snap1-4-volumehandle", deletionPolicy, nil, nil, true, &timeNowMetav1), expectedContents: newContentArrayWithDeletionTimestamp("content1-1", "snapuid1-1", "snap1-1", "sid1-1", "invalid", "", "snap1-4-volumehandle", deletionPolicy, nil, nil, true, &timeNowMetav1), expectedEvents: noevents, - errors: noerrors, - test: testSyncContent, + errors: []reactorError{ + // Inject error to the first client.VolumesnapshotV1beta1().VolumeSnapshotContents().Delete call. + // All other calls will succeed. + {"get", "secrets", errors.New("mock get invalid secret error")}, + }, + test: testSyncContent, }, { name: "1-5 - csi driver delete snapshot returns error, bound finalizer should remain",