diff --git a/cmd/snapshot-controller/main.go b/cmd/snapshot-controller/main.go index ff78b9ee..10c85eed 100644 --- a/cmd/snapshot-controller/main.go +++ b/cmd/snapshot-controller/main.go @@ -20,6 +20,8 @@ import ( "context" "flag" "fmt" + "net" + "net/http" "os" "os/signal" "sync" @@ -143,23 +145,15 @@ func main() { // Create and register metrics manager metricsManager := metrics.NewMetricsManager() wg := &sync.WaitGroup{} - wg.Add(1) + + mux := http.NewServeMux() if *httpEndpoint != "" { - srv, err := metricsManager.StartMetricsEndpoint(*metricsPath, *httpEndpoint, promklog{}, wg) + err := metricsManager.PrepareMetricsPath(mux, *metricsPath, promklog{}) if err != nil { - klog.Errorf("Failed to start metrics server: %s", err.Error()) + klog.Errorf("Failed to prepare metrics path: %s", err.Error()) os.Exit(1) } - defer func() { - err := srv.Shutdown(context.Background()) - if err != nil { - klog.Errorf("Failed to shutdown metrics server: %s", err.Error()) - } - - klog.Infof("Metrics server successfully shutdown") - wg.Done() - }() - klog.Infof("Metrics server successfully started on %s, %s", *httpEndpoint, *metricsPath) + klog.Infof("Metrics path successfully registered at %s", *metricsPath) } // Add Snapshot types to the default Kubernetes so events can be logged for them @@ -199,6 +193,32 @@ func main() { close(stopCh) } + // start listening & serving http endpoint if set + if *httpEndpoint != "" { + l, err := net.Listen("tcp", *httpEndpoint) + if err != nil { + klog.Fatalf("failed to listen on address[%s], error[%v]", *httpEndpoint, err) + } + srv := &http.Server{Addr: l.Addr().String(), Handler: mux} + go func() { + defer wg.Done() + if err := srv.Serve(l); err != http.ErrServerClosed { + klog.Fatalf("failed to start endpoint at:%s/%s, error: %v", *httpEndpoint, *metricsPath, err) + } + }() + klog.Infof("Metrics http server successfully started on %s, %s", *httpEndpoint, *metricsPath) + + defer func() { + err := srv.Shutdown(context.Background()) + if err != nil { + klog.Errorf("Failed to shutdown metrics server: %s", err.Error()) + } + + klog.Infof("Metrics server successfully shutdown") + wg.Done() + }() + } + if !*leaderElection { run(context.TODO()) } else { @@ -210,6 +230,10 @@ func main() { klog.Fatalf("failed to create leaderelection client: %v", err) } le := leaderelection.NewLeaderElection(leClientset, lockName, run) + if *httpEndpoint != "" { + le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout) + } + if *leaderElectionNamespace != "" { le.WithNamespace(*leaderElectionNamespace) } diff --git a/pkg/common-controller/framework_test.go b/pkg/common-controller/framework_test.go index 414cfcc4..c0093f00 100644 --- a/pkg/common-controller/framework_test.go +++ b/pkg/common-controller/framework_test.go @@ -19,7 +19,7 @@ package common_controller import ( "errors" "fmt" - "k8s.io/client-go/util/workqueue" + "net/http" "reflect" sysruntime "runtime" "strconv" @@ -29,6 +29,8 @@ import ( "testing" "time" + "k8s.io/client-go/util/workqueue" + crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" clientset "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned" "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake" @@ -737,9 +739,14 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, utils.NoResyncPeriodFunc()) metricsManager := metrics.NewMetricsManager() - wg := &sync.WaitGroup{} - wg.Add(1) - metricsManager.StartMetricsEndpoint("/metrics", "localhost:0", nil, wg) + mux := http.NewServeMux() + metricsManager.PrepareMetricsPath(mux, "/metrics", nil) + go func() { + err := http.ListenAndServe("localhost:0", mux) + if err != nil { + t.Errorf("failed to prepare metrics path: %v", err) + } + }() ctrl := NewCSISnapshotCommonController( clientset, diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 92bb0736..c7f5f895 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -17,8 +17,6 @@ limitations under the License. package metrics import ( - "fmt" - "net" "net/http" "sync" "time" @@ -26,7 +24,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "k8s.io/apimachinery/pkg/types" k8smetrics "k8s.io/component-base/metrics" - klog "k8s.io/klog/v2" ) const ( @@ -89,12 +86,11 @@ type OperationStatus interface { var metricBuckets = []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 15, 30, 60, 120, 300, 600} type MetricsManager interface { - // StartMetricsEndpoint starts the metrics endpoint at the specified addr/pattern for - // metrics managed by this MetricsManager. It spawns a goroutine to listen to - // and serve HTTP requests received on addr/pattern. - // If the "pattern" is empty (i.e., ""), no endpoint will be started. + // PrepareMetricsPath prepares the metrics path the specified pattern for + // metrics managed by this MetricsManager. + // If the "pattern" is empty (i.e., ""), it will not be registered. // An error will be returned if there is any. - StartMetricsEndpoint(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error) + PrepareMetricsPath(mux *http.ServeMux, pattern string, logger promhttp.Logger) error // OperationStart takes in an operation and caches its start time. // if the operation already exists, it's an no-op. @@ -304,31 +300,15 @@ func (opMgr *operationMetricsManager) scheduleOpsInFlightMetric() { } } -func (opMgr *operationMetricsManager) StartMetricsEndpoint(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error) { - if addr == "" { - return nil, fmt.Errorf("metrics endpoint will not be started as endpoint address is not specified") - } - // start listening - l, err := net.Listen("tcp", addr) - if err != nil { - return nil, fmt.Errorf("failed to listen on address[%s], error[%v]", addr, err) - } - mux := http.NewServeMux() +func (opMgr *operationMetricsManager) PrepareMetricsPath(mux *http.ServeMux, pattern string, logger promhttp.Logger) error { mux.Handle(pattern, k8smetrics.HandlerFor( opMgr.registry, k8smetrics.HandlerOpts{ ErrorLog: logger, ErrorHandling: k8smetrics.ContinueOnError, })) - srv := &http.Server{Addr: l.Addr().String(), Handler: mux} - // start serving the endpoint - go func() { - defer wg.Done() - if err := srv.Serve(l); err != http.ErrServerClosed { - klog.Fatalf("failed to start endpoint at:%s/%s, error: %v", addr, pattern, err) - } - }() - return srv, nil + + return nil } func (opMgr *operationMetricsManager) GetRegistry() k8smetrics.KubeRegistry { diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index c16fe971..bb840a03 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -22,6 +22,7 @@ import ( "io" "io/ioutil" "log" + "net" "net/http" "reflect" "sort" @@ -60,35 +61,44 @@ func (s *fakeOpStatus) String() string { return "Unknown" } -func initMgr() (MetricsManager, *sync.WaitGroup, *http.Server) { - wg := &sync.WaitGroup{} - wg.Add(1) +func initMgr() (MetricsManager, *http.Server) { mgr := NewMetricsManager() - srv, err := mgr.StartMetricsEndpoint(httpPattern, addr, nil, wg) + mux := http.NewServeMux() + err := mgr.PrepareMetricsPath(mux, httpPattern, nil) if err != nil { log.Fatalf("failed to start serving [%v]", err) } - return mgr, wg, srv + l, err := net.Listen("tcp", addr) + if err != nil { + log.Fatalf("failed to listen on address[%s], error[%v]", addr, err) + } + srv := &http.Server{Addr: l.Addr().String(), Handler: mux} + go func() { + if err := srv.Serve(l); err != http.ErrServerClosed { + log.Fatalf("failed to start endpoint at:%s/%s, error: %v", addr, httpPattern, err) + } + }() + + return mgr, srv } -func shutdown(srv *http.Server, wg *sync.WaitGroup) { +func shutdown(srv *http.Server) { if err := srv.Shutdown(context.Background()); err != nil { panic(err) } - wg.Wait() } func TestNew(t *testing.T) { - mgr, wg, srv := initMgr() - defer shutdown(srv, wg) + mgr, srv := initMgr() + defer shutdown(srv) if mgr == nil { t.Errorf("failed testing new") } } func TestDropNonExistingOperation(t *testing.T) { - mgr, wg, srv := initMgr() - defer shutdown(srv, wg) + mgr, srv := initMgr() + defer shutdown(srv) op := OperationKey{ Name: "drop-non-existing-operation-should-be-noop", ResourceID: types.UID("uid"), @@ -97,9 +107,9 @@ func TestDropNonExistingOperation(t *testing.T) { } func TestRecordMetricsForNonExistingOperation(t *testing.T) { - mgr, wg, srv := initMgr() + mgr, srv := initMgr() srvAddr := "http://" + srv.Addr + httpPattern - defer shutdown(srv, wg) + defer shutdown(srv) opKey := OperationKey{ Name: "no-metrics-should-be-recorded-as-operation-did-not-start", ResourceID: types.UID("uid"), @@ -119,9 +129,9 @@ func TestRecordMetricsForNonExistingOperation(t *testing.T) { } func TestDropOperation(t *testing.T) { - mgr, wg, srv := initMgr() + mgr, srv := initMgr() srvAddr := "http://" + srv.Addr + httpPattern - defer shutdown(srv, wg) + defer shutdown(srv) opKey := OperationKey{ Name: "should-have-been-dropped", ResourceID: types.UID("uid"), @@ -176,9 +186,9 @@ snapshot_controller_operation_total_seconds_count{driver_name="driver",operation } func TestUnknownStatus(t *testing.T) { - mgr, wg, srv := initMgr() + mgr, srv := initMgr() srvAddr := "http://" + srv.Addr + httpPattern - defer shutdown(srv, wg) + defer shutdown(srv) opKey := OperationKey{ Name: "unknown-status-operation", ResourceID: types.UID("uid"), @@ -214,9 +224,9 @@ snapshot_controller_operation_total_seconds_count{driver_name="driver",operation } func TestRecordMetrics(t *testing.T) { - mgr, wg, srv := initMgr() + mgr, srv := initMgr() srvAddr := "http://" + srv.Addr + httpPattern - defer shutdown(srv, wg) + defer shutdown(srv) // add an operation opKey := OperationKey{ Name: "op1", @@ -284,9 +294,9 @@ snapshot_controller_operation_total_seconds_count{driver_name="driver2",operatio } func TestConcurrency(t *testing.T) { - mgr, wg, srv := initMgr() + mgr, srv := initMgr() srvAddr := "http://" + srv.Addr + httpPattern - defer shutdown(srv, wg) + defer shutdown(srv) success := &fakeOpStatus{ statusCode: 0, } @@ -482,8 +492,8 @@ snapshot_controller_operation_total_seconds_count{driver_name="driver5",operatio func TestInFlightMetric(t *testing.T) { inFlightCheckInterval = time.Millisecond * 50 - mgr, wg, srv := initMgr() - defer shutdown(srv, wg) + mgr, srv := initMgr() + defer shutdown(srv) srvAddr := "http://" + srv.Addr + httpPattern // Start first operation, should be 1 @@ -710,8 +720,8 @@ func containsMetrics(expectedMfs, gotMfs []*cmg.MetricFamily) bool { } func TestProcessStartTimeMetricExist(t *testing.T) { - mgr, wg, srv := initMgr() - defer shutdown(srv, wg) + mgr, srv := initMgr() + defer shutdown(srv) metricsFamilies, err := mgr.GetRegistry().Gather() if err != nil { t.Fatalf("Error fetching metrics: %v", err) diff --git a/pkg/sidecar-controller/snapshot_controller_base.go b/pkg/sidecar-controller/snapshot_controller_base.go index 0d203065..07876a18 100644 --- a/pkg/sidecar-controller/snapshot_controller_base.go +++ b/pkg/sidecar-controller/snapshot_controller_base.go @@ -26,7 +26,7 @@ import ( storagelisters "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1" "github.com/kubernetes-csi/external-snapshotter/v4/pkg/snapshotter" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait"