Merge pull request #519 from ggriffiths/metric_operation_leak
Add gauge metric for snapshot controller operations in flight
This commit is contained in:
@@ -37,6 +37,8 @@ const (
|
||||
subSystem = "snapshot_controller"
|
||||
operationLatencyMetricName = "operation_total_seconds"
|
||||
operationLatencyMetricHelpMsg = "Total number of seconds spent by the controller on an operation"
|
||||
operationInFlightName = "operations_in_flight"
|
||||
operationInFlightHelpMsg = "Total number of operations in flight"
|
||||
unknownDriverName = "unknown"
|
||||
|
||||
// CreateSnapshotOperationName is the operation that tracks how long the controller takes to create a snapshot.
|
||||
@@ -74,6 +76,10 @@ const (
|
||||
SnapshotStatusTypeCancel snapshotStatusType = "cancel"
|
||||
)
|
||||
|
||||
var (
|
||||
inFlightCheckInterval = 30 * time.Second
|
||||
)
|
||||
|
||||
// OperationStatus is the interface type for representing an operation's execution
|
||||
// status, with the nil value representing an "Unknown" status of the operation.
|
||||
type OperationStatus interface {
|
||||
@@ -152,19 +158,25 @@ type operationMetricsManager struct {
|
||||
// ongoing operations.
|
||||
// key is an Operation
|
||||
// value is the timestamp of the start time of the operation
|
||||
cache sync.Map
|
||||
cache map[OperationKey]OperationValue
|
||||
|
||||
// mutex for protecting cache from concurrent access
|
||||
mu sync.Mutex
|
||||
|
||||
// registry is a wrapper around Prometheus Registry
|
||||
registry k8smetrics.KubeRegistry
|
||||
|
||||
// opLatencyMetrics is a Histogram metrics for operation time per request
|
||||
opLatencyMetrics *k8smetrics.HistogramVec
|
||||
|
||||
// opInFlight is a Gauge metric for the number of operations in flight
|
||||
opInFlight *k8smetrics.Gauge
|
||||
}
|
||||
|
||||
// NewMetricsManager creates a new MetricsManager instance
|
||||
func NewMetricsManager() MetricsManager {
|
||||
mgr := &operationMetricsManager{
|
||||
cache: sync.Map{},
|
||||
cache: make(map[OperationKey]OperationValue),
|
||||
}
|
||||
mgr.init()
|
||||
return mgr
|
||||
@@ -172,29 +184,33 @@ func NewMetricsManager() MetricsManager {
|
||||
|
||||
// OperationStart starts a new operation
|
||||
func (opMgr *operationMetricsManager) OperationStart(key OperationKey, val OperationValue) {
|
||||
val.startTime = time.Now()
|
||||
opMgr.cache.LoadOrStore(key, val)
|
||||
opMgr.mu.Lock()
|
||||
defer opMgr.mu.Unlock()
|
||||
|
||||
if _, exists := opMgr.cache[key]; !exists {
|
||||
val.startTime = time.Now()
|
||||
opMgr.cache[key] = val
|
||||
}
|
||||
opMgr.opInFlight.Set(float64(len(opMgr.cache)))
|
||||
}
|
||||
|
||||
// OperationStart drops an operation
|
||||
func (opMgr *operationMetricsManager) DropOperation(op OperationKey) {
|
||||
opMgr.cache.Delete(op)
|
||||
opMgr.mu.Lock()
|
||||
defer opMgr.mu.Unlock()
|
||||
delete(opMgr.cache, op)
|
||||
opMgr.opInFlight.Set(float64(len(opMgr.cache)))
|
||||
}
|
||||
|
||||
// RecordMetrics emits operation metrics
|
||||
func (opMgr *operationMetricsManager) RecordMetrics(opKey OperationKey, opStatus OperationStatus, driverName string) {
|
||||
obj, exists := opMgr.cache.Load(opKey)
|
||||
opMgr.mu.Lock()
|
||||
defer opMgr.mu.Unlock()
|
||||
opVal, exists := opMgr.cache[opKey]
|
||||
if !exists {
|
||||
// the operation has not been cached, return directly
|
||||
return
|
||||
}
|
||||
opVal, ok := obj.(OperationValue)
|
||||
if !ok {
|
||||
// the cached item is not a OperationValue, should NEVER happen, clean and return
|
||||
klog.Errorf("Invalid cache entry for key %v", opKey)
|
||||
opMgr.cache.Delete(opKey)
|
||||
return
|
||||
}
|
||||
status := string(SnapshotStatusTypeUnknown)
|
||||
if opStatus != nil {
|
||||
status = opStatus.String()
|
||||
@@ -213,7 +229,7 @@ func (opMgr *operationMetricsManager) RecordMetrics(opKey OperationKey, opStatus
|
||||
if opKey.Name == DeleteSnapshotOperationName {
|
||||
// check if we have a CreateSnapshot operation pending for this
|
||||
createKey := NewOperationKey(CreateSnapshotOperationName, opKey.ResourceID)
|
||||
obj, exists := opMgr.cache.Load(createKey)
|
||||
obj, exists := opMgr.cache[createKey]
|
||||
if exists {
|
||||
// record a cancel metric if found
|
||||
opMgr.recordCancelMetric(obj, createKey, operationDuration)
|
||||
@@ -221,32 +237,30 @@ func (opMgr *operationMetricsManager) RecordMetrics(opKey OperationKey, opStatus
|
||||
|
||||
// check if we have a CreateSnapshotAndReady operation pending for this
|
||||
createAndReadyKey := NewOperationKey(CreateSnapshotAndReadyOperationName, opKey.ResourceID)
|
||||
obj, exists = opMgr.cache.Load(createAndReadyKey)
|
||||
obj, exists = opMgr.cache[createAndReadyKey]
|
||||
if exists {
|
||||
// record a cancel metric if found
|
||||
opMgr.recordCancelMetric(obj, createAndReadyKey, operationDuration)
|
||||
}
|
||||
}
|
||||
|
||||
opMgr.cache.Delete(opKey)
|
||||
delete(opMgr.cache, opKey)
|
||||
opMgr.opInFlight.Set(float64(len(opMgr.cache)))
|
||||
}
|
||||
|
||||
// recordCancelMetric records a metric for a create operation that hasn't finished
|
||||
func (opMgr *operationMetricsManager) recordCancelMetric(obj interface{}, key OperationKey, duration float64) {
|
||||
func (opMgr *operationMetricsManager) recordCancelMetric(val OperationValue, key OperationKey, duration float64) {
|
||||
opMgr.mu.Lock()
|
||||
defer opMgr.mu.Unlock()
|
||||
// record a cancel metric if found
|
||||
val, ok := obj.(OperationValue)
|
||||
if !ok {
|
||||
klog.Errorf("Invalid cache entry for key %v", key)
|
||||
opMgr.cache.Delete(key)
|
||||
return
|
||||
}
|
||||
|
||||
opMgr.opLatencyMetrics.WithLabelValues(
|
||||
val.Driver,
|
||||
key.Name,
|
||||
val.SnapshotType,
|
||||
string(SnapshotStatusTypeCancel),
|
||||
).Observe(duration)
|
||||
opMgr.cache.Delete(key)
|
||||
delete(opMgr.cache, key)
|
||||
}
|
||||
|
||||
func (opMgr *operationMetricsManager) init() {
|
||||
@@ -261,6 +275,29 @@ func (opMgr *operationMetricsManager) init() {
|
||||
[]string{labelDriverName, labelOperationName, labelSnapshotType, labelOperationStatus},
|
||||
)
|
||||
opMgr.registry.MustRegister(opMgr.opLatencyMetrics)
|
||||
opMgr.opInFlight = k8smetrics.NewGauge(
|
||||
&k8smetrics.GaugeOpts{
|
||||
Subsystem: subSystem,
|
||||
Name: operationInFlightName,
|
||||
Help: operationInFlightHelpMsg,
|
||||
},
|
||||
)
|
||||
opMgr.registry.MustRegister(opMgr.opInFlight)
|
||||
|
||||
// While we always maintain the number of operations in flight
|
||||
// for every metrics operation start/finish, if any are leaked,
|
||||
// this scheduled routine will catch any leaked operations.
|
||||
go opMgr.scheduleOpsInFlightMetric()
|
||||
}
|
||||
|
||||
func (opMgr *operationMetricsManager) scheduleOpsInFlightMetric() {
|
||||
for range time.Tick(inFlightCheckInterval) {
|
||||
func() {
|
||||
opMgr.mu.Lock()
|
||||
defer opMgr.mu.Unlock()
|
||||
opMgr.opInFlight.Set(float64(len(opMgr.cache)))
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (opMgr *operationMetricsManager) StartMetricsEndpoint(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error) {
|
||||
|
@@ -478,6 +478,83 @@ snapshot_controller_operation_total_seconds_count{driver_name="driver5",operatio
|
||||
}
|
||||
}
|
||||
|
||||
func TestInFlightMetric(t *testing.T) {
|
||||
inFlightCheckInterval = time.Millisecond * 50
|
||||
|
||||
mgr, wg, srv := initMgr()
|
||||
defer shutdown(srv, wg)
|
||||
srvAddr := "http://" + srv.Addr + httpPattern
|
||||
|
||||
// Start first operation, should be 1
|
||||
opKey := OperationKey{
|
||||
Name: "leaked",
|
||||
ResourceID: types.UID("uid"),
|
||||
}
|
||||
opVal := NewOperationValue("driver", "test")
|
||||
mgr.OperationStart(opKey, opVal)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
if err := verifyInFlightMetric(`snapshot_controller_operations_in_flight 1`, srvAddr); err != nil {
|
||||
t.Errorf("failed testing [%v]", err)
|
||||
}
|
||||
|
||||
// Start second operation, should be 2
|
||||
opKey = OperationKey{
|
||||
Name: "leaked2",
|
||||
ResourceID: types.UID("uid"),
|
||||
}
|
||||
opVal = NewOperationValue("driver2", "test2")
|
||||
mgr.OperationStart(opKey, opVal)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
if err := verifyInFlightMetric(`snapshot_controller_operations_in_flight 2`, srvAddr); err != nil {
|
||||
t.Errorf("failed testing [%v]", err)
|
||||
}
|
||||
|
||||
// Record, should be down to 1
|
||||
mgr.RecordMetrics(opKey, nil, "driver")
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
if err := verifyInFlightMetric(`snapshot_controller_operations_in_flight 1`, srvAddr); err != nil {
|
||||
t.Errorf("failed testing [%v]", err)
|
||||
}
|
||||
|
||||
// Start 50 operations, should be 51
|
||||
for i := 0; i < 50; i++ {
|
||||
opKey := OperationKey{
|
||||
Name: fmt.Sprintf("op%d", i),
|
||||
ResourceID: types.UID("uid%d"),
|
||||
}
|
||||
opVal := NewOperationValue("driver1", "test")
|
||||
mgr.OperationStart(opKey, opVal)
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
if err := verifyInFlightMetric(`snapshot_controller_operations_in_flight 51`, srvAddr); err != nil {
|
||||
t.Errorf("failed testing [%v]", err)
|
||||
}
|
||||
}
|
||||
|
||||
func verifyInFlightMetric(expected string, srvAddr string) error {
|
||||
rsp, err := http.Get(srvAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rsp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("failed to get response from serve: %s", http.StatusText(rsp.StatusCode))
|
||||
}
|
||||
r, err := ioutil.ReadAll(rsp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !strings.Contains(string(r), expected) {
|
||||
return fmt.Errorf("failed, not equal")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func verifyMetric(expected, srvAddr string) error {
|
||||
rsp, err := http.Get(srvAddr)
|
||||
if err != nil {
|
||||
@@ -490,6 +567,7 @@ func verifyMetric(expected, srvAddr string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
format := expfmt.ResponseFormat(rsp.Header)
|
||||
gotReader := strings.NewReader(string(r))
|
||||
gotDecoder := expfmt.NewDecoder(gotReader, format)
|
||||
|
Reference in New Issue
Block a user