Add gauge metric for snapshot controller operations in flight

Signed-off-by: Grant Griffiths <grant@portworx.com>
This commit is contained in:
Grant Griffiths
2021-05-01 00:10:42 -04:00
committed by Grant Griffiths
parent e6e14c1601
commit 57987a84c0
2 changed files with 139 additions and 24 deletions

View File

@@ -37,6 +37,8 @@ const (
subSystem = "snapshot_controller" subSystem = "snapshot_controller"
operationLatencyMetricName = "operation_total_seconds" operationLatencyMetricName = "operation_total_seconds"
operationLatencyMetricHelpMsg = "Total number of seconds spent by the controller on an operation" operationLatencyMetricHelpMsg = "Total number of seconds spent by the controller on an operation"
operationInFlightName = "operations_in_flight"
operationInFlightHelpMsg = "Total number of operations in flight"
unknownDriverName = "unknown" unknownDriverName = "unknown"
// CreateSnapshotOperationName is the operation that tracks how long the controller takes to create a snapshot. // CreateSnapshotOperationName is the operation that tracks how long the controller takes to create a snapshot.
@@ -74,6 +76,10 @@ const (
SnapshotStatusTypeCancel snapshotStatusType = "cancel" SnapshotStatusTypeCancel snapshotStatusType = "cancel"
) )
var (
	// inFlightCheckInterval controls how often the background goroutine
	// refreshes the operations-in-flight gauge from the cache (catching any
	// leaked entries). A var rather than a const so tests can shorten it.
	inFlightCheckInterval = 30 * time.Second
)
// OperationStatus is the interface type for representing an operation's execution // OperationStatus is the interface type for representing an operation's execution
// status, with the nil value representing an "Unknown" status of the operation. // status, with the nil value representing an "Unknown" status of the operation.
type OperationStatus interface { type OperationStatus interface {
@@ -152,19 +158,25 @@ type operationMetricsManager struct {
// ongoing operations. // ongoing operations.
// key is an Operation // key is an Operation
// value is the timestamp of the start time of the operation // value is the timestamp of the start time of the operation
cache sync.Map cache map[OperationKey]OperationValue
// mutex for protecting cache from concurrent access
mu sync.Mutex
// registry is a wrapper around Prometheus Registry // registry is a wrapper around Prometheus Registry
registry k8smetrics.KubeRegistry registry k8smetrics.KubeRegistry
// opLatencyMetrics is a Histogram metrics for operation time per request // opLatencyMetrics is a Histogram metrics for operation time per request
opLatencyMetrics *k8smetrics.HistogramVec opLatencyMetrics *k8smetrics.HistogramVec
// opInFlight is a Gauge metric for the number of operations in flight
opInFlight *k8smetrics.Gauge
} }
// NewMetricsManager creates a new MetricsManager instance // NewMetricsManager creates a new MetricsManager instance
func NewMetricsManager() MetricsManager { func NewMetricsManager() MetricsManager {
mgr := &operationMetricsManager{ mgr := &operationMetricsManager{
cache: sync.Map{}, cache: make(map[OperationKey]OperationValue),
} }
mgr.init() mgr.init()
return mgr return mgr
@@ -172,29 +184,33 @@ func NewMetricsManager() MetricsManager {
// OperationStart starts a new operation // OperationStart starts a new operation
func (opMgr *operationMetricsManager) OperationStart(key OperationKey, val OperationValue) { func (opMgr *operationMetricsManager) OperationStart(key OperationKey, val OperationValue) {
val.startTime = time.Now() opMgr.mu.Lock()
opMgr.cache.LoadOrStore(key, val) defer opMgr.mu.Unlock()
if _, exists := opMgr.cache[key]; !exists {
val.startTime = time.Now()
opMgr.cache[key] = val
}
opMgr.opInFlight.Set(float64(len(opMgr.cache)))
} }
// DropOperation removes an operation from the cache without emitting a
// metric, and updates the in-flight gauge to match.
// (The original comment said "OperationStart" — a copy-paste typo.)
func (opMgr *operationMetricsManager) DropOperation(op OperationKey) {
	opMgr.mu.Lock()
	defer opMgr.mu.Unlock()
	delete(opMgr.cache, op)
	opMgr.opInFlight.Set(float64(len(opMgr.cache)))
}
// RecordMetrics emits operation metrics // RecordMetrics emits operation metrics
func (opMgr *operationMetricsManager) RecordMetrics(opKey OperationKey, opStatus OperationStatus, driverName string) { func (opMgr *operationMetricsManager) RecordMetrics(opKey OperationKey, opStatus OperationStatus, driverName string) {
obj, exists := opMgr.cache.Load(opKey) opMgr.mu.Lock()
defer opMgr.mu.Unlock()
opVal, exists := opMgr.cache[opKey]
if !exists { if !exists {
// the operation has not been cached, return directly // the operation has not been cached, return directly
return return
} }
opVal, ok := obj.(OperationValue)
if !ok {
// the cached item is not a OperationValue, should NEVER happen, clean and return
klog.Errorf("Invalid cache entry for key %v", opKey)
opMgr.cache.Delete(opKey)
return
}
status := string(SnapshotStatusTypeUnknown) status := string(SnapshotStatusTypeUnknown)
if opStatus != nil { if opStatus != nil {
status = opStatus.String() status = opStatus.String()
@@ -213,7 +229,7 @@ func (opMgr *operationMetricsManager) RecordMetrics(opKey OperationKey, opStatus
if opKey.Name == DeleteSnapshotOperationName { if opKey.Name == DeleteSnapshotOperationName {
// check if we have a CreateSnapshot operation pending for this // check if we have a CreateSnapshot operation pending for this
createKey := NewOperationKey(CreateSnapshotOperationName, opKey.ResourceID) createKey := NewOperationKey(CreateSnapshotOperationName, opKey.ResourceID)
obj, exists := opMgr.cache.Load(createKey) obj, exists := opMgr.cache[createKey]
if exists { if exists {
// record a cancel metric if found // record a cancel metric if found
opMgr.recordCancelMetric(obj, createKey, operationDuration) opMgr.recordCancelMetric(obj, createKey, operationDuration)
@@ -221,32 +237,30 @@ func (opMgr *operationMetricsManager) RecordMetrics(opKey OperationKey, opStatus
// check if we have a CreateSnapshotAndReady operation pending for this // check if we have a CreateSnapshotAndReady operation pending for this
createAndReadyKey := NewOperationKey(CreateSnapshotAndReadyOperationName, opKey.ResourceID) createAndReadyKey := NewOperationKey(CreateSnapshotAndReadyOperationName, opKey.ResourceID)
obj, exists = opMgr.cache.Load(createAndReadyKey) obj, exists = opMgr.cache[createAndReadyKey]
if exists { if exists {
// record a cancel metric if found // record a cancel metric if found
opMgr.recordCancelMetric(obj, createAndReadyKey, operationDuration) opMgr.recordCancelMetric(obj, createAndReadyKey, operationDuration)
} }
} }
opMgr.cache.Delete(opKey) delete(opMgr.cache, opKey)
opMgr.opInFlight.Set(float64(len(opMgr.cache)))
} }
// recordCancelMetric records a metric for a create operation that hasn't finished // recordCancelMetric records a metric for a create operation that hasn't finished
func (opMgr *operationMetricsManager) recordCancelMetric(obj interface{}, key OperationKey, duration float64) { func (opMgr *operationMetricsManager) recordCancelMetric(val OperationValue, key OperationKey, duration float64) {
opMgr.mu.Lock()
defer opMgr.mu.Unlock()
// record a cancel metric if found // record a cancel metric if found
val, ok := obj.(OperationValue)
if !ok {
klog.Errorf("Invalid cache entry for key %v", key)
opMgr.cache.Delete(key)
return
}
opMgr.opLatencyMetrics.WithLabelValues( opMgr.opLatencyMetrics.WithLabelValues(
val.Driver, val.Driver,
key.Name, key.Name,
val.SnapshotType, val.SnapshotType,
string(SnapshotStatusTypeCancel), string(SnapshotStatusTypeCancel),
).Observe(duration) ).Observe(duration)
opMgr.cache.Delete(key) delete(opMgr.cache, key)
} }
func (opMgr *operationMetricsManager) init() { func (opMgr *operationMetricsManager) init() {
@@ -261,6 +275,29 @@ func (opMgr *operationMetricsManager) init() {
[]string{labelDriverName, labelOperationName, labelSnapshotType, labelOperationStatus}, []string{labelDriverName, labelOperationName, labelSnapshotType, labelOperationStatus},
) )
opMgr.registry.MustRegister(opMgr.opLatencyMetrics) opMgr.registry.MustRegister(opMgr.opLatencyMetrics)
opMgr.opInFlight = k8smetrics.NewGauge(
&k8smetrics.GaugeOpts{
Subsystem: subSystem,
Name: operationInFlightName,
Help: operationInFlightHelpMsg,
},
)
opMgr.registry.MustRegister(opMgr.opInFlight)
// While we always maintain the number of operations in flight
// for every metrics operation start/finish, if any are leaked,
// this scheduled routine will catch any leaked operations.
go opMgr.scheduleOpsInFlightMetric()
}
func (opMgr *operationMetricsManager) scheduleOpsInFlightMetric() {
for range time.Tick(inFlightCheckInterval) {
func() {
opMgr.mu.Lock()
defer opMgr.mu.Unlock()
opMgr.opInFlight.Set(float64(len(opMgr.cache)))
}()
}
} }
func (opMgr *operationMetricsManager) StartMetricsEndpoint(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error) { func (opMgr *operationMetricsManager) StartMetricsEndpoint(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error) {

View File

@@ -478,6 +478,83 @@ snapshot_controller_operation_total_seconds_count{driver_name="driver5",operatio
} }
} }
// TestInFlightMetric verifies that the operations-in-flight gauge tracks
// OperationStart/RecordMetrics and that the background reconciliation
// goroutine (interval shortened below) keeps it up to date.
func TestInFlightMetric(t *testing.T) {
	inFlightCheckInterval = time.Millisecond * 50
	mgr, wg, srv := initMgr()
	defer shutdown(srv, wg)
	srvAddr := "http://" + srv.Addr + httpPattern

	// Start first operation, should be 1
	opKey := OperationKey{
		Name:       "leaked",
		ResourceID: types.UID("uid"),
	}
	opVal := NewOperationValue("driver", "test")
	mgr.OperationStart(opKey, opVal)
	time.Sleep(500 * time.Millisecond)
	if err := verifyInFlightMetric(`snapshot_controller_operations_in_flight 1`, srvAddr); err != nil {
		t.Errorf("failed testing [%v]", err)
	}

	// Start second operation, should be 2
	opKey = OperationKey{
		Name:       "leaked2",
		ResourceID: types.UID("uid"),
	}
	opVal = NewOperationValue("driver2", "test2")
	mgr.OperationStart(opKey, opVal)
	time.Sleep(500 * time.Millisecond)
	if err := verifyInFlightMetric(`snapshot_controller_operations_in_flight 2`, srvAddr); err != nil {
		t.Errorf("failed testing [%v]", err)
	}

	// Record, should be down to 1
	mgr.RecordMetrics(opKey, nil, "driver")
	time.Sleep(500 * time.Millisecond)
	if err := verifyInFlightMetric(`snapshot_controller_operations_in_flight 1`, srvAddr); err != nil {
		t.Errorf("failed testing [%v]", err)
	}

	// Start 50 operations, should be 51
	for i := 0; i < 50; i++ {
		opKey := OperationKey{
			Name: fmt.Sprintf("op%d", i),
			// Fix: the original passed the literal string "uid%d" without
			// formatting; interpolate i so each key has a distinct ResourceID.
			ResourceID: types.UID(fmt.Sprintf("uid%d", i)),
		}
		opVal := NewOperationValue("driver1", "test")
		mgr.OperationStart(opKey, opVal)
	}
	time.Sleep(500 * time.Millisecond)
	if err := verifyInFlightMetric(`snapshot_controller_operations_in_flight 51`, srvAddr); err != nil {
		t.Errorf("failed testing [%v]", err)
	}
}
// verifyInFlightMetric scrapes the metrics endpoint at srvAddr and returns an
// error if the response body does not contain the expected sample line.
func verifyInFlightMetric(expected string, srvAddr string) error {
	rsp, err := http.Get(srvAddr)
	if err != nil {
		return err
	}
	// Fix: close the body so the connection is not leaked; this helper is
	// called repeatedly within a single test run.
	defer rsp.Body.Close()
	if rsp.StatusCode != http.StatusOK {
		return fmt.Errorf("failed to get response from serve: %s", http.StatusText(rsp.StatusCode))
	}
	r, err := ioutil.ReadAll(rsp.Body)
	if err != nil {
		return err
	}
	if !strings.Contains(string(r), expected) {
		// include the missing sample so a failure is actually debuggable
		return fmt.Errorf("expected metric %q not found in response", expected)
	}
	return nil
}
func verifyMetric(expected, srvAddr string) error { func verifyMetric(expected, srvAddr string) error {
rsp, err := http.Get(srvAddr) rsp, err := http.Get(srvAddr)
if err != nil { if err != nil {
@@ -490,6 +567,7 @@ func verifyMetric(expected, srvAddr string) error {
if err != nil { if err != nil {
return err return err
} }
format := expfmt.ResponseFormat(rsp.Header) format := expfmt.ResponseFormat(rsp.Header)
gotReader := strings.NewReader(string(r)) gotReader := strings.NewReader(string(r))
gotDecoder := expfmt.NewDecoder(gotReader, format) gotDecoder := expfmt.NewDecoder(gotReader, format)