Fix requeue logic in the common controller

xing-yang
2020-05-22 02:40:24 +00:00
parent 4800ca72d4
commit 5800df6a61
10 changed files with 166 additions and 662 deletions
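
Summary of the change: the controller's hand-rolled retries (the createSnapshotContentRetryCount/createSnapshotContentInterval loops and the vendored goroutinemap package) are replaced by the workqueue's rate-limited requeue. When a sync fails, the key is re-added with AddRateLimited and retried with backoff; on success it is Forgotten. A minimal sketch of that worker shape, using the real client-go workqueue API but with hypothetical names (processNextWorkItem, sync):

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// processNextWorkItem mirrors the worker shape this commit introduces:
// handle one item per call, AddRateLimited on failure, Forget on success.
func processNextWorkItem(queue workqueue.RateLimitingInterface, sync func(key string) error) bool {
	keyObj, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(keyObj)

	key := keyObj.(string)
	if err := sync(key); err != nil {
		// Re-queue with exponential backoff instead of sleeping in the worker.
		queue.AddRateLimited(keyObj)
		return true
	}
	// Success: clear the rate limiter's failure history for this key.
	queue.Forget(keyObj)
	return true
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	queue.Add("default/snap-1")
	processNextWorkItem(queue, func(key string) error {
		fmt.Println("synced", key)
		return nil
	})
}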

View File

@@ -582,7 +582,6 @@ func (r *snapshotReactor) getChangeCount() int {
// waitForIdle waits until all tests, controllers and other goroutines do their
// job and no new actions are registered for 10 milliseconds.
func (r *snapshotReactor) waitForIdle() {
r.ctrl.runningOperations.WaitForCompletion()
// Check every 10ms if the controller does something and stop if it's
// idle.
oldChanges := -1
@@ -609,9 +608,6 @@ func (r *snapshotReactor) waitTest(test controllerTest) error {
Steps: 10,
}
err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
// Finish all operations that are in progress
r.ctrl.runningOperations.WaitForCompletion()
// Return 'true' if the reactor reached the expected state
err1 := r.checkSnapshots(test.expectedSnapshots)
err2 := r.checkContents(test.expectedContents)
@@ -757,8 +753,6 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
ctrl.snapshotListerSynced = alwaysReady
ctrl.classListerSynced = alwaysReady
ctrl.pvcListerSynced = alwaysReady
ctrl.createSnapshotContentInterval = time.Millisecond * 5
ctrl.createSnapshotContentRetryCount = 3
return ctrl, nil
}
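
With runningOperations removed, the test harness no longer blocks on WaitForCompletion; waitTest simply polls for the expected state under wait.ExponentialBackoff. A self-contained sketch of that polling primitive (the Duration and Factor values are illustrative; only Steps: 10 appears in the excerpt above):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	backoff := wait.Backoff{
		Duration: 10 * time.Millisecond, // initial delay (illustrative)
		Factor:   2,                     // double the delay each step
		Steps:    10,                    // stop after 10 attempts
	}
	attempts := 0
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		attempts++
		// Return true once the reactor reaches the expected state;
		// returning (false, nil) means "not yet, try again".
		return attempts >= 3, nil
	})
	fmt.Println(attempts, err) // 3 <nil>
}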

View File

@@ -423,18 +423,10 @@ func (ctrl *csiSnapshotCommonController) syncUnreadySnapshot(snapshot *crdv1.Vol
}
// update snapshot status
for i := 0; i < ctrl.createSnapshotContentRetryCount; i++ {
klog.V(5).Infof("syncUnreadySnapshot [%s]: trying to update snapshot status", utils.SnapshotKey(snapshot))
_, err = ctrl.updateSnapshotStatus(snapshot, newContent)
if err == nil {
break
}
klog.V(4).Infof("failed to update snapshot %s status: %v", utils.SnapshotKey(snapshot), err)
time.Sleep(ctrl.createSnapshotContentInterval)
}
if err != nil {
if _, err = ctrl.updateSnapshotStatus(snapshot, newContent); err != nil {
// update snapshot status failed
klog.V(4).Infof("failed to update snapshot %s status: %v", utils.SnapshotKey(snapshot), err)
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotStatusUpdateFailed", fmt.Sprintf("Snapshot status update failed, %v", err))
return err
}
@@ -474,17 +466,8 @@ func (ctrl *csiSnapshotCommonController) syncUnreadySnapshot(snapshot *crdv1.Vol
}
// Update snapshot status with BoundVolumeSnapshotContentName
for i := 0; i < ctrl.createSnapshotContentRetryCount; i++ {
klog.V(5).Infof("syncUnreadySnapshot [%s]: trying to update snapshot status", utils.SnapshotKey(snapshot))
_, err = ctrl.updateSnapshotStatus(snapshot, content)
if err == nil {
break
}
klog.V(4).Infof("failed to update snapshot %s status: %v", utils.SnapshotKey(snapshot), err)
time.Sleep(ctrl.createSnapshotContentInterval)
}
if err != nil {
if _, err = ctrl.updateSnapshotStatus(snapshot, content); err != nil {
// update snapshot status failed
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotStatusUpdateFailed", fmt.Sprintf("Snapshot status update failed, %v", err))
return err
@@ -656,9 +639,8 @@ func (ctrl *csiSnapshotCommonController) createSnapshotContent(snapshot *crdv1.V
}
var updateContent *crdv1.VolumeSnapshotContent
klog.V(3).Infof("volume snapshot content %#v", snapshotContent)
// Try to create the VolumeSnapshotContent object several times
for i := 0; i < ctrl.createSnapshotContentRetryCount; i++ {
klog.V(5).Infof("volume snapshot content %#v", snapshotContent)
// Try to create the VolumeSnapshotContent object
klog.V(5).Infof("createSnapshotContent [%s]: trying to save volume snapshot content %s", utils.SnapshotKey(snapshot), snapshotContent.Name)
if updateContent, err = ctrl.clientset.SnapshotV1beta1().VolumeSnapshotContents().Create(context.TODO(), snapshotContent, metav1.CreateOptions{}); err == nil || apierrs.IsAlreadyExists(err) {
// Save succeeded.
@@ -669,11 +651,6 @@ func (ctrl *csiSnapshotCommonController) createSnapshotContent(snapshot *crdv1.V
} else {
klog.V(3).Infof("volume snapshot content %q for snapshot %q saved, %v", snapshotContent.Name, utils.SnapshotKey(snapshot), snapshotContent)
}
break
}
// Save failed, try again after a while.
klog.V(3).Infof("failed to save volume snapshot content %q for snapshot %q: %v", snapshotContent.Name, utils.SnapshotKey(snapshot), err)
time.Sleep(ctrl.createSnapshotContentInterval)
}
if err != nil {
@@ -982,19 +959,14 @@ func (ctrl *csiSnapshotCommonController) bindandUpdateVolumeSnapshot(snapshotCon
snapshotCopy := snapshotObj.DeepCopy()
// update snapshot status
var updateSnapshot *crdv1.VolumeSnapshot
for i := 0; i < ctrl.createSnapshotContentRetryCount; i++ {
klog.V(5).Infof("bindandUpdateVolumeSnapshot [%s]: trying to update snapshot status", utils.SnapshotKey(snapshotCopy))
updateSnapshot, err = ctrl.updateSnapshotStatus(snapshotCopy, snapshotContent)
if err == nil {
snapshotCopy = updateSnapshot
break
}
klog.V(4).Infof("failed to update snapshot %s status: %v", utils.SnapshotKey(snapshot), err)
time.Sleep(ctrl.createSnapshotContentInterval)
}
if err != nil {
// update snapshot status failed
klog.V(4).Infof("failed to update snapshot %s status: %v", utils.SnapshotKey(snapshot), err)
ctrl.updateSnapshotErrorStatusWithEvent(snapshotCopy, v1.EventTypeWarning, "SnapshotStatusUpdateFailed", fmt.Sprintf("Snapshot status update failed, %v", err))
return nil, err
}
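
Each hunk in this file follows the same pattern: an in-place retry loop collapses to a single attempt, and the error is returned so the rate-limited queue owns the retry. A compressed, hypothetical stand-in for the two styles (update is a stub for a call such as updateSnapshotStatus):

package main

import (
	"errors"
	"fmt"
	"time"
)

var calls int

// update is a hypothetical stub for an API call; it fails twice, then succeeds.
func update() error {
	calls++
	if calls < 3 {
		return errors.New("mock update error")
	}
	return nil
}

// syncOld shows the removed style: retry in place, blocking the worker.
func syncOld() error {
	var err error
	for i := 0; i < 3; i++ {
		if err = update(); err == nil {
			break
		}
		time.Sleep(5 * time.Millisecond)
	}
	return err
}

// syncNew shows the new style: one attempt, surface the error, and let
// the rate-limited workqueue re-queue the key with backoff.
func syncNew() error {
	if err := update(); err != nil {
		return err
	}
	return nil
}

func main() {
	fmt.Println(syncOld()) // <nil> after two in-place retries
	fmt.Println(syncNew()) // <nil> (update already succeeding)
}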

View File

@@ -39,15 +39,8 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/goroutinemap"
)
// Number of retries when we create a VolumeSnapshotContent object
const createSnapshotContentRetryCount = 5
// Interval between retries when we create a VolumeSnapshotContent object
const createSnapshotContentInterval = 10 * time.Second
type csiSnapshotCommonController struct {
clientset clientset.Interface
client kubernetes.Interface
@@ -67,11 +60,6 @@ type csiSnapshotCommonController struct {
snapshotStore cache.Store
contentStore cache.Store
// Map of scheduled/running operations.
runningOperations goroutinemap.GoRoutineMap
createSnapshotContentRetryCount int
createSnapshotContentInterval time.Duration
resyncPeriod time.Duration
}
@@ -95,9 +83,6 @@ func NewCSISnapshotCommonController(
clientset: clientset,
client: client,
eventRecorder: eventRecorder,
runningOperations: goroutinemap.NewGoRoutineMap(true),
createSnapshotContentRetryCount: createSnapshotContentRetryCount,
createSnapshotContentInterval: createSnapshotContentInterval,
resyncPeriod: resyncPeriod,
snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
@@ -192,23 +177,35 @@ func (ctrl *csiSnapshotCommonController) enqueueContentWork(obj interface{}) {
}
}
// snapshotWorker processes items from snapshotQueue. It must run only once,
// syncSnapshot is not assured to be reentrant.
// snapshotWorker is the main worker for VolumeSnapshots.
func (ctrl *csiSnapshotCommonController) snapshotWorker() {
workFunc := func() bool {
keyObj, quit := ctrl.snapshotQueue.Get()
if quit {
return true
return
}
defer ctrl.snapshotQueue.Done(keyObj)
key := keyObj.(string)
klog.V(5).Infof("snapshotWorker[%s]", key)
if err := ctrl.syncSnapshotByKey(keyObj.(string)); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
ctrl.snapshotQueue.AddRateLimited(keyObj)
klog.V(4).Infof("Failed to sync snapshot %q, will retry again: %v", keyObj.(string), err)
} else {
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
ctrl.snapshotQueue.Forget(keyObj)
}
}
// syncSnapshotByKey processes a VolumeSnapshot request.
func (ctrl *csiSnapshotCommonController) syncSnapshotByKey(key string) error {
klog.V(5).Infof("syncSnapshotByKey[%s]", key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
klog.V(5).Infof("snapshotWorker: snapshot namespace [%s] name [%s]", namespace, name)
if err != nil {
klog.Errorf("error getting namespace & name of snapshot %q to get snapshot from informer: %v", key, err)
return false
return nil
}
snapshot, err := ctrl.snapshotLister.VolumeSnapshots(namespace).Get(name)
if err == nil {
@@ -219,77 +216,81 @@ func (ctrl *csiSnapshotCommonController) snapshotWorker() {
// If the VolumeSnapshotClass is not found, we still need to process an update
// so that syncSnapshot can delete the snapshot, should it still exist in the
// cluster after it's been removed from the informer cache
klog.V(5).Infof("updating snapshot %q; snapshotClass may have already been removed", key)
ctrl.updateSnapshot(newSnapshot)
if newSnapshot.ObjectMeta.DeletionTimestamp != nil && errors.IsNotFound(err) {
klog.V(5).Infof("Snapshot %q is being deleted. SnapshotClass has already been removed", key)
}
return false
klog.V(5).Infof("Updating snapshot %q", key)
return ctrl.updateSnapshot(newSnapshot)
}
return err
}
if err != nil && !errors.IsNotFound(err) {
klog.V(2).Infof("error getting snapshot %q from informer: %v", key, err)
return false
return err
}
// The snapshot is not in informer cache, the event must have been "delete"
vsObj, found, err := ctrl.snapshotStore.GetByKey(key)
if err != nil {
klog.V(2).Infof("error getting snapshot %q from cache: %v", key, err)
return false
return nil
}
if !found {
// The controller has already processed the delete event and
// deleted the snapshot from its cache
klog.V(2).Infof("deletion of snapshot %q was already processed", key)
return false
return nil
}
snapshot, ok := vsObj.(*crdv1.VolumeSnapshot)
if !ok {
klog.Errorf("expected vs, got %+v", vsObj)
return false
}
newSnapshot, err := ctrl.checkAndUpdateSnapshotClass(snapshot)
if err == nil || errors.IsNotFound(err) {
// We should still handle deletion events even if the VolumeSnapshotClass
// is not found in the cluster
klog.V(5).Infof("deleting snapshot %q; snapshotClass may have already been removed", key)
ctrl.deleteSnapshot(newSnapshot)
}
return false
return nil
}
for {
if quit := workFunc(); quit {
klog.Infof("snapshot worker queue shutting down")
return
}
}
klog.V(5).Infof("deleting snapshot %q", key)
ctrl.deleteSnapshot(snapshot)
return nil
}
// contentWorker processes items from contentQueue. It must run only once,
// syncContent is not assured to be reentrant.
// contentWorker is the main worker for VolumeSnapshotContent.
func (ctrl *csiSnapshotCommonController) contentWorker() {
workFunc := func() bool {
keyObj, quit := ctrl.contentQueue.Get()
if quit {
return true
return
}
defer ctrl.contentQueue.Done(keyObj)
key := keyObj.(string)
klog.V(5).Infof("contentWorker[%s]", key)
if err := ctrl.syncContentByKey(keyObj.(string)); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
ctrl.contentQueue.AddRateLimited(keyObj)
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err)
} else {
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
ctrl.contentQueue.Forget(keyObj)
}
}
// syncContentByKey processes a VolumeSnapshotContent request.
func (ctrl *csiSnapshotCommonController) syncContentByKey(key string) error {
klog.V(5).Infof("syncContentByKey[%s]", key)
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.V(4).Infof("error getting name of snapshotContent %q to get snapshotContent from informer: %v", key, err)
return false
return nil
}
content, err := ctrl.contentLister.Get(name)
// The content still exists in informer cache, the event must have
// been add/update/sync
if err == nil {
ctrl.updateContent(content)
return false
// If an error occurs, the worker re-queues this item
return ctrl.updateContent(content)
}
if !errors.IsNotFound(err) {
klog.V(2).Infof("error getting content %q from informer: %v", key, err)
return false
return nil
}
// The content is not in informer cache, the event must have been
@@ -297,29 +298,21 @@ func (ctrl *csiSnapshotCommonController) contentWorker() {
contentObj, found, err := ctrl.contentStore.GetByKey(key)
if err != nil {
klog.V(2).Infof("error getting content %q from cache: %v", key, err)
return false
return nil
}
if !found {
// The controller has already processed the delete event and
// deleted the content from its cache
klog.V(2).Infof("deletion of content %q was already processed", key)
return false
return nil
}
content, ok := contentObj.(*crdv1.VolumeSnapshotContent)
if !ok {
klog.Errorf("expected content, got %+v", content)
return false
return nil
}
ctrl.deleteContent(content)
return false
}
for {
if quit := workFunc(); quit {
klog.Infof("content worker queue shutting down")
return
}
}
return nil
}
// checkAndUpdateSnapshotClass gets the VolumeSnapshotClass from VolumeSnapshot. If it is not set,
@@ -357,7 +350,7 @@ func (ctrl *csiSnapshotCommonController) checkAndUpdateSnapshotClass(snapshot *c
// updateSnapshot runs in worker thread and handles "snapshot added",
// "snapshot updated" and "periodic sync" events.
func (ctrl *csiSnapshotCommonController) updateSnapshot(snapshot *crdv1.VolumeSnapshot) {
func (ctrl *csiSnapshotCommonController) updateSnapshot(snapshot *crdv1.VolumeSnapshot) error {
// Store the new snapshot version in the cache and do not process it if this is
// an old version.
klog.V(5).Infof("updateSnapshot %q", utils.SnapshotKey(snapshot))
@@ -366,23 +359,25 @@ func (ctrl *csiSnapshotCommonController) updateSnapshot(snapshot *crdv1.VolumeSn
klog.Errorf("%v", err)
}
if !newSnapshot {
return
return nil
}
err = ctrl.syncSnapshot(snapshot)
if err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
klog.V(3).Infof("could not sync claim %q: %+v", utils.SnapshotKey(snapshot), err)
klog.V(3).Infof("could not sync snapshot %q: %+v", utils.SnapshotKey(snapshot), err)
} else {
klog.Errorf("could not sync volume %q: %+v", utils.SnapshotKey(snapshot), err)
klog.Errorf("could not sync snapshot %q: %+v", utils.SnapshotKey(snapshot), err)
}
return err
}
return nil
}
// updateContent runs in worker thread and handles "content added",
// "content updated" and "periodic sync" events.
func (ctrl *csiSnapshotCommonController) updateContent(content *crdv1.VolumeSnapshotContent) {
func (ctrl *csiSnapshotCommonController) updateContent(content *crdv1.VolumeSnapshotContent) error {
// Store the new content version in the cache and do not process it if this is
// an old version.
new, err := ctrl.storeContentUpdate(content)
@@ -390,7 +385,7 @@ func (ctrl *csiSnapshotCommonController) updateContent(content *crdv1.VolumeSnap
klog.Errorf("%v", err)
}
if !new {
return
return nil
}
err = ctrl.syncContent(content)
if err != nil {
@@ -401,7 +396,9 @@ func (ctrl *csiSnapshotCommonController) updateContent(content *crdv1.VolumeSnap
} else {
klog.Errorf("could not sync content %q: %+v", content.Name, err)
}
return err
}
return nil
}
// deleteSnapshot runs in worker thread and handles "snapshot deleted" event.
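
Note the error contract syncSnapshotByKey and syncContentByKey adopt: nil for conditions a retry cannot fix (a malformed key, a cache error, an already-processed deletion), so the worker Forgets the key, and a non-nil error only when requeueing can help. A small sketch of that discipline, with hypothetical names:

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// syncByKey sketches the contract: nil for unrecoverable input, error
// only for transient failures worth a rate-limited retry.
func syncByKey(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		// A malformed key never becomes valid; retrying is pointless.
		return nil
	}
	fmt.Println("syncing", namespace, name)
	// A transient API or sync failure would be returned here, causing
	// the worker to AddRateLimited the key.
	return nil
}

func main() {
	_ = syncByKey("default/snap-1")
}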

View File

@@ -107,24 +107,6 @@ func TestCreateSnapshotSync(t *testing.T) {
expectSuccess: false,
test: testSyncSnapshot,
},
{
name: "7-2 - fail to update snapshot reports warning event",
initialContents: newContentArrayWithReadyToUse("snapcontent-snapuid7-2", "snapuid7-2", "snap7-2", "sid7-2", classGold, "", "pv-handle7-2", deletionPolicy, nil, nil, &True, false),
expectedContents: newContentArrayWithReadyToUse("snapcontent-snapuid7-2", "snapuid7-2", "snap7-2", "sid7-2", classGold, "", "pv-handle7-2", deletionPolicy, nil, nil, &True, false),
initialSnapshots: newSnapshotArray("snap7-2", "snapuid7-2", "claim7-2", "", classGold, "snapcontent-snapuid7-2", &False, nil, nil, nil, false, true, nil),
expectedSnapshots: newSnapshotArray("snap7-2", "snapuid7-2", "claim7-2", "", classGold, "snapcontent-snapuid7-2", &False, nil, nil, newVolumeError("Snapshot status update failed, snapshot controller failed to update default/snap7-2 on API server: mock update error"), false, true, nil),
initialClaims: newClaimArray("claim7-2", "pvc-uid7-2", "1Gi", "volume7-2", v1.ClaimBound, &classGold),
initialVolumes: newVolumeArray("volume7-2", "pv-uid7-2", "pv-handle7-2", "1Gi", "pvc-uid7-2", "claim7-2", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classGold),
expectedEvents: []string{"Warning SnapshotStatusUpdateFailed"},
errors: []reactorError{
// Inject error into the first three client.VolumesnapshotV1beta1().VolumeSnapshots().Update calls.
// All subsequent calls will succeed.
{"update", "volumesnapshots", errors.New("mock update error")},
{"update", "volumesnapshots", errors.New("mock update error")},
{"update", "volumesnapshots", errors.New("mock update error")},
}, test: testSyncSnapshot,
},
{
name: "7-3 - fail to create snapshot without snapshot class ",
initialContents: nocontents,
@@ -193,23 +175,6 @@ func TestCreateSnapshotSync(t *testing.T) {
expectSuccess: false,
test: testSyncSnapshot,
},
{
name: "7-8 - fail create snapshot due to cannot update snapshot status",
initialContents: nocontents,
expectedContents: newContentArrayNoStatus("snapcontent-snapuid7-8", "snapuid7-8", "snap7-8", "sid7-8", classGold, "", "pv-handle7-8", deletionPolicy, nil, nil, false, false),
initialSnapshots: newSnapshotArray("snap7-8", "snapuid7-8", "claim7-8", "", classGold, "", &False, nil, nil, nil, false, true, nil),
expectedSnapshots: newSnapshotArray("snap7-8", "snapuid7-8", "claim7-8", "", classGold, "", &False, nil, nil, newVolumeError("Snapshot status update failed, snapshot controller failed to update default/snap7-8 on API server: mock update error"), false, true, nil),
initialClaims: newClaimArray("claim7-8", "pvc-uid7-8", "1Gi", "volume7-8", v1.ClaimBound, &classEmpty),
initialVolumes: newVolumeArray("volume7-8", "pv-uid7-8", "pv-handle7-8", "1Gi", "pvc-uid7-8", "claim7-8", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classEmpty),
errors: []reactorError{
{"update", "volumesnapshots", errors.New("mock update error")},
{"update", "volumesnapshots", errors.New("mock update error")},
{"update", "volumesnapshots", errors.New("mock update error")},
},
expectedEvents: []string{"Normal CreatingSnapshot"},
expectSuccess: false,
test: testSyncSnapshot,
},
{
name: "7-9 - fail create snapshot due to cannot update snapshot status, and failure cannot be recorded either due to additional status update failure.",
initialContents: nocontents,
@@ -239,7 +204,8 @@ func TestCreateSnapshotSync(t *testing.T) {
test: testSyncSnapshot,
},
{
// TODO(xiangqian): this test case needs to be revisited the scenario
// TODO(xiangqian): this test case needs to be
// revisited for the scenario
// of VolumeSnapshotContent saving failure. Since there will be no content object
// in API server, it could potentially cause leaking issue
name: "7-11 - fail create snapshot due to cannot save snapshot content",

View File

@@ -1,41 +0,0 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["goroutinemap.go"],
importpath = "k8s.io/kubernetes/pkg/util/goroutinemap",
deps = [
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["goroutinemap_test.go"],
embed = [":go_default_library"],
deps = ["//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/util/goroutinemap/exponentialbackoff:all-srcs",
],
tags = ["automanaged"],
)

View File

@@ -1,6 +0,0 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- saad-ali
reviewers:
- saad-ali

View File

@@ -1,25 +0,0 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["exponential_backoff.go"],
importpath = "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff",
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@@ -1,121 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package exponentialbackoff contains logic for implementing exponential
// backoff for GoRoutineMap and NestedPendingOperations.
package exponentialbackoff
import (
"fmt"
"time"
)
const (
// initialDurationBeforeRetry is the amount of time after an error occurs
// that GoroutineMap will refuse to allow another operation to start with
// the same target (if exponentialBackOffOnError is enabled). Each
// successive error results in a wait 2x times the previous.
initialDurationBeforeRetry time.Duration = 500 * time.Millisecond
// maxDurationBeforeRetry is the maximum amount of time that
// durationBeforeRetry will grow to due to exponential backoff.
// Value is slightly offset from 2 minutes to make timeouts due to this
// constant recognizable.
maxDurationBeforeRetry time.Duration = 2*time.Minute + 2*time.Second
)
// ExponentialBackoff contains the last occurrence of an error and the duration
// for which retries are not permitted.
type ExponentialBackoff struct {
lastError error
lastErrorTime time.Time
durationBeforeRetry time.Duration
}
// SafeToRetry returns an error if the durationBeforeRetry period for the given
// lastErrorTime has not yet expired. Otherwise it returns nil.
func (expBackoff *ExponentialBackoff) SafeToRetry(operationName string) error {
if time.Since(expBackoff.lastErrorTime) <= expBackoff.durationBeforeRetry {
return NewExponentialBackoffError(operationName, *expBackoff)
}
return nil
}
func (expBackoff *ExponentialBackoff) Update(err *error) {
if expBackoff.durationBeforeRetry == 0 {
expBackoff.durationBeforeRetry = initialDurationBeforeRetry
} else {
expBackoff.durationBeforeRetry = 2 * expBackoff.durationBeforeRetry
if expBackoff.durationBeforeRetry > maxDurationBeforeRetry {
expBackoff.durationBeforeRetry = maxDurationBeforeRetry
}
}
expBackoff.lastError = *err
expBackoff.lastErrorTime = time.Now()
}
func (expBackoff *ExponentialBackoff) GenerateNoRetriesPermittedMsg(operationName string) string {
return fmt.Sprintf("Operation for %q failed. No retries permitted until %v (durationBeforeRetry %v). Error: %q",
operationName,
expBackoff.lastErrorTime.Add(expBackoff.durationBeforeRetry),
expBackoff.durationBeforeRetry,
expBackoff.lastError)
}
// NewExponentialBackoffError returns a new instance of ExponentialBackoff error.
func NewExponentialBackoffError(
operationName string, expBackoff ExponentialBackoff) error {
return exponentialBackoffError{
operationName: operationName,
expBackoff: expBackoff,
}
}
// IsExponentialBackoff returns true if an error returned from GoroutineMap
// indicates that a new operation can not be started because
// exponentialBackOffOnError is enabled and a previous operation with the same
// operation name failed within the durationBeforeRetry period.
func IsExponentialBackoff(err error) bool {
switch err.(type) {
case exponentialBackoffError:
return true
default:
return false
}
}
// exponentialBackoffError is the error returned from GoroutineMap when
// a new operation can not be started because exponentialBackOffOnError is
// enabled and a previous operation with the same operation failed within the
// durationBeforeRetry period.
type exponentialBackoffError struct {
operationName string
expBackoff ExponentialBackoff
}
var _ error = exponentialBackoffError{}
func (err exponentialBackoffError) Error() string {
return fmt.Sprintf(
"Failed to create operation with name %q. An operation with that name failed at %v. No retries permitted until %v (%v). Last error: %q.",
err.operationName,
err.expBackoff.lastErrorTime,
err.expBackoff.lastErrorTime.Add(err.expBackoff.durationBeforeRetry),
err.expBackoff.durationBeforeRetry,
err.expBackoff.lastError)
}
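
This vendored exponentialbackoff package can be deleted because the workqueue's per-item rate limiter implements the same doubling-with-a-cap backoff. A sketch using the client-go API (the base and cap values are illustrative, not the controller's settings):

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Doubles the delay per failure for the same item, up to the cap,
	// mirroring initialDurationBeforeRetry/maxDurationBeforeRetry above.
	rl := workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 2*time.Minute)
	for i := 0; i < 4; i++ {
		fmt.Println(rl.When("snap-1")) // 500ms, 1s, 2s, 4s
	}
	rl.Forget("snap-1") // a successful sync resets the item's backoff
}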

View File

@@ -1,230 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package goroutinemap implements a data structure for managing go routines
by name. It prevents the creation of new go routines if an existing go routine
with the same name exists.
*/
package goroutinemap
import (
"fmt"
"sync"
k8sRuntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
)
// GoRoutineMap defines a type that can run named goroutines and track their
// state. It prevents the creation of multiple goroutines with the same name
// and may prevent recreation of a goroutine until a backoff time
// has elapsed after the last goroutine with that name finished.
type GoRoutineMap interface {
// Run adds operation name to the list of running operations and spawns a
// new go routine to execute the operation.
// If an operation with the same operation name already exists, an
// AlreadyExists or ExponentialBackoff error is returned.
// Once the operation is complete, the go routine is terminated and the
// operation name is removed from the list of executing operations allowing
// a new operation to be started with the same operation name without error.
Run(operationName string, operationFunc func() error) error
// Wait blocks until operations map is empty. This is typically
// necessary during tests - the test should wait until all operations finish
// and evaluate results after that.
Wait()
// WaitForCompletion blocks until either all operations have successfully completed
// or have failed but are not pending. The test should wait until operations are either
// complete or have failed.
WaitForCompletion()
// IsOperationPending returns true if the operation is pending (currently
// running), otherwise returns false.
IsOperationPending(operationName string) bool
}
// NewGoRoutineMap returns a new instance of GoRoutineMap.
func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {
g := &goRoutineMap{
operations: make(map[string]operation),
exponentialBackOffOnError: exponentialBackOffOnError,
}
g.cond = sync.NewCond(&g.lock)
return g
}
type goRoutineMap struct {
operations map[string]operation
exponentialBackOffOnError bool
cond *sync.Cond
lock sync.RWMutex
}
// operation holds the state of a single goroutine.
type operation struct {
operationPending bool
expBackoff exponentialbackoff.ExponentialBackoff
}
func (grm *goRoutineMap) Run(
operationName string,
operationFunc func() error) error {
grm.lock.Lock()
defer grm.lock.Unlock()
existingOp, exists := grm.operations[operationName]
if exists {
// Operation with name exists
if existingOp.operationPending {
return NewAlreadyExistsError(operationName)
}
if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil {
return err
}
}
grm.operations[operationName] = operation{
operationPending: true,
expBackoff: existingOp.expBackoff,
}
go func() (err error) {
// Handle unhandled panics (very unlikely)
defer k8sRuntime.HandleCrash()
// Handle completion of, and any error from, operationFunc()
defer grm.operationComplete(operationName, &err)
// Handle panic, if any, from operationFunc()
defer k8sRuntime.RecoverFromPanic(&err)
return operationFunc()
}()
return nil
}
// operationComplete handles the completion of a goroutine run in the
// goRoutineMap.
func (grm *goRoutineMap) operationComplete(
operationName string, err *error) {
// Defer operations are executed in last-in, first-out order. In this case
// the lock is acquired first when operationComplete begins, and is
// released when the method finishes; after the lock is released, cond is
// signaled to wake the waiting goroutine.
defer grm.cond.Signal()
grm.lock.Lock()
defer grm.lock.Unlock()
if *err == nil || !grm.exponentialBackOffOnError {
// Operation completed without error, or exponentialBackOffOnError disabled
delete(grm.operations, operationName)
if *err != nil {
// Log error
klog.Errorf("operation for %q failed with: %v",
operationName,
*err)
}
} else {
// Operation completed with error and exponentialBackOffOnError Enabled
existingOp := grm.operations[operationName]
existingOp.expBackoff.Update(err)
existingOp.operationPending = false
grm.operations[operationName] = existingOp
// Log error
klog.Errorf("%v",
existingOp.expBackoff.GenerateNoRetriesPermittedMsg(operationName))
}
}
func (grm *goRoutineMap) IsOperationPending(operationName string) bool {
grm.lock.RLock()
defer grm.lock.RUnlock()
existingOp, exists := grm.operations[operationName]
if exists && existingOp.operationPending {
return true
}
return false
}
func (grm *goRoutineMap) Wait() {
grm.lock.Lock()
defer grm.lock.Unlock()
for len(grm.operations) > 0 {
grm.cond.Wait()
}
}
func (grm *goRoutineMap) WaitForCompletion() {
grm.lock.Lock()
defer grm.lock.Unlock()
for {
if len(grm.operations) == 0 || grm.nothingPending() {
break
} else {
grm.cond.Wait()
}
}
}
// nothingPending checks whether any operation is pending. It assumes the
// caller already holds the necessary locks.
func (grm *goRoutineMap) nothingPending() bool {
nothingIsPending := true
for _, operation := range grm.operations {
if operation.operationPending {
nothingIsPending = false
break
}
}
return nothingIsPending
}
// NewAlreadyExistsError returns a new instance of AlreadyExists error.
func NewAlreadyExistsError(operationName string) error {
return alreadyExistsError{operationName}
}
// IsAlreadyExists returns true if an error returned from GoRoutineMap indicates
// a new operation can not be started because an operation with the same
// operation name is already executing.
func IsAlreadyExists(err error) bool {
switch err.(type) {
case alreadyExistsError:
return true
default:
return false
}
}
// alreadyExistsError is the error returned by GoRoutineMap when a new operation
// can not be started because an operation with the same operation name is
// already executing.
type alreadyExistsError struct {
operationName string
}
var _ error = alreadyExistsError{}
func (err alreadyExistsError) Error() string {
return fmt.Sprintf(
"Failed to create operation with name %q. An operation with that name is already executing.",
err.operationName)
}

vendor/modules.txt vendored
View File

@@ -535,8 +535,6 @@ k8s.io/kube-openapi/pkg/generators/rules
k8s.io/kube-openapi/pkg/util/proto
k8s.io/kube-openapi/pkg/util/sets
# k8s.io/kubernetes v1.18.0
k8s.io/kubernetes/pkg/util/goroutinemap
k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff
k8s.io/kubernetes/pkg/util/slice
# k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89
k8s.io/utils/buffer