diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index b1f11ed0..84adc67b 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -37,6 +37,8 @@ const ( subSystem = "snapshot_controller" operationLatencyMetricName = "operation_total_seconds" operationLatencyMetricHelpMsg = "Total number of seconds spent by the controller on an operation" + operationInFlightName = "operations_in_flight" + operationInFlightHelpMsg = "Total number of operations in flight" unknownDriverName = "unknown" // CreateSnapshotOperationName is the operation that tracks how long the controller takes to create a snapshot. @@ -74,6 +76,10 @@ const ( SnapshotStatusTypeCancel snapshotStatusType = "cancel" ) +var ( + inFlightCheckInterval = 30 * time.Second +) + // OperationStatus is the interface type for representing an operation's execution // status, with the nil value representing an "Unknown" status of the operation. type OperationStatus interface { @@ -152,19 +158,25 @@ type operationMetricsManager struct { // ongoing operations. // key is an Operation // value is the timestamp of the start time of the operation - cache sync.Map + cache map[OperationKey]OperationValue + + // mutex for protecting cache from concurrent access + mu sync.Mutex // registry is a wrapper around Prometheus Registry registry k8smetrics.KubeRegistry // opLatencyMetrics is a Histogram metrics for operation time per request opLatencyMetrics *k8smetrics.HistogramVec + + // opInFlight is a Gauge metric for the number of operations in flight + opInFlight *k8smetrics.Gauge } // NewMetricsManager creates a new MetricsManager instance func NewMetricsManager() MetricsManager { mgr := &operationMetricsManager{ - cache: sync.Map{}, + cache: make(map[OperationKey]OperationValue), } mgr.init() return mgr @@ -172,29 +184,33 @@ func NewMetricsManager() MetricsManager { // OperationStart starts a new operation func (opMgr *operationMetricsManager) OperationStart(key OperationKey, val OperationValue) { - val.startTime = time.Now() - opMgr.cache.LoadOrStore(key, val) + opMgr.mu.Lock() + defer opMgr.mu.Unlock() + + if _, exists := opMgr.cache[key]; !exists { + val.startTime = time.Now() + opMgr.cache[key] = val + } + opMgr.opInFlight.Set(float64(len(opMgr.cache))) } // OperationStart drops an operation func (opMgr *operationMetricsManager) DropOperation(op OperationKey) { - opMgr.cache.Delete(op) + opMgr.mu.Lock() + defer opMgr.mu.Unlock() + delete(opMgr.cache, op) + opMgr.opInFlight.Set(float64(len(opMgr.cache))) } // RecordMetrics emits operation metrics func (opMgr *operationMetricsManager) RecordMetrics(opKey OperationKey, opStatus OperationStatus, driverName string) { - obj, exists := opMgr.cache.Load(opKey) + opMgr.mu.Lock() + defer opMgr.mu.Unlock() + opVal, exists := opMgr.cache[opKey] if !exists { // the operation has not been cached, return directly return } - opVal, ok := obj.(OperationValue) - if !ok { - // the cached item is not a OperationValue, should NEVER happen, clean and return - klog.Errorf("Invalid cache entry for key %v", opKey) - opMgr.cache.Delete(opKey) - return - } status := string(SnapshotStatusTypeUnknown) if opStatus != nil { status = opStatus.String() @@ -213,7 +229,7 @@ func (opMgr *operationMetricsManager) RecordMetrics(opKey OperationKey, opStatus if opKey.Name == DeleteSnapshotOperationName { // check if we have a CreateSnapshot operation pending for this createKey := NewOperationKey(CreateSnapshotOperationName, opKey.ResourceID) - obj, exists := opMgr.cache.Load(createKey) + obj, exists := opMgr.cache[createKey] if exists { // record a cancel metric if found opMgr.recordCancelMetric(obj, createKey, operationDuration) @@ -221,32 +237,30 @@ func (opMgr *operationMetricsManager) RecordMetrics(opKey OperationKey, opStatus // check if we have a CreateSnapshotAndReady operation pending for this createAndReadyKey := NewOperationKey(CreateSnapshotAndReadyOperationName, opKey.ResourceID) - obj, exists = opMgr.cache.Load(createAndReadyKey) + obj, exists = opMgr.cache[createAndReadyKey] if exists { // record a cancel metric if found opMgr.recordCancelMetric(obj, createAndReadyKey, operationDuration) } } - opMgr.cache.Delete(opKey) + delete(opMgr.cache, opKey) + opMgr.opInFlight.Set(float64(len(opMgr.cache))) } // recordCancelMetric records a metric for a create operation that hasn't finished -func (opMgr *operationMetricsManager) recordCancelMetric(obj interface{}, key OperationKey, duration float64) { +func (opMgr *operationMetricsManager) recordCancelMetric(val OperationValue, key OperationKey, duration float64) { + opMgr.mu.Lock() + defer opMgr.mu.Unlock() // record a cancel metric if found - val, ok := obj.(OperationValue) - if !ok { - klog.Errorf("Invalid cache entry for key %v", key) - opMgr.cache.Delete(key) - return - } + opMgr.opLatencyMetrics.WithLabelValues( val.Driver, key.Name, val.SnapshotType, string(SnapshotStatusTypeCancel), ).Observe(duration) - opMgr.cache.Delete(key) + delete(opMgr.cache, key) } func (opMgr *operationMetricsManager) init() { @@ -261,6 +275,29 @@ func (opMgr *operationMetricsManager) init() { []string{labelDriverName, labelOperationName, labelSnapshotType, labelOperationStatus}, ) opMgr.registry.MustRegister(opMgr.opLatencyMetrics) + opMgr.opInFlight = k8smetrics.NewGauge( + &k8smetrics.GaugeOpts{ + Subsystem: subSystem, + Name: operationInFlightName, + Help: operationInFlightHelpMsg, + }, + ) + opMgr.registry.MustRegister(opMgr.opInFlight) + + // While we always maintain the number of operations in flight + // for every metrics operation start/finish, if any are leaked, + // this scheduled routine will catch any leaked operations. + go opMgr.scheduleOpsInFlightMetric() +} + +func (opMgr *operationMetricsManager) scheduleOpsInFlightMetric() { + for range time.Tick(inFlightCheckInterval) { + func() { + opMgr.mu.Lock() + defer opMgr.mu.Unlock() + opMgr.opInFlight.Set(float64(len(opMgr.cache))) + }() + } } func (opMgr *operationMetricsManager) StartMetricsEndpoint(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error) { diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index aa766fb1..e19892da 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -478,6 +478,83 @@ snapshot_controller_operation_total_seconds_count{driver_name="driver5",operatio } } +func TestInFlightMetric(t *testing.T) { + inFlightCheckInterval = time.Millisecond * 50 + + mgr, wg, srv := initMgr() + defer shutdown(srv, wg) + srvAddr := "http://" + srv.Addr + httpPattern + + // Start first operation, should be 1 + opKey := OperationKey{ + Name: "leaked", + ResourceID: types.UID("uid"), + } + opVal := NewOperationValue("driver", "test") + mgr.OperationStart(opKey, opVal) + time.Sleep(500 * time.Millisecond) + + if err := verifyInFlightMetric(`snapshot_controller_operations_in_flight 1`, srvAddr); err != nil { + t.Errorf("failed testing [%v]", err) + } + + // Start second operation, should be 2 + opKey = OperationKey{ + Name: "leaked2", + ResourceID: types.UID("uid"), + } + opVal = NewOperationValue("driver2", "test2") + mgr.OperationStart(opKey, opVal) + time.Sleep(500 * time.Millisecond) + + if err := verifyInFlightMetric(`snapshot_controller_operations_in_flight 2`, srvAddr); err != nil { + t.Errorf("failed testing [%v]", err) + } + + // Record, should be down to 1 + mgr.RecordMetrics(opKey, nil, "driver") + time.Sleep(500 * time.Millisecond) + + if err := verifyInFlightMetric(`snapshot_controller_operations_in_flight 1`, srvAddr); err != nil { + t.Errorf("failed testing [%v]", err) + } + + // Start 50 operations, should be 51 + for i := 0; i < 50; i++ { + opKey := OperationKey{ + Name: fmt.Sprintf("op%d", i), + ResourceID: types.UID("uid%d"), + } + opVal := NewOperationValue("driver1", "test") + mgr.OperationStart(opKey, opVal) + } + time.Sleep(500 * time.Millisecond) + + if err := verifyInFlightMetric(`snapshot_controller_operations_in_flight 51`, srvAddr); err != nil { + t.Errorf("failed testing [%v]", err) + } +} + +func verifyInFlightMetric(expected string, srvAddr string) error { + rsp, err := http.Get(srvAddr) + if err != nil { + return err + } + if rsp.StatusCode != http.StatusOK { + return fmt.Errorf("failed to get response from serve: %s", http.StatusText(rsp.StatusCode)) + } + r, err := ioutil.ReadAll(rsp.Body) + if err != nil { + return err + } + + if !strings.Contains(string(r), expected) { + return fmt.Errorf("failed, not equal") + } + + return nil +} + func verifyMetric(expected, srvAddr string) error { rsp, err := http.Get(srvAddr) if err != nil { @@ -490,6 +567,7 @@ func verifyMetric(expected, srvAddr string) error { if err != nil { return err } + format := expfmt.ResponseFormat(rsp.Header) gotReader := strings.NewReader(string(r)) gotDecoder := expfmt.NewDecoder(gotReader, format)