Refactor http server and register leaderelection health check

- Needed to start the http server outside of pkg/metric
- We needed this because we want to add other endpoints to the server

Signed-off-by: Grant Griffiths <ggriffiths@purestorage.com>
This commit is contained in:
Grant Griffiths
2021-08-10 02:17:08 -07:00
parent b5b71904b4
commit 0476dcedcc
5 changed files with 91 additions and 70 deletions

View File

@@ -20,6 +20,8 @@ import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"net"
"net/http"
"os" "os"
"os/signal" "os/signal"
"sync" "sync"
@@ -143,23 +145,15 @@ func main() {
// Create and register metrics manager // Create and register metrics manager
metricsManager := metrics.NewMetricsManager() metricsManager := metrics.NewMetricsManager()
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
wg.Add(1)
mux := http.NewServeMux()
if *httpEndpoint != "" { if *httpEndpoint != "" {
srv, err := metricsManager.StartMetricsEndpoint(*metricsPath, *httpEndpoint, promklog{}, wg) err := metricsManager.PrepareMetricsPath(mux, *metricsPath, promklog{})
if err != nil { if err != nil {
klog.Errorf("Failed to start metrics server: %s", err.Error()) klog.Errorf("Failed to prepare metrics path: %s", err.Error())
os.Exit(1) os.Exit(1)
} }
defer func() { klog.Infof("Metrics path successfully registered at %s", *metricsPath)
err := srv.Shutdown(context.Background())
if err != nil {
klog.Errorf("Failed to shutdown metrics server: %s", err.Error())
}
klog.Infof("Metrics server successfully shutdown")
wg.Done()
}()
klog.Infof("Metrics server successfully started on %s, %s", *httpEndpoint, *metricsPath)
} }
// Add Snapshot types to the default Kubernetes so events can be logged for them // Add Snapshot types to the default Kubernetes so events can be logged for them
@@ -199,6 +193,32 @@ func main() {
close(stopCh) close(stopCh)
} }
// start listening & serving http endpoint if set
if *httpEndpoint != "" {
l, err := net.Listen("tcp", *httpEndpoint)
if err != nil {
klog.Fatalf("failed to listen on address[%s], error[%v]", *httpEndpoint, err)
}
srv := &http.Server{Addr: l.Addr().String(), Handler: mux}
go func() {
defer wg.Done()
if err := srv.Serve(l); err != http.ErrServerClosed {
klog.Fatalf("failed to start endpoint at:%s/%s, error: %v", *httpEndpoint, *metricsPath, err)
}
}()
klog.Infof("Metrics http server successfully started on %s, %s", *httpEndpoint, *metricsPath)
defer func() {
err := srv.Shutdown(context.Background())
if err != nil {
klog.Errorf("Failed to shutdown metrics server: %s", err.Error())
}
klog.Infof("Metrics server successfully shutdown")
wg.Done()
}()
}
if !*leaderElection { if !*leaderElection {
run(context.TODO()) run(context.TODO())
} else { } else {
@@ -210,6 +230,10 @@ func main() {
klog.Fatalf("failed to create leaderelection client: %v", err) klog.Fatalf("failed to create leaderelection client: %v", err)
} }
le := leaderelection.NewLeaderElection(leClientset, lockName, run) le := leaderelection.NewLeaderElection(leClientset, lockName, run)
if *httpEndpoint != "" {
le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout)
}
if *leaderElectionNamespace != "" { if *leaderElectionNamespace != "" {
le.WithNamespace(*leaderElectionNamespace) le.WithNamespace(*leaderElectionNamespace)
} }

View File

@@ -19,7 +19,7 @@ package common_controller
import ( import (
"errors" "errors"
"fmt" "fmt"
"k8s.io/client-go/util/workqueue" "net/http"
"reflect" "reflect"
sysruntime "runtime" sysruntime "runtime"
"strconv" "strconv"
@@ -29,6 +29,8 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/client-go/util/workqueue"
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
clientset "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned" clientset "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
"github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake" "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake"
@@ -737,9 +739,14 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, utils.NoResyncPeriodFunc()) coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, utils.NoResyncPeriodFunc())
metricsManager := metrics.NewMetricsManager() metricsManager := metrics.NewMetricsManager()
wg := &sync.WaitGroup{} mux := http.NewServeMux()
wg.Add(1) metricsManager.PrepareMetricsPath(mux, "/metrics", nil)
metricsManager.StartMetricsEndpoint("/metrics", "localhost:0", nil, wg) go func() {
err := http.ListenAndServe("localhost:0", mux)
if err != nil {
t.Errorf("failed to prepare metrics path: %v", err)
}
}()
ctrl := NewCSISnapshotCommonController( ctrl := NewCSISnapshotCommonController(
clientset, clientset,

View File

@@ -17,8 +17,6 @@ limitations under the License.
package metrics package metrics
import ( import (
"fmt"
"net"
"net/http" "net/http"
"sync" "sync"
"time" "time"
@@ -26,7 +24,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
k8smetrics "k8s.io/component-base/metrics" k8smetrics "k8s.io/component-base/metrics"
klog "k8s.io/klog/v2"
) )
const ( const (
@@ -89,12 +86,11 @@ type OperationStatus interface {
var metricBuckets = []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 15, 30, 60, 120, 300, 600} var metricBuckets = []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 15, 30, 60, 120, 300, 600}
type MetricsManager interface { type MetricsManager interface {
// StartMetricsEndpoint starts the metrics endpoint at the specified addr/pattern for // PrepareMetricsPath prepares the metrics path the specified pattern for
// metrics managed by this MetricsManager. It spawns a goroutine to listen to // metrics managed by this MetricsManager.
// and serve HTTP requests received on addr/pattern. // If the "pattern" is empty (i.e., ""), it will not be registered.
// If the "pattern" is empty (i.e., ""), no endpoint will be started.
// An error will be returned if there is any. // An error will be returned if there is any.
StartMetricsEndpoint(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error) PrepareMetricsPath(mux *http.ServeMux, pattern string, logger promhttp.Logger) error
// OperationStart takes in an operation and caches its start time. // OperationStart takes in an operation and caches its start time.
// if the operation already exists, it's an no-op. // if the operation already exists, it's an no-op.
@@ -304,31 +300,15 @@ func (opMgr *operationMetricsManager) scheduleOpsInFlightMetric() {
} }
} }
func (opMgr *operationMetricsManager) StartMetricsEndpoint(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error) { func (opMgr *operationMetricsManager) PrepareMetricsPath(mux *http.ServeMux, pattern string, logger promhttp.Logger) error {
if addr == "" {
return nil, fmt.Errorf("metrics endpoint will not be started as endpoint address is not specified")
}
// start listening
l, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to listen on address[%s], error[%v]", addr, err)
}
mux := http.NewServeMux()
mux.Handle(pattern, k8smetrics.HandlerFor( mux.Handle(pattern, k8smetrics.HandlerFor(
opMgr.registry, opMgr.registry,
k8smetrics.HandlerOpts{ k8smetrics.HandlerOpts{
ErrorLog: logger, ErrorLog: logger,
ErrorHandling: k8smetrics.ContinueOnError, ErrorHandling: k8smetrics.ContinueOnError,
})) }))
srv := &http.Server{Addr: l.Addr().String(), Handler: mux}
// start serving the endpoint return nil
go func() {
defer wg.Done()
if err := srv.Serve(l); err != http.ErrServerClosed {
klog.Fatalf("failed to start endpoint at:%s/%s, error: %v", addr, pattern, err)
}
}()
return srv, nil
} }
func (opMgr *operationMetricsManager) GetRegistry() k8smetrics.KubeRegistry { func (opMgr *operationMetricsManager) GetRegistry() k8smetrics.KubeRegistry {

View File

@@ -22,6 +22,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
"net"
"net/http" "net/http"
"reflect" "reflect"
"sort" "sort"
@@ -60,35 +61,44 @@ func (s *fakeOpStatus) String() string {
return "Unknown" return "Unknown"
} }
func initMgr() (MetricsManager, *sync.WaitGroup, *http.Server) { func initMgr() (MetricsManager, *http.Server) {
wg := &sync.WaitGroup{}
wg.Add(1)
mgr := NewMetricsManager() mgr := NewMetricsManager()
srv, err := mgr.StartMetricsEndpoint(httpPattern, addr, nil, wg) mux := http.NewServeMux()
err := mgr.PrepareMetricsPath(mux, httpPattern, nil)
if err != nil { if err != nil {
log.Fatalf("failed to start serving [%v]", err) log.Fatalf("failed to start serving [%v]", err)
} }
return mgr, wg, srv l, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("failed to listen on address[%s], error[%v]", addr, err)
}
srv := &http.Server{Addr: l.Addr().String(), Handler: mux}
go func() {
if err := srv.Serve(l); err != http.ErrServerClosed {
log.Fatalf("failed to start endpoint at:%s/%s, error: %v", addr, httpPattern, err)
}
}()
return mgr, srv
} }
func shutdown(srv *http.Server, wg *sync.WaitGroup) { func shutdown(srv *http.Server) {
if err := srv.Shutdown(context.Background()); err != nil { if err := srv.Shutdown(context.Background()); err != nil {
panic(err) panic(err)
} }
wg.Wait()
} }
func TestNew(t *testing.T) { func TestNew(t *testing.T) {
mgr, wg, srv := initMgr() mgr, srv := initMgr()
defer shutdown(srv, wg) defer shutdown(srv)
if mgr == nil { if mgr == nil {
t.Errorf("failed testing new") t.Errorf("failed testing new")
} }
} }
func TestDropNonExistingOperation(t *testing.T) { func TestDropNonExistingOperation(t *testing.T) {
mgr, wg, srv := initMgr() mgr, srv := initMgr()
defer shutdown(srv, wg) defer shutdown(srv)
op := OperationKey{ op := OperationKey{
Name: "drop-non-existing-operation-should-be-noop", Name: "drop-non-existing-operation-should-be-noop",
ResourceID: types.UID("uid"), ResourceID: types.UID("uid"),
@@ -97,9 +107,9 @@ func TestDropNonExistingOperation(t *testing.T) {
} }
func TestRecordMetricsForNonExistingOperation(t *testing.T) { func TestRecordMetricsForNonExistingOperation(t *testing.T) {
mgr, wg, srv := initMgr() mgr, srv := initMgr()
srvAddr := "http://" + srv.Addr + httpPattern srvAddr := "http://" + srv.Addr + httpPattern
defer shutdown(srv, wg) defer shutdown(srv)
opKey := OperationKey{ opKey := OperationKey{
Name: "no-metrics-should-be-recorded-as-operation-did-not-start", Name: "no-metrics-should-be-recorded-as-operation-did-not-start",
ResourceID: types.UID("uid"), ResourceID: types.UID("uid"),
@@ -119,9 +129,9 @@ func TestRecordMetricsForNonExistingOperation(t *testing.T) {
} }
func TestDropOperation(t *testing.T) { func TestDropOperation(t *testing.T) {
mgr, wg, srv := initMgr() mgr, srv := initMgr()
srvAddr := "http://" + srv.Addr + httpPattern srvAddr := "http://" + srv.Addr + httpPattern
defer shutdown(srv, wg) defer shutdown(srv)
opKey := OperationKey{ opKey := OperationKey{
Name: "should-have-been-dropped", Name: "should-have-been-dropped",
ResourceID: types.UID("uid"), ResourceID: types.UID("uid"),
@@ -176,9 +186,9 @@ snapshot_controller_operation_total_seconds_count{driver_name="driver",operation
} }
func TestUnknownStatus(t *testing.T) { func TestUnknownStatus(t *testing.T) {
mgr, wg, srv := initMgr() mgr, srv := initMgr()
srvAddr := "http://" + srv.Addr + httpPattern srvAddr := "http://" + srv.Addr + httpPattern
defer shutdown(srv, wg) defer shutdown(srv)
opKey := OperationKey{ opKey := OperationKey{
Name: "unknown-status-operation", Name: "unknown-status-operation",
ResourceID: types.UID("uid"), ResourceID: types.UID("uid"),
@@ -214,9 +224,9 @@ snapshot_controller_operation_total_seconds_count{driver_name="driver",operation
} }
func TestRecordMetrics(t *testing.T) { func TestRecordMetrics(t *testing.T) {
mgr, wg, srv := initMgr() mgr, srv := initMgr()
srvAddr := "http://" + srv.Addr + httpPattern srvAddr := "http://" + srv.Addr + httpPattern
defer shutdown(srv, wg) defer shutdown(srv)
// add an operation // add an operation
opKey := OperationKey{ opKey := OperationKey{
Name: "op1", Name: "op1",
@@ -284,9 +294,9 @@ snapshot_controller_operation_total_seconds_count{driver_name="driver2",operatio
} }
func TestConcurrency(t *testing.T) { func TestConcurrency(t *testing.T) {
mgr, wg, srv := initMgr() mgr, srv := initMgr()
srvAddr := "http://" + srv.Addr + httpPattern srvAddr := "http://" + srv.Addr + httpPattern
defer shutdown(srv, wg) defer shutdown(srv)
success := &fakeOpStatus{ success := &fakeOpStatus{
statusCode: 0, statusCode: 0,
} }
@@ -482,8 +492,8 @@ snapshot_controller_operation_total_seconds_count{driver_name="driver5",operatio
func TestInFlightMetric(t *testing.T) { func TestInFlightMetric(t *testing.T) {
inFlightCheckInterval = time.Millisecond * 50 inFlightCheckInterval = time.Millisecond * 50
mgr, wg, srv := initMgr() mgr, srv := initMgr()
defer shutdown(srv, wg) defer shutdown(srv)
srvAddr := "http://" + srv.Addr + httpPattern srvAddr := "http://" + srv.Addr + httpPattern
// Start first operation, should be 1 // Start first operation, should be 1
@@ -710,8 +720,8 @@ func containsMetrics(expectedMfs, gotMfs []*cmg.MetricFamily) bool {
} }
func TestProcessStartTimeMetricExist(t *testing.T) { func TestProcessStartTimeMetricExist(t *testing.T) {
mgr, wg, srv := initMgr() mgr, srv := initMgr()
defer shutdown(srv, wg) defer shutdown(srv)
metricsFamilies, err := mgr.GetRegistry().Gather() metricsFamilies, err := mgr.GetRegistry().Gather()
if err != nil { if err != nil {
t.Fatalf("Error fetching metrics: %v", err) t.Fatalf("Error fetching metrics: %v", err)

View File

@@ -26,7 +26,7 @@ import (
storagelisters "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1" storagelisters "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"
"github.com/kubernetes-csi/external-snapshotter/v4/pkg/snapshotter" "github.com/kubernetes-csi/external-snapshotter/v4/pkg/snapshotter"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"