From 5800df6a61d3432ce59f8396dbf03e42a8516792 Mon Sep 17 00:00:00 2001 From: xing-yang Date: Fri, 22 May 2020 02:40:24 +0000 Subject: [PATCH] Fix requeue logic in the common controller --- pkg/common-controller/framework_test.go | 6 - pkg/common-controller/snapshot_controller.go | 70 ++--- .../snapshot_controller_base.go | 289 +++++++++--------- pkg/common-controller/snapshot_create_test.go | 38 +-- .../kubernetes/pkg/util/goroutinemap/BUILD | 41 --- .../kubernetes/pkg/util/goroutinemap/OWNERS | 6 - .../goroutinemap/exponentialbackoff/BUILD | 25 -- .../exponentialbackoff/exponential_backoff.go | 121 -------- .../pkg/util/goroutinemap/goroutinemap.go | 230 -------------- vendor/modules.txt | 2 - 10 files changed, 166 insertions(+), 662 deletions(-) delete mode 100644 vendor/k8s.io/kubernetes/pkg/util/goroutinemap/BUILD delete mode 100644 vendor/k8s.io/kubernetes/pkg/util/goroutinemap/OWNERS delete mode 100644 vendor/k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff/BUILD delete mode 100644 vendor/k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff/exponential_backoff.go delete mode 100644 vendor/k8s.io/kubernetes/pkg/util/goroutinemap/goroutinemap.go diff --git a/pkg/common-controller/framework_test.go b/pkg/common-controller/framework_test.go index d1df11aa..4a596df2 100644 --- a/pkg/common-controller/framework_test.go +++ b/pkg/common-controller/framework_test.go @@ -582,7 +582,6 @@ func (r *snapshotReactor) getChangeCount() int { // waitForIdle waits until all tests, controllers and other goroutines do their // job and no new actions are registered for 10 milliseconds. func (r *snapshotReactor) waitForIdle() { - r.ctrl.runningOperations.WaitForCompletion() // Check every 10ms if the controller does something and stop if it's // idle. oldChanges := -1 @@ -609,9 +608,6 @@ func (r *snapshotReactor) waitTest(test controllerTest) error { Steps: 10, } err := wait.ExponentialBackoff(backoff, func() (done bool, err error) { - // Finish all operations that are in progress - r.ctrl.runningOperations.WaitForCompletion() - // Return 'true' if the reactor reached the expected state err1 := r.checkSnapshots(test.expectedSnapshots) err2 := r.checkContents(test.expectedContents) @@ -757,8 +753,6 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte ctrl.snapshotListerSynced = alwaysReady ctrl.classListerSynced = alwaysReady ctrl.pvcListerSynced = alwaysReady - ctrl.createSnapshotContentInterval = time.Millisecond * 5 - ctrl.createSnapshotContentRetryCount = 3 return ctrl, nil } diff --git a/pkg/common-controller/snapshot_controller.go b/pkg/common-controller/snapshot_controller.go index 46ca0793..8b9046d0 100644 --- a/pkg/common-controller/snapshot_controller.go +++ b/pkg/common-controller/snapshot_controller.go @@ -423,18 +423,10 @@ func (ctrl *csiSnapshotCommonController) syncUnreadySnapshot(snapshot *crdv1.Vol } // update snapshot status - for i := 0; i < ctrl.createSnapshotContentRetryCount; i++ { - klog.V(5).Infof("syncUnreadySnapshot [%s]: trying to update snapshot status", utils.SnapshotKey(snapshot)) - _, err = ctrl.updateSnapshotStatus(snapshot, newContent) - if err == nil { - break - } - klog.V(4).Infof("failed to update snapshot %s status: %v", utils.SnapshotKey(snapshot), err) - time.Sleep(ctrl.createSnapshotContentInterval) - } - - if err != nil { + klog.V(5).Infof("syncUnreadySnapshot [%s]: trying to update snapshot status", utils.SnapshotKey(snapshot)) + if _, err = ctrl.updateSnapshotStatus(snapshot, newContent); err != nil { // update snapshot status failed + klog.V(4).Infof("failed to update snapshot %s status: %v", utils.SnapshotKey(snapshot), err) ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotStatusUpdateFailed", fmt.Sprintf("Snapshot status update failed, %v", err)) return err } @@ -474,17 +466,8 @@ func (ctrl *csiSnapshotCommonController) syncUnreadySnapshot(snapshot *crdv1.Vol } // Update snapshot status with BoundVolumeSnapshotContentName - for i := 0; i < ctrl.createSnapshotContentRetryCount; i++ { - klog.V(5).Infof("syncUnreadySnapshot [%s]: trying to update snapshot status", utils.SnapshotKey(snapshot)) - _, err = ctrl.updateSnapshotStatus(snapshot, content) - if err == nil { - break - } - klog.V(4).Infof("failed to update snapshot %s status: %v", utils.SnapshotKey(snapshot), err) - time.Sleep(ctrl.createSnapshotContentInterval) - } - - if err != nil { + klog.V(5).Infof("syncUnreadySnapshot [%s]: trying to update snapshot status", utils.SnapshotKey(snapshot)) + if _, err = ctrl.updateSnapshotStatus(snapshot, content); err != nil { // update snapshot status failed ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotStatusUpdateFailed", fmt.Sprintf("Snapshot status update failed, %v", err)) return err @@ -656,24 +639,18 @@ func (ctrl *csiSnapshotCommonController) createSnapshotContent(snapshot *crdv1.V } var updateContent *crdv1.VolumeSnapshotContent - klog.V(3).Infof("volume snapshot content %#v", snapshotContent) - // Try to create the VolumeSnapshotContent object several times - for i := 0; i < ctrl.createSnapshotContentRetryCount; i++ { - klog.V(5).Infof("createSnapshotContent [%s]: trying to save volume snapshot content %s", utils.SnapshotKey(snapshot), snapshotContent.Name) - if updateContent, err = ctrl.clientset.SnapshotV1beta1().VolumeSnapshotContents().Create(context.TODO(), snapshotContent, metav1.CreateOptions{}); err == nil || apierrs.IsAlreadyExists(err) { - // Save succeeded. - if err != nil { - klog.V(3).Infof("volume snapshot content %q for snapshot %q already exists, reusing", snapshotContent.Name, utils.SnapshotKey(snapshot)) - err = nil - updateContent = snapshotContent - } else { - klog.V(3).Infof("volume snapshot content %q for snapshot %q saved, %v", snapshotContent.Name, utils.SnapshotKey(snapshot), snapshotContent) - } - break + klog.V(5).Infof("volume snapshot content %#v", snapshotContent) + // Try to create the VolumeSnapshotContent object + klog.V(5).Infof("createSnapshotContent [%s]: trying to save volume snapshot content %s", utils.SnapshotKey(snapshot), snapshotContent.Name) + if updateContent, err = ctrl.clientset.SnapshotV1beta1().VolumeSnapshotContents().Create(context.TODO(), snapshotContent, metav1.CreateOptions{}); err == nil || apierrs.IsAlreadyExists(err) { + // Save succeeded. + if err != nil { + klog.V(3).Infof("volume snapshot content %q for snapshot %q already exists, reusing", snapshotContent.Name, utils.SnapshotKey(snapshot)) + err = nil + updateContent = snapshotContent + } else { + klog.V(3).Infof("volume snapshot content %q for snapshot %q saved, %v", snapshotContent.Name, utils.SnapshotKey(snapshot), snapshotContent) } - // Save failed, try again after a while. - klog.V(3).Infof("failed to save volume snapshot content %q for snapshot %q: %v", snapshotContent.Name, utils.SnapshotKey(snapshot), err) - time.Sleep(ctrl.createSnapshotContentInterval) } if err != nil { @@ -982,19 +959,14 @@ func (ctrl *csiSnapshotCommonController) bindandUpdateVolumeSnapshot(snapshotCon snapshotCopy := snapshotObj.DeepCopy() // update snapshot status var updateSnapshot *crdv1.VolumeSnapshot - for i := 0; i < ctrl.createSnapshotContentRetryCount; i++ { - klog.V(5).Infof("bindandUpdateVolumeSnapshot [%s]: trying to update snapshot status", utils.SnapshotKey(snapshotCopy)) - updateSnapshot, err = ctrl.updateSnapshotStatus(snapshotCopy, snapshotContent) - if err == nil { - snapshotCopy = updateSnapshot - break - } - klog.V(4).Infof("failed to update snapshot %s status: %v", utils.SnapshotKey(snapshot), err) - time.Sleep(ctrl.createSnapshotContentInterval) + klog.V(5).Infof("bindandUpdateVolumeSnapshot [%s]: trying to update snapshot status", utils.SnapshotKey(snapshotCopy)) + updateSnapshot, err = ctrl.updateSnapshotStatus(snapshotCopy, snapshotContent) + if err == nil { + snapshotCopy = updateSnapshot } - if err != nil { // update snapshot status failed + klog.V(4).Infof("failed to update snapshot %s status: %v", utils.SnapshotKey(snapshot), err) ctrl.updateSnapshotErrorStatusWithEvent(snapshotCopy, v1.EventTypeWarning, "SnapshotStatusUpdateFailed", fmt.Sprintf("Snapshot status update failed, %v", err)) return nil, err } diff --git a/pkg/common-controller/snapshot_controller_base.go b/pkg/common-controller/snapshot_controller_base.go index 01d6c596..d69a613e 100644 --- a/pkg/common-controller/snapshot_controller_base.go +++ b/pkg/common-controller/snapshot_controller_base.go @@ -39,15 +39,8 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog" - "k8s.io/kubernetes/pkg/util/goroutinemap" ) -// Number of retries when we create a VolumeSnapshotContent object -const createSnapshotContentRetryCount = 5 - -// Interval between retries when we create a VolumeSnapshotContent object -const createSnapshotContentInterval = 10 * time.Second - type csiSnapshotCommonController struct { clientset clientset.Interface client kubernetes.Interface @@ -67,12 +60,7 @@ type csiSnapshotCommonController struct { snapshotStore cache.Store contentStore cache.Store - // Map of scheduled/running operations. - runningOperations goroutinemap.GoRoutineMap - - createSnapshotContentRetryCount int - createSnapshotContentInterval time.Duration - resyncPeriod time.Duration + resyncPeriod time.Duration } // NewCSISnapshotController returns a new *csiSnapshotCommonController @@ -92,17 +80,14 @@ func NewCSISnapshotCommonController( eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("snapshot-controller")}) ctrl := &csiSnapshotCommonController{ - clientset: clientset, - client: client, - eventRecorder: eventRecorder, - runningOperations: goroutinemap.NewGoRoutineMap(true), - createSnapshotContentRetryCount: createSnapshotContentRetryCount, - createSnapshotContentInterval: createSnapshotContentInterval, - resyncPeriod: resyncPeriod, - snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), - contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), - snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "snapshot-controller-snapshot"), - contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "snapshot-controller-content"), + clientset: clientset, + client: client, + eventRecorder: eventRecorder, + resyncPeriod: resyncPeriod, + snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), + contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), + snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "snapshot-controller-snapshot"), + contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "snapshot-controller-content"), } ctrl.pvcLister = pvcInformer.Lister() @@ -192,134 +177,142 @@ func (ctrl *csiSnapshotCommonController) enqueueContentWork(obj interface{}) { } } -// snapshotWorker processes items from snapshotQueue. It must run only once, -// syncSnapshot is not assured to be reentrant. +// snapshotWorker is the main worker for VolumeSnapshots. func (ctrl *csiSnapshotCommonController) snapshotWorker() { - workFunc := func() bool { - keyObj, quit := ctrl.snapshotQueue.Get() - if quit { - return true - } - defer ctrl.snapshotQueue.Done(keyObj) - key := keyObj.(string) - klog.V(5).Infof("snapshotWorker[%s]", key) - - namespace, name, err := cache.SplitMetaNamespaceKey(key) - klog.V(5).Infof("snapshotWorker: snapshot namespace [%s] name [%s]", namespace, name) - if err != nil { - klog.Errorf("error getting namespace & name of snapshot %q to get snapshot from informer: %v", key, err) - return false - } - snapshot, err := ctrl.snapshotLister.VolumeSnapshots(namespace).Get(name) - if err == nil { - // The volume snapshot still exists in informer cache, the event must have - // been add/update/sync - newSnapshot, err := ctrl.checkAndUpdateSnapshotClass(snapshot) - if err == nil || (newSnapshot.ObjectMeta.DeletionTimestamp != nil && errors.IsNotFound(err)) { - // If the VolumeSnapshotClass is not found, we still need to process an update - // so that syncSnapshot can delete the snapshot, should it still exist in the - // cluster after it's been removed from the informer cache - klog.V(5).Infof("updating snapshot %q; snapshotClass may have already been removed", key) - ctrl.updateSnapshot(newSnapshot) - } - return false - } - if err != nil && !errors.IsNotFound(err) { - klog.V(2).Infof("error getting snapshot %q from informer: %v", key, err) - return false - } - // The snapshot is not in informer cache, the event must have been "delete" - vsObj, found, err := ctrl.snapshotStore.GetByKey(key) - if err != nil { - klog.V(2).Infof("error getting snapshot %q from cache: %v", key, err) - return false - } - if !found { - // The controller has already processed the delete event and - // deleted the snapshot from its cache - klog.V(2).Infof("deletion of snapshot %q was already processed", key) - return false - } - snapshot, ok := vsObj.(*crdv1.VolumeSnapshot) - if !ok { - klog.Errorf("expected vs, got %+v", vsObj) - return false - } - newSnapshot, err := ctrl.checkAndUpdateSnapshotClass(snapshot) - if err == nil || errors.IsNotFound(err) { - // We should still handle deletion events even if the VolumeSnapshotClass - // is not found in the cluster - klog.V(5).Infof("deleting snapshot %q; snapshotClass may have already been removed", key) - ctrl.deleteSnapshot(newSnapshot) - } - return false + keyObj, quit := ctrl.snapshotQueue.Get() + if quit { + return } + defer ctrl.snapshotQueue.Done(keyObj) - for { - if quit := workFunc(); quit { - klog.Infof("snapshot worker queue shutting down") - return - } + if err := ctrl.syncSnapshotByKey(keyObj.(string)); err != nil { + // Rather than wait for a full resync, re-add the key to the + // queue to be processed. + ctrl.snapshotQueue.AddRateLimited(keyObj) + klog.V(4).Infof("Failed to sync snapshot %q, will retry again: %v", keyObj.(string), err) + } else { + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + ctrl.snapshotQueue.Forget(keyObj) } } -// contentWorker processes items from contentQueue. It must run only once, -// syncContent is not assured to be reentrant. -func (ctrl *csiSnapshotCommonController) contentWorker() { - workFunc := func() bool { - keyObj, quit := ctrl.contentQueue.Get() - if quit { - return true - } - defer ctrl.contentQueue.Done(keyObj) - key := keyObj.(string) - klog.V(5).Infof("contentWorker[%s]", key) +// syncSnapshotByKey processes a VolumeSnapshot request. +func (ctrl *csiSnapshotCommonController) syncSnapshotByKey(key string) error { + klog.V(5).Infof("syncSnapshotByKey[%s]", key) - _, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - klog.V(4).Infof("error getting name of snapshotContent %q to get snapshotContent from informer: %v", key, err) - return false - } - content, err := ctrl.contentLister.Get(name) - // The content still exists in informer cache, the event must have + namespace, name, err := cache.SplitMetaNamespaceKey(key) + klog.V(5).Infof("snapshotWorker: snapshot namespace [%s] name [%s]", namespace, name) + if err != nil { + klog.Errorf("error getting namespace & name of snapshot %q to get snapshot from informer: %v", key, err) + return nil + } + snapshot, err := ctrl.snapshotLister.VolumeSnapshots(namespace).Get(name) + if err == nil { + // The volume snapshot still exists in informer cache, the event must have // been add/update/sync - if err == nil { - ctrl.updateContent(content) - return false + newSnapshot, err := ctrl.checkAndUpdateSnapshotClass(snapshot) + if err == nil || (newSnapshot.ObjectMeta.DeletionTimestamp != nil && errors.IsNotFound(err)) { + // If the VolumeSnapshotClass is not found, we still need to process an update + // so that syncSnapshot can delete the snapshot, should it still exist in the + // cluster after it's been removed from the informer cache + if newSnapshot.ObjectMeta.DeletionTimestamp != nil && errors.IsNotFound(err) { + klog.V(5).Infof("Snapshot %q is being deleted. SnapshotClass has already been removed", key) + } + klog.V(5).Infof("Updating snapshot %q", key) + return ctrl.updateSnapshot(newSnapshot) } - if !errors.IsNotFound(err) { - klog.V(2).Infof("error getting content %q from informer: %v", key, err) - return false - } - - // The content is not in informer cache, the event must have been - // "delete" - contentObj, found, err := ctrl.contentStore.GetByKey(key) - if err != nil { - klog.V(2).Infof("error getting content %q from cache: %v", key, err) - return false - } - if !found { - // The controller has already processed the delete event and - // deleted the content from its cache - klog.V(2).Infof("deletion of content %q was already processed", key) - return false - } - content, ok := contentObj.(*crdv1.VolumeSnapshotContent) - if !ok { - klog.Errorf("expected content, got %+v", content) - return false - } - ctrl.deleteContent(content) - return false + return err + } + if err != nil && !errors.IsNotFound(err) { + klog.V(2).Infof("error getting snapshot %q from informer: %v", key, err) + return err + } + // The snapshot is not in informer cache, the event must have been "delete" + vsObj, found, err := ctrl.snapshotStore.GetByKey(key) + if err != nil { + klog.V(2).Infof("error getting snapshot %q from cache: %v", key, err) + return nil + } + if !found { + // The controller has already processed the delete event and + // deleted the snapshot from its cache + klog.V(2).Infof("deletion of snapshot %q was already processed", key) + return nil + } + snapshot, ok := vsObj.(*crdv1.VolumeSnapshot) + if !ok { + klog.Errorf("expected vs, got %+v", vsObj) + return nil } - for { - if quit := workFunc(); quit { - klog.Infof("content worker queue shutting down") - return - } + klog.V(5).Infof("deleting snapshot %q", key) + ctrl.deleteSnapshot(snapshot) + + return nil +} + +// contentWorker is the main worker for VolumeSnapshotContent. +func (ctrl *csiSnapshotCommonController) contentWorker() { + keyObj, quit := ctrl.contentQueue.Get() + if quit { + return } + defer ctrl.contentQueue.Done(keyObj) + + if err := ctrl.syncContentByKey(keyObj.(string)); err != nil { + // Rather than wait for a full resync, re-add the key to the + // queue to be processed. + ctrl.contentQueue.AddRateLimited(keyObj) + klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err) + } else { + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + ctrl.contentQueue.Forget(keyObj) + } +} + +// syncContentByKey processes a VolumeSnapshotContent request. +func (ctrl *csiSnapshotCommonController) syncContentByKey(key string) error { + klog.V(5).Infof("syncContentByKey[%s]", key) + + _, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + klog.V(4).Infof("error getting name of snapshotContent %q to get snapshotContent from informer: %v", key, err) + return nil + } + content, err := ctrl.contentLister.Get(name) + // The content still exists in informer cache, the event must have + // been add/update/sync + if err == nil { + // If error occurs we add this item back to the queue + return ctrl.updateContent(content) + } + if !errors.IsNotFound(err) { + klog.V(2).Infof("error getting content %q from informer: %v", key, err) + return nil + } + + // The content is not in informer cache, the event must have been + // "delete" + contentObj, found, err := ctrl.contentStore.GetByKey(key) + if err != nil { + klog.V(2).Infof("error getting content %q from cache: %v", key, err) + return nil + } + if !found { + // The controller has already processed the delete event and + // deleted the content from its cache + klog.V(2).Infof("deletion of content %q was already processed", key) + return nil + } + content, ok := contentObj.(*crdv1.VolumeSnapshotContent) + if !ok { + klog.Errorf("expected content, got %+v", content) + return nil + } + ctrl.deleteContent(content) + return nil } // checkAndUpdateSnapshotClass gets the VolumeSnapshotClass from VolumeSnapshot. If it is not set, @@ -357,7 +350,7 @@ func (ctrl *csiSnapshotCommonController) checkAndUpdateSnapshotClass(snapshot *c // updateSnapshot runs in worker thread and handles "snapshot added", // "snapshot updated" and "periodic sync" events. -func (ctrl *csiSnapshotCommonController) updateSnapshot(snapshot *crdv1.VolumeSnapshot) { +func (ctrl *csiSnapshotCommonController) updateSnapshot(snapshot *crdv1.VolumeSnapshot) error { // Store the new snapshot version in the cache and do not process it if this is // an old version. klog.V(5).Infof("updateSnapshot %q", utils.SnapshotKey(snapshot)) @@ -366,23 +359,25 @@ func (ctrl *csiSnapshotCommonController) updateSnapshot(snapshot *crdv1.VolumeSn klog.Errorf("%v", err) } if !newSnapshot { - return + return nil } err = ctrl.syncSnapshot(snapshot) if err != nil { if errors.IsConflict(err) { // Version conflict error happens quite often and the controller // recovers from it easily. - klog.V(3).Infof("could not sync claim %q: %+v", utils.SnapshotKey(snapshot), err) + klog.V(3).Infof("could not sync snapshot %q: %+v", utils.SnapshotKey(snapshot), err) } else { - klog.Errorf("could not sync volume %q: %+v", utils.SnapshotKey(snapshot), err) + klog.Errorf("could not sync snapshot %q: %+v", utils.SnapshotKey(snapshot), err) } + return err } + return nil } // updateContent runs in worker thread and handles "content added", // "content updated" and "periodic sync" events. -func (ctrl *csiSnapshotCommonController) updateContent(content *crdv1.VolumeSnapshotContent) { +func (ctrl *csiSnapshotCommonController) updateContent(content *crdv1.VolumeSnapshotContent) error { // Store the new content version in the cache and do not process it if this is // an old version. new, err := ctrl.storeContentUpdate(content) @@ -390,7 +385,7 @@ func (ctrl *csiSnapshotCommonController) updateContent(content *crdv1.VolumeSnap klog.Errorf("%v", err) } if !new { - return + return nil } err = ctrl.syncContent(content) if err != nil { @@ -401,7 +396,9 @@ func (ctrl *csiSnapshotCommonController) updateContent(content *crdv1.VolumeSnap } else { klog.Errorf("could not sync content %q: %+v", content.Name, err) } + return err } + return nil } // deleteSnapshot runs in worker thread and handles "snapshot deleted" event. diff --git a/pkg/common-controller/snapshot_create_test.go b/pkg/common-controller/snapshot_create_test.go index bc7bce0d..95ea75d9 100644 --- a/pkg/common-controller/snapshot_create_test.go +++ b/pkg/common-controller/snapshot_create_test.go @@ -107,24 +107,6 @@ func TestCreateSnapshotSync(t *testing.T) { expectSuccess: false, test: testSyncSnapshot, }, - - { - name: "7-2 - fail to update snapshot reports warning event", - initialContents: newContentArrayWithReadyToUse("snapcontent-snapuid7-2", "snapuid7-2", "snap7-2", "sid7-2", classGold, "", "pv-handle7-2", deletionPolicy, nil, nil, &True, false), - expectedContents: newContentArrayWithReadyToUse("snapcontent-snapuid7-2", "snapuid7-2", "snap7-2", "sid7-2", classGold, "", "pv-handle7-2", deletionPolicy, nil, nil, &True, false), - initialSnapshots: newSnapshotArray("snap7-2", "snapuid7-2", "claim7-2", "", classGold, "snapcontent-snapuid7-2", &False, nil, nil, nil, false, true, nil), - expectedSnapshots: newSnapshotArray("snap7-2", "snapuid7-2", "claim7-2", "", classGold, "snapcontent-snapuid7-2", &False, nil, nil, newVolumeError("Snapshot status update failed, snapshot controller failed to update default/snap7-2 on API server: mock update error"), false, true, nil), - initialClaims: newClaimArray("claim7-2", "pvc-uid7-2", "1Gi", "volume7-2", v1.ClaimBound, &classGold), - initialVolumes: newVolumeArray("volume7-2", "pv-uid7-2", "pv-handle7-2", "1Gi", "pvc-uid7-2", "claim7-2", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classGold), - expectedEvents: []string{"Warning SnapshotStatusUpdateFailed"}, - errors: []reactorError{ - // Inject error to the forth client.VolumesnapshotV1beta1().VolumeSnapshots().Update call. - // All other calls will succeed. - {"update", "volumesnapshots", errors.New("mock update error")}, - {"update", "volumesnapshots", errors.New("mock update error")}, - {"update", "volumesnapshots", errors.New("mock update error")}, - }, test: testSyncSnapshot, - }, { name: "7-3 - fail to create snapshot without snapshot class ", initialContents: nocontents, @@ -193,23 +175,6 @@ func TestCreateSnapshotSync(t *testing.T) { expectSuccess: false, test: testSyncSnapshot, }, - { - name: "7-8 - fail create snapshot due to cannot update snapshot status", - initialContents: nocontents, - expectedContents: newContentArrayNoStatus("snapcontent-snapuid7-8", "snapuid7-8", "snap7-8", "sid7-8", classGold, "", "pv-handle7-8", deletionPolicy, nil, nil, false, false), - initialSnapshots: newSnapshotArray("snap7-8", "snapuid7-8", "claim7-8", "", classGold, "", &False, nil, nil, nil, false, true, nil), - expectedSnapshots: newSnapshotArray("snap7-8", "snapuid7-8", "claim7-8", "", classGold, "", &False, nil, nil, newVolumeError("Snapshot status update failed, snapshot controller failed to update default/snap7-8 on API server: mock update error"), false, true, nil), - initialClaims: newClaimArray("claim7-8", "pvc-uid7-8", "1Gi", "volume7-8", v1.ClaimBound, &classEmpty), - initialVolumes: newVolumeArray("volume7-8", "pv-uid7-8", "pv-handle7-8", "1Gi", "pvc-uid7-8", "claim7-8", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classEmpty), - errors: []reactorError{ - {"update", "volumesnapshots", errors.New("mock update error")}, - {"update", "volumesnapshots", errors.New("mock update error")}, - {"update", "volumesnapshots", errors.New("mock update error")}, - }, - expectedEvents: []string{"Normal CreatingSnapshot"}, - expectSuccess: false, - test: testSyncSnapshot, - }, { name: "7-9 - fail create snapshot due to cannot update snapshot status, and failure cannot be recorded either due to additional status update failure.", initialContents: nocontents, @@ -239,7 +204,8 @@ func TestCreateSnapshotSync(t *testing.T) { test: testSyncSnapshot, }, { - // TODO(xiangqian): this test case needs to be revisited the scenario + // TODO(xiangqian): this test case needs to be + // revisited the scenario // of VolumeSnapshotContent saving failure. Since there will be no content object // in API server, it could potentially cause leaking issue name: "7-11 - fail create snapshot due to cannot save snapshot content", diff --git a/vendor/k8s.io/kubernetes/pkg/util/goroutinemap/BUILD b/vendor/k8s.io/kubernetes/pkg/util/goroutinemap/BUILD deleted file mode 100644 index 92688fde..00000000 --- a/vendor/k8s.io/kubernetes/pkg/util/goroutinemap/BUILD +++ /dev/null @@ -1,41 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) - -go_library( - name = "go_default_library", - srcs = ["goroutinemap.go"], - importpath = "k8s.io/kubernetes/pkg/util/goroutinemap", - deps = [ - "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//vendor/k8s.io/klog:go_default_library", - ], -) - -go_test( - name = "go_default_test", - srcs = ["goroutinemap_test.go"], - embed = [":go_default_library"], - deps = ["//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library"], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [ - ":package-srcs", - "//pkg/util/goroutinemap/exponentialbackoff:all-srcs", - ], - tags = ["automanaged"], -) diff --git a/vendor/k8s.io/kubernetes/pkg/util/goroutinemap/OWNERS b/vendor/k8s.io/kubernetes/pkg/util/goroutinemap/OWNERS deleted file mode 100644 index 1bbf5cf2..00000000 --- a/vendor/k8s.io/kubernetes/pkg/util/goroutinemap/OWNERS +++ /dev/null @@ -1,6 +0,0 @@ -# See the OWNERS docs at https://go.k8s.io/owners - -approvers: -- saad-ali -reviewers: -- saad-ali diff --git a/vendor/k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff/BUILD b/vendor/k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff/BUILD deleted file mode 100644 index 9f78c287..00000000 --- a/vendor/k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff/BUILD +++ /dev/null @@ -1,25 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", -) - -go_library( - name = "go_default_library", - srcs = ["exponential_backoff.go"], - importpath = "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff", -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], -) diff --git a/vendor/k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff/exponential_backoff.go b/vendor/k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff/exponential_backoff.go deleted file mode 100644 index 8cd00a4d..00000000 --- a/vendor/k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff/exponential_backoff.go +++ /dev/null @@ -1,121 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package exponentialbackoff contains logic for implementing exponential -// backoff for GoRoutineMap and NestedPendingOperations. -package exponentialbackoff - -import ( - "fmt" - "time" -) - -const ( - // initialDurationBeforeRetry is the amount of time after an error occurs - // that GoroutineMap will refuse to allow another operation to start with - // the same target (if exponentialBackOffOnError is enabled). Each - // successive error results in a wait 2x times the previous. - initialDurationBeforeRetry time.Duration = 500 * time.Millisecond - - // maxDurationBeforeRetry is the maximum amount of time that - // durationBeforeRetry will grow to due to exponential backoff. - // Value is slightly offset from 2 minutes to make timeouts due to this - // constant recognizable. - maxDurationBeforeRetry time.Duration = 2*time.Minute + 2*time.Second -) - -// ExponentialBackoff contains the last occurrence of an error and the duration -// that retries are not permitted. -type ExponentialBackoff struct { - lastError error - lastErrorTime time.Time - durationBeforeRetry time.Duration -} - -// SafeToRetry returns an error if the durationBeforeRetry period for the given -// lastErrorTime has not yet expired. Otherwise it returns nil. -func (expBackoff *ExponentialBackoff) SafeToRetry(operationName string) error { - if time.Since(expBackoff.lastErrorTime) <= expBackoff.durationBeforeRetry { - return NewExponentialBackoffError(operationName, *expBackoff) - } - - return nil -} - -func (expBackoff *ExponentialBackoff) Update(err *error) { - if expBackoff.durationBeforeRetry == 0 { - expBackoff.durationBeforeRetry = initialDurationBeforeRetry - } else { - expBackoff.durationBeforeRetry = 2 * expBackoff.durationBeforeRetry - if expBackoff.durationBeforeRetry > maxDurationBeforeRetry { - expBackoff.durationBeforeRetry = maxDurationBeforeRetry - } - } - - expBackoff.lastError = *err - expBackoff.lastErrorTime = time.Now() -} - -func (expBackoff *ExponentialBackoff) GenerateNoRetriesPermittedMsg(operationName string) string { - return fmt.Sprintf("Operation for %q failed. No retries permitted until %v (durationBeforeRetry %v). Error: %q", - operationName, - expBackoff.lastErrorTime.Add(expBackoff.durationBeforeRetry), - expBackoff.durationBeforeRetry, - expBackoff.lastError) -} - -// NewExponentialBackoffError returns a new instance of ExponentialBackoff error. -func NewExponentialBackoffError( - operationName string, expBackoff ExponentialBackoff) error { - return exponentialBackoffError{ - operationName: operationName, - expBackoff: expBackoff, - } -} - -// IsExponentialBackoff returns true if an error returned from GoroutineMap -// indicates that a new operation can not be started because -// exponentialBackOffOnError is enabled and a previous operation with the same -// operation failed within the durationBeforeRetry period. -func IsExponentialBackoff(err error) bool { - switch err.(type) { - case exponentialBackoffError: - return true - default: - return false - } -} - -// exponentialBackoffError is the error returned returned from GoroutineMap when -// a new operation can not be started because exponentialBackOffOnError is -// enabled and a previous operation with the same operation failed within the -// durationBeforeRetry period. -type exponentialBackoffError struct { - operationName string - expBackoff ExponentialBackoff -} - -var _ error = exponentialBackoffError{} - -func (err exponentialBackoffError) Error() string { - return fmt.Sprintf( - "Failed to create operation with name %q. An operation with that name failed at %v. No retries permitted until %v (%v). Last error: %q.", - err.operationName, - err.expBackoff.lastErrorTime, - err.expBackoff.lastErrorTime.Add(err.expBackoff.durationBeforeRetry), - err.expBackoff.durationBeforeRetry, - err.expBackoff.lastError) -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/goroutinemap/goroutinemap.go b/vendor/k8s.io/kubernetes/pkg/util/goroutinemap/goroutinemap.go deleted file mode 100644 index 9b6eb731..00000000 --- a/vendor/k8s.io/kubernetes/pkg/util/goroutinemap/goroutinemap.go +++ /dev/null @@ -1,230 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -/* -Package goroutinemap implements a data structure for managing go routines -by name. It prevents the creation of new go routines if an existing go routine -with the same name exists. -*/ -package goroutinemap - -import ( - "fmt" - "sync" - - k8sRuntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/klog" - "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" -) - -// GoRoutineMap defines a type that can run named goroutines and track their -// state. It prevents the creation of multiple goroutines with the same name -// and may prevent recreation of a goroutine until after the a backoff time -// has elapsed after the last goroutine with that name finished. -type GoRoutineMap interface { - // Run adds operation name to the list of running operations and spawns a - // new go routine to execute the operation. - // If an operation with the same operation name already exists, an - // AlreadyExists or ExponentialBackoff error is returned. - // Once the operation is complete, the go routine is terminated and the - // operation name is removed from the list of executing operations allowing - // a new operation to be started with the same operation name without error. - Run(operationName string, operationFunc func() error) error - - // Wait blocks until operations map is empty. This is typically - // necessary during tests - the test should wait until all operations finish - // and evaluate results after that. - Wait() - - // WaitForCompletion blocks until either all operations have successfully completed - // or have failed but are not pending. The test should wait until operations are either - // complete or have failed. - WaitForCompletion() - - // IsOperationPending returns true if the operation is pending (currently - // running), otherwise returns false. - IsOperationPending(operationName string) bool -} - -// NewGoRoutineMap returns a new instance of GoRoutineMap. -func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap { - g := &goRoutineMap{ - operations: make(map[string]operation), - exponentialBackOffOnError: exponentialBackOffOnError, - } - - g.cond = sync.NewCond(&g.lock) - return g -} - -type goRoutineMap struct { - operations map[string]operation - exponentialBackOffOnError bool - cond *sync.Cond - lock sync.RWMutex -} - -// operation holds the state of a single goroutine. -type operation struct { - operationPending bool - expBackoff exponentialbackoff.ExponentialBackoff -} - -func (grm *goRoutineMap) Run( - operationName string, - operationFunc func() error) error { - grm.lock.Lock() - defer grm.lock.Unlock() - - existingOp, exists := grm.operations[operationName] - if exists { - // Operation with name exists - if existingOp.operationPending { - return NewAlreadyExistsError(operationName) - } - - if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil { - return err - } - } - - grm.operations[operationName] = operation{ - operationPending: true, - expBackoff: existingOp.expBackoff, - } - go func() (err error) { - // Handle unhandled panics (very unlikely) - defer k8sRuntime.HandleCrash() - // Handle completion of and error, if any, from operationFunc() - defer grm.operationComplete(operationName, &err) - // Handle panic, if any, from operationFunc() - defer k8sRuntime.RecoverFromPanic(&err) - return operationFunc() - }() - - return nil -} - -// operationComplete handles the completion of a goroutine run in the -// goRoutineMap. -func (grm *goRoutineMap) operationComplete( - operationName string, err *error) { - // Defer operations are executed in Last-In is First-Out order. In this case - // the lock is acquired first when operationCompletes begins, and is - // released when the method finishes, after the lock is released cond is - // signaled to wake waiting goroutine. - defer grm.cond.Signal() - grm.lock.Lock() - defer grm.lock.Unlock() - - if *err == nil || !grm.exponentialBackOffOnError { - // Operation completed without error, or exponentialBackOffOnError disabled - delete(grm.operations, operationName) - if *err != nil { - // Log error - klog.Errorf("operation for %q failed with: %v", - operationName, - *err) - } - } else { - // Operation completed with error and exponentialBackOffOnError Enabled - existingOp := grm.operations[operationName] - existingOp.expBackoff.Update(err) - existingOp.operationPending = false - grm.operations[operationName] = existingOp - - // Log error - klog.Errorf("%v", - existingOp.expBackoff.GenerateNoRetriesPermittedMsg(operationName)) - } -} - -func (grm *goRoutineMap) IsOperationPending(operationName string) bool { - grm.lock.RLock() - defer grm.lock.RUnlock() - existingOp, exists := grm.operations[operationName] - if exists && existingOp.operationPending { - return true - } - return false -} - -func (grm *goRoutineMap) Wait() { - grm.lock.Lock() - defer grm.lock.Unlock() - - for len(grm.operations) > 0 { - grm.cond.Wait() - } -} - -func (grm *goRoutineMap) WaitForCompletion() { - grm.lock.Lock() - defer grm.lock.Unlock() - - for { - if len(grm.operations) == 0 || grm.nothingPending() { - break - } else { - grm.cond.Wait() - } - } -} - -// Check if any operation is pending. Already assumes caller has the -// necessary locks -func (grm *goRoutineMap) nothingPending() bool { - nothingIsPending := true - for _, operation := range grm.operations { - if operation.operationPending { - nothingIsPending = false - break - } - } - return nothingIsPending -} - -// NewAlreadyExistsError returns a new instance of AlreadyExists error. -func NewAlreadyExistsError(operationName string) error { - return alreadyExistsError{operationName} -} - -// IsAlreadyExists returns true if an error returned from GoRoutineMap indicates -// a new operation can not be started because an operation with the same -// operation name is already executing. -func IsAlreadyExists(err error) bool { - switch err.(type) { - case alreadyExistsError: - return true - default: - return false - } -} - -// alreadyExistsError is the error returned by GoRoutineMap when a new operation -// can not be started because an operation with the same operation name is -// already executing. -type alreadyExistsError struct { - operationName string -} - -var _ error = alreadyExistsError{} - -func (err alreadyExistsError) Error() string { - return fmt.Sprintf( - "Failed to create operation with name %q. An operation with that name is already executing.", - err.operationName) -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 76be8816..4426cac8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -535,8 +535,6 @@ k8s.io/kube-openapi/pkg/generators/rules k8s.io/kube-openapi/pkg/util/proto k8s.io/kube-openapi/pkg/util/sets # k8s.io/kubernetes v1.18.0 -k8s.io/kubernetes/pkg/util/goroutinemap -k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff k8s.io/kubernetes/pkg/util/slice # k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89 k8s.io/utils/buffer