diff --git a/README.md b/README.md index 8bacf3dd..343c0046 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,9 @@ Read more about how to install the example webhook [here](deploy/kubernetes/webh * `--worker-threads`: Number of worker threads for running create snapshot and delete snapshot operations. Default value is 10. +* `--retry-interval-start`: Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default value is 1 second. + +*`--retry-interval-max`: Maximum retry interval of failed volume snapshot creation or deletion. Default value is 5 minutes. #### Other recognized arguments * `--kubeconfig `: Path to Kubernetes client configuration that the CSI external-snapshotter uses to connect to Kubernetes API server. When omitted, default token provided by Kubernetes will be used. This option is useful only when the external-snapshotter does not run as a Kubernetes pod, e.g. for debugging. diff --git a/cmd/csi-snapshotter/main.go b/cmd/csi-snapshotter/main.go index a3287549..7e499c36 100644 --- a/cmd/csi-snapshotter/main.go +++ b/cmd/csi-snapshotter/main.go @@ -72,8 +72,8 @@ var ( metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") - retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second") - retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes") + retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.") + retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.") ) var ( diff --git a/cmd/snapshot-controller/main.go b/cmd/snapshot-controller/main.go index 29afc328..4f516117 100644 --- a/cmd/snapshot-controller/main.go +++ b/cmd/snapshot-controller/main.go @@ -20,6 +20,7 @@ import ( "context" "flag" "fmt" + "k8s.io/client-go/util/workqueue" "os" "os/signal" "sync" @@ -55,8 +56,10 @@ var ( leaderElection = flag.Bool("leader-election", false, "Enables leader election.") leaderElectionNamespace = flag.String("leader-election-namespace", "", "The namespace where the leader election resource exists. Defaults to the pod namespace if not set.") - httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics, will listen (example: :8080). The default is empty string, which means the server is disabled.") - metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") + httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics, will listen (example: :8080). The default is empty string, which means the server is disabled.") + metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") + retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.") + retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.") ) var ( @@ -170,6 +173,8 @@ func main() { coreFactory.Core().V1().PersistentVolumeClaims(), metricsManager, *resyncPeriod, + workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), + workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), ) if err := ensureCustomResourceDefinitionsExist(snapClient); err != nil { diff --git a/pkg/common-controller/framework_test.go b/pkg/common-controller/framework_test.go index 81b340c3..414cfcc4 100644 --- a/pkg/common-controller/framework_test.go +++ b/pkg/common-controller/framework_test.go @@ -19,6 +19,7 @@ package common_controller import ( "errors" "fmt" + "k8s.io/client-go/util/workqueue" "reflect" sysruntime "runtime" "strconv" @@ -749,6 +750,8 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte coreFactory.Core().V1().PersistentVolumeClaims(), metricsManager, 60*time.Second, + workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute), + workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute), ) ctrl.eventRecorder = record.NewFakeRecorder(1000) diff --git a/pkg/common-controller/snapshot_controller_base.go b/pkg/common-controller/snapshot_controller_base.go index 556c30e9..0d5b46d0 100644 --- a/pkg/common-controller/snapshot_controller_base.go +++ b/pkg/common-controller/snapshot_controller_base.go @@ -76,6 +76,8 @@ func NewCSISnapshotCommonController( pvcInformer coreinformers.PersistentVolumeClaimInformer, metricsManager metrics.MetricsManager, resyncPeriod time.Duration, + snapshotRateLimiter workqueue.RateLimiter, + contentRateLimiter workqueue.RateLimiter, ) *csiSnapshotCommonController { broadcaster := record.NewBroadcaster() broadcaster.StartLogging(klog.Infof) @@ -90,8 +92,8 @@ func NewCSISnapshotCommonController( resyncPeriod: resyncPeriod, snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), - snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "snapshot-controller-snapshot"), - contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "snapshot-controller-content"), + snapshotQueue: workqueue.NewNamedRateLimitingQueue(snapshotRateLimiter, "snapshot-controller-snapshot"), + contentQueue: workqueue.NewNamedRateLimitingQueue(contentRateLimiter, "snapshot-controller-content"), metricsManager: metricsManager, }