From 8f69d745142dbc84dfddbf8db9f40ee5cb9701b9 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Fri, 29 Mar 2019 15:14:20 -0400 Subject: [PATCH] add leader election support for external snapshotter Signed-off-by: Andrew Sy Kim --- cmd/csi-snapshotter/main.go | 41 +++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/cmd/csi-snapshotter/main.go b/cmd/csi-snapshotter/main.go index 7aea51ad..f551a213 100644 --- a/cmd/csi-snapshotter/main.go +++ b/cmd/csi-snapshotter/main.go @@ -34,6 +34,7 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-csi/csi-lib-utils/connection" + "github.com/kubernetes-csi/csi-lib-utils/leaderelection" csirpc "github.com/kubernetes-csi/csi-lib-utils/rpc" "github.com/kubernetes-csi/external-snapshotter/pkg/controller" "github.com/kubernetes-csi/external-snapshotter/pkg/snapshotter" @@ -65,10 +66,14 @@ var ( snapshotNamePrefix = flag.String("snapshot-name-prefix", "snapshot", "Prefix to apply to the name of a created snapshot") snapshotNameUUIDLength = flag.Int("snapshot-name-uuid-length", -1, "Length in characters for the generated uuid of a created snapshot. Defaults behavior is to NOT truncate.") showVersion = flag.Bool("version", false, "Show version.") + + leaderElection = flag.Bool("leader-election", false, "Enables leader election.") + leaderElectionNamespace = flag.String("leader-election-namespace", "", "The namespace where the leader election resource exists. Defaults to the pod namespace if not set.") ) var ( - version = "unknown" + version = "unknown" + leaderElectionLockName = "external-snapshotter-leader-election" ) func main() { @@ -192,17 +197,31 @@ func main() { *snapshotNameUUIDLength, ) - // run... - stopCh := make(chan struct{}) - factory.Start(stopCh) - coreFactory.Start(stopCh) - go ctrl.Run(threads, stopCh) + run := func(context.Context) { + // run... + stopCh := make(chan struct{}) + factory.Start(stopCh) + coreFactory.Start(stopCh) + go ctrl.Run(threads, stopCh) - // ...until SIGINT - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - <-c - close(stopCh) + // ...until SIGINT + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c + close(stopCh) + } + + if !*leaderElection { + run(context.TODO()) + } else { + le := leaderelection.NewLeaderElection(kubeClient, leaderElectionLockName, run) + if *leaderElectionNamespace != "" { + le.WithNamespace(*leaderElectionNamespace) + } + if err := le.Run(); err != nil { + klog.Fatalf("failed to initialize leader election: %v", err) + } + } } func buildConfig(kubeconfig string) (*rest.Config, error) {