Bumping k8s version to 1.13.0-beta.1

This commit is contained in:
Cheng Xing
2018-11-19 14:10:50 -08:00
parent 01bd7f356e
commit e2f1bdc372
633 changed files with 11189 additions and 126194 deletions

View File

@@ -260,34 +260,6 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
return certBuffer.Bytes(), keyBuffer.Bytes(), nil
}
// FormatBytesCert receives byte array certificate and formats in human-readable format
func FormatBytesCert(cert []byte) (string, error) {
block, _ := pem.Decode(cert)
c, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return "", fmt.Errorf("failed to parse certificate [%v]", err)
}
return FormatCert(c), nil
}
// FormatCert receives certificate and formats in human-readable format
func FormatCert(c *x509.Certificate) string {
var ips []string
for _, ip := range c.IPAddresses {
ips = append(ips, ip.String())
}
altNames := append(ips, c.DNSNames...)
res := fmt.Sprintf(
"Issuer: CN=%s | Subject: CN=%s | CA: %t\n",
c.Issuer.CommonName, c.Subject.CommonName, c.IsCA,
)
res += fmt.Sprintf("Not before: %s Not After: %s", c.NotBefore, c.NotAfter)
if len(altNames) > 0 {
res += fmt.Sprintf("\nAlternate Names: %v", altNames)
}
return res
}
func ipsToStrings(ips []net.IP) []string {
ss := make([]string, 0, len(ips))
for _, ip := range ips {

View File

@@ -1,116 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package triple generates key-certificate pairs for the
// triple (CA, Server, Client).
package triple
import (
"crypto/rsa"
"crypto/x509"
"fmt"
"net"
certutil "k8s.io/client-go/util/cert"
)
type KeyPair struct {
Key *rsa.PrivateKey
Cert *x509.Certificate
}
func NewCA(name string) (*KeyPair, error) {
key, err := certutil.NewPrivateKey()
if err != nil {
return nil, fmt.Errorf("unable to create a private key for a new CA: %v", err)
}
config := certutil.Config{
CommonName: name,
}
cert, err := certutil.NewSelfSignedCACert(config, key)
if err != nil {
return nil, fmt.Errorf("unable to create a self-signed certificate for a new CA: %v", err)
}
return &KeyPair{
Key: key,
Cert: cert,
}, nil
}
func NewServerKeyPair(ca *KeyPair, commonName, svcName, svcNamespace, dnsDomain string, ips, hostnames []string) (*KeyPair, error) {
key, err := certutil.NewPrivateKey()
if err != nil {
return nil, fmt.Errorf("unable to create a server private key: %v", err)
}
namespacedName := fmt.Sprintf("%s.%s", svcName, svcNamespace)
internalAPIServerFQDN := []string{
svcName,
namespacedName,
fmt.Sprintf("%s.svc", namespacedName),
fmt.Sprintf("%s.svc.%s", namespacedName, dnsDomain),
}
altNames := certutil.AltNames{}
for _, ipStr := range ips {
ip := net.ParseIP(ipStr)
if ip != nil {
altNames.IPs = append(altNames.IPs, ip)
}
}
altNames.DNSNames = append(altNames.DNSNames, hostnames...)
altNames.DNSNames = append(altNames.DNSNames, internalAPIServerFQDN...)
config := certutil.Config{
CommonName: commonName,
AltNames: altNames,
Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
}
cert, err := certutil.NewSignedCert(config, key, ca.Cert, ca.Key)
if err != nil {
return nil, fmt.Errorf("unable to sign the server certificate: %v", err)
}
return &KeyPair{
Key: key,
Cert: cert,
}, nil
}
func NewClientKeyPair(ca *KeyPair, commonName string, organizations []string) (*KeyPair, error) {
key, err := certutil.NewPrivateKey()
if err != nil {
return nil, fmt.Errorf("unable to create a client private key: %v", err)
}
config := certutil.Config{
CommonName: commonName,
Organization: organizations,
Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
}
cert, err := certutil.NewSignedCert(config, key, ca.Cert, ca.Key)
if err != nil {
return nil, fmt.Errorf("unable to sign the client certificate: %v", err)
}
return &KeyPair{
Key: key,
Cert: cert,
}, nil
}

View File

@@ -28,7 +28,7 @@ import (
"sync"
"time"
"github.com/golang/glog"
"k8s.io/klog"
certificates "k8s.io/api/certificates/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -227,17 +227,17 @@ func (m *manager) Start() {
// signing API, so don't start the certificate manager if we don't have a
// client.
if m.certSigningRequestClient == nil {
glog.V(2).Infof("Certificate rotation is not enabled, no connection to the apiserver.")
klog.V(2).Infof("Certificate rotation is not enabled, no connection to the apiserver.")
return
}
glog.V(2).Infof("Certificate rotation is enabled.")
klog.V(2).Infof("Certificate rotation is enabled.")
templateChanged := make(chan struct{})
go wait.Forever(func() {
deadline := m.nextRotationDeadline()
if sleepInterval := deadline.Sub(time.Now()); sleepInterval > 0 {
glog.V(2).Infof("Waiting %v for next certificate rotation", sleepInterval)
klog.V(2).Infof("Waiting %v for next certificate rotation", sleepInterval)
timer := time.NewTimer(sleepInterval)
defer timer.Stop()
@@ -250,7 +250,7 @@ func (m *manager) Start() {
// if the template now matches what we last requested, restart the rotation deadline loop
return
}
glog.V(2).Infof("Certificate template changed, rotating")
klog.V(2).Infof("Certificate template changed, rotating")
}
}
@@ -321,7 +321,7 @@ func getCurrentCertificateOrBootstrap(
if _, err := store.Update(bootstrapCertificatePEM, bootstrapKeyPEM); err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to set the cert/key pair to the bootstrap certificate: %v", err))
} else {
glog.V(4).Infof("Updated the store to contain the initial bootstrap certificate")
klog.V(4).Infof("Updated the store to contain the initial bootstrap certificate")
}
return &bootstrapCert, true, nil
@@ -333,7 +333,7 @@ func getCurrentCertificateOrBootstrap(
// This method also keeps track of "server health" by interpreting the responses it gets
// from the server on the various calls it makes.
func (m *manager) rotateCerts() (bool, error) {
glog.V(2).Infof("Rotating certificates")
klog.V(2).Infof("Rotating certificates")
template, csrPEM, keyPEM, privateKey, err := m.generateCSR()
if err != nil {
@@ -403,7 +403,7 @@ func (m *manager) certSatisfiesTemplateLocked() bool {
if template := m.getTemplate(); template != nil {
if template.Subject.CommonName != m.cert.Leaf.Subject.CommonName {
glog.V(2).Infof("Current certificate CN (%s) does not match requested CN (%s)", m.cert.Leaf.Subject.CommonName, template.Subject.CommonName)
klog.V(2).Infof("Current certificate CN (%s) does not match requested CN (%s)", m.cert.Leaf.Subject.CommonName, template.Subject.CommonName)
return false
}
@@ -411,7 +411,7 @@ func (m *manager) certSatisfiesTemplateLocked() bool {
desiredDNSNames := sets.NewString(template.DNSNames...)
missingDNSNames := desiredDNSNames.Difference(currentDNSNames)
if len(missingDNSNames) > 0 {
glog.V(2).Infof("Current certificate is missing requested DNS names %v", missingDNSNames.List())
klog.V(2).Infof("Current certificate is missing requested DNS names %v", missingDNSNames.List())
return false
}
@@ -425,7 +425,7 @@ func (m *manager) certSatisfiesTemplateLocked() bool {
}
missingIPs := desiredIPs.Difference(currentIPs)
if len(missingIPs) > 0 {
glog.V(2).Infof("Current certificate is missing requested IP addresses %v", missingIPs.List())
klog.V(2).Infof("Current certificate is missing requested IP addresses %v", missingIPs.List())
return false
}
@@ -433,7 +433,7 @@ func (m *manager) certSatisfiesTemplateLocked() bool {
desiredOrgs := sets.NewString(template.Subject.Organization...)
missingOrgs := desiredOrgs.Difference(currentOrgs)
if len(missingOrgs) > 0 {
glog.V(2).Infof("Current certificate is missing requested orgs %v", missingOrgs.List())
klog.V(2).Infof("Current certificate is missing requested orgs %v", missingOrgs.List())
return false
}
}
@@ -468,7 +468,7 @@ func (m *manager) nextRotationDeadline() time.Time {
totalDuration := float64(notAfter.Sub(m.cert.Leaf.NotBefore))
deadline := m.cert.Leaf.NotBefore.Add(jitteryDuration(totalDuration))
glog.V(2).Infof("Certificate expiration is %v, rotation deadline is %v", notAfter, deadline)
klog.V(2).Infof("Certificate expiration is %v, rotation deadline is %v", notAfter, deadline)
if m.certificateExpiration != nil {
m.certificateExpiration.Set(float64(notAfter.Unix()))
}

View File

@@ -26,7 +26,7 @@ import (
"strings"
"time"
"github.com/golang/glog"
"k8s.io/klog"
)
const (
@@ -127,7 +127,7 @@ func (s *fileStore) Current() (*tls.Certificate, error) {
if pairFileExists, err := fileExists(pairFile); err != nil {
return nil, err
} else if pairFileExists {
glog.Infof("Loading cert/key pair from %q.", pairFile)
klog.Infof("Loading cert/key pair from %q.", pairFile)
return loadFile(pairFile)
}
@@ -140,7 +140,7 @@ func (s *fileStore) Current() (*tls.Certificate, error) {
return nil, err
}
if certFileExists && keyFileExists {
glog.Infof("Loading cert/key pair from (%q, %q).", s.certFile, s.keyFile)
klog.Infof("Loading cert/key pair from (%q, %q).", s.certFile, s.keyFile)
return loadX509KeyPair(s.certFile, s.keyFile)
}
@@ -155,7 +155,7 @@ func (s *fileStore) Current() (*tls.Certificate, error) {
return nil, err
}
if certFileExists && keyFileExists {
glog.Infof("Loading cert/key pair from (%q, %q).", c, k)
klog.Infof("Loading cert/key pair from (%q, %q).", c, k)
return loadX509KeyPair(c, k)
}

View File

@@ -18,23 +18,19 @@ package csr
import (
"crypto"
"crypto/sha512"
"crypto/x509"
"crypto/x509/pkix"
"encoding/base64"
"encoding/pem"
"fmt"
"reflect"
"time"
"github.com/golang/glog"
"k8s.io/klog"
certificates "k8s.io/api/certificates/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
@@ -43,41 +39,6 @@ import (
certutil "k8s.io/client-go/util/cert"
)
// RequestNodeCertificate will create a certificate signing request for a node
// (Organization and CommonName for the CSR will be set as expected for node
// certificates) and send it to API server, then it will watch the object's
// status, once approved by API server, it will return the API server's issued
// certificate (pem-encoded). If there is any errors, or the watch timeouts, it
// will return an error. This is intended for use on nodes (kubelet and
// kubeadm).
func RequestNodeCertificate(client certificatesclient.CertificateSigningRequestInterface, privateKeyData []byte, nodeName types.NodeName) (certData []byte, err error) {
subject := &pkix.Name{
Organization: []string{"system:nodes"},
CommonName: "system:node:" + string(nodeName),
}
privateKey, err := certutil.ParsePrivateKeyPEM(privateKeyData)
if err != nil {
return nil, fmt.Errorf("invalid private key for certificate request: %v", err)
}
csrData, err := certutil.MakeCSR(privateKey, subject, nil, nil)
if err != nil {
return nil, fmt.Errorf("unable to generate certificate request: %v", err)
}
usages := []certificates.KeyUsage{
certificates.UsageDigitalSignature,
certificates.UsageKeyEncipherment,
certificates.UsageClientAuth,
}
name := digestedName(privateKeyData, subject, usages)
req, err := RequestCertificate(client, csrData, name, usages, privateKey)
if err != nil {
return nil, err
}
return WaitForCertificate(client, req, 3600*time.Second)
}
// RequestCertificate will either use an existing (if this process has run
// before but not to completion) or create a certificate signing request using the
// PEM encoded CSR and send it to API server, then it will watch the object's
@@ -104,7 +65,7 @@ func RequestCertificate(client certificatesclient.CertificateSigningRequestInter
switch {
case err == nil:
case errors.IsAlreadyExists(err) && len(name) > 0:
glog.Infof("csr for this node already exists, reusing")
klog.Infof("csr for this node already exists, reusing")
req, err = client.Get(name, metav1.GetOptions{})
if err != nil {
return nil, formatError("cannot retrieve certificate signing request: %v", err)
@@ -112,7 +73,7 @@ func RequestCertificate(client certificatesclient.CertificateSigningRequestInter
if err := ensureCompatible(req, csr, privateKey); err != nil {
return nil, fmt.Errorf("retrieved csr is not compatible: %v", err)
}
glog.Infof("csr for this node is still valid")
klog.Infof("csr for this node is still valid")
default:
return nil, formatError("cannot create certificate signing request: %v", err)
}
@@ -168,57 +129,25 @@ func WaitForCertificate(client certificatesclient.CertificateSigningRequestInter
return event.Object.(*certificates.CertificateSigningRequest).Status.Certificate, nil
}
// This digest should include all the relevant pieces of the CSR we care about.
// We can't direcly hash the serialized CSR because of random padding that we
// regenerate every loop and we include usages which are not contained in the
// CSR. This needs to be kept up to date as we add new fields to the node
// certificates and with ensureCompatible.
func digestedName(privateKeyData []byte, subject *pkix.Name, usages []certificates.KeyUsage) string {
hash := sha512.New512_256()
// Here we make sure two different inputs can't write the same stream
// to the hash. This delimiter is not in the base64.URLEncoding
// alphabet so there is no way to have spill over collisions. Without
// it 'CN:foo,ORG:bar' hashes to the same value as 'CN:foob,ORG:ar'
const delimiter = '|'
encode := base64.RawURLEncoding.EncodeToString
write := func(data []byte) {
hash.Write([]byte(encode(data)))
hash.Write([]byte{delimiter})
}
write(privateKeyData)
write([]byte(subject.CommonName))
for _, v := range subject.Organization {
write([]byte(v))
}
for _, v := range usages {
write([]byte(v))
}
return "node-csr-" + encode(hash.Sum(nil))
}
// ensureCompatible ensures that a CSR object is compatible with an original CSR
func ensureCompatible(new, orig *certificates.CertificateSigningRequest, privateKey interface{}) error {
newCsr, err := ParseCSR(new)
newCSR, err := parseCSR(new)
if err != nil {
return fmt.Errorf("unable to parse new csr: %v", err)
}
origCsr, err := ParseCSR(orig)
origCSR, err := parseCSR(orig)
if err != nil {
return fmt.Errorf("unable to parse original csr: %v", err)
}
if !reflect.DeepEqual(newCsr.Subject, origCsr.Subject) {
return fmt.Errorf("csr subjects differ: new: %#v, orig: %#v", newCsr.Subject, origCsr.Subject)
if !reflect.DeepEqual(newCSR.Subject, origCSR.Subject) {
return fmt.Errorf("csr subjects differ: new: %#v, orig: %#v", newCSR.Subject, origCSR.Subject)
}
signer, ok := privateKey.(crypto.Signer)
if !ok {
return fmt.Errorf("privateKey is not a signer")
}
newCsr.PublicKey = signer.Public()
if err := newCsr.CheckSignature(); err != nil {
newCSR.PublicKey = signer.Public()
if err := newCSR.CheckSignature(); err != nil {
return fmt.Errorf("error validating signature new CSR against old key: %v", err)
}
if len(new.Status.Certificate) > 0 {
@@ -247,17 +176,12 @@ func formatError(format string, err error) error {
return fmt.Errorf(format, err)
}
// ParseCSR extracts the CSR from the API object and decodes it.
func ParseCSR(obj *certificates.CertificateSigningRequest) (*x509.CertificateRequest, error) {
// parseCSR extracts the CSR from the API object and decodes it.
func parseCSR(obj *certificates.CertificateSigningRequest) (*x509.CertificateRequest, error) {
// extract PEM from request object
pemBytes := obj.Spec.Request
block, _ := pem.Decode(pemBytes)
block, _ := pem.Decode(obj.Spec.Request)
if block == nil || block.Type != "CERTIFICATE REQUEST" {
return nil, fmt.Errorf("PEM block type must be CERTIFICATE REQUEST")
}
csr, err := x509.ParseCertificateRequest(block.Bytes)
if err != nil {
return nil, err
}
return csr, nil
return x509.ParseCertificateRequest(block.Bytes)
}

View File

@@ -1,135 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csr
import (
"fmt"
"testing"
certificates "k8s.io/api/certificates/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
watch "k8s.io/apimachinery/pkg/watch"
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
certutil "k8s.io/client-go/util/cert"
)
func TestRequestNodeCertificateNoKeyData(t *testing.T) {
certData, err := RequestNodeCertificate(&fakeClient{}, []byte{}, "fake-node-name")
if err == nil {
t.Errorf("Got no error, wanted error an error because there was an empty private key passed in.")
}
if certData != nil {
t.Errorf("Got cert data, wanted nothing as there should have been an error.")
}
}
func TestRequestNodeCertificateErrorCreatingCSR(t *testing.T) {
client := &fakeClient{
failureType: createError,
}
privateKeyData, err := certutil.MakeEllipticPrivateKeyPEM()
if err != nil {
t.Fatalf("Unable to generate a new private key: %v", err)
}
certData, err := RequestNodeCertificate(client, privateKeyData, "fake-node-name")
if err == nil {
t.Errorf("Got no error, wanted error an error because client.Create failed.")
}
if certData != nil {
t.Errorf("Got cert data, wanted nothing as there should have been an error.")
}
}
func TestRequestNodeCertificate(t *testing.T) {
privateKeyData, err := certutil.MakeEllipticPrivateKeyPEM()
if err != nil {
t.Fatalf("Unable to generate a new private key: %v", err)
}
certData, err := RequestNodeCertificate(&fakeClient{}, privateKeyData, "fake-node-name")
if err != nil {
t.Errorf("Got %v, wanted no error.", err)
}
if certData == nil {
t.Errorf("Got nothing, expected a CSR.")
}
}
type FailureType int
const (
noError FailureType = iota
createError
certificateSigningRequestDenied
)
type fakeClient struct {
certificatesclient.CertificateSigningRequestInterface
watch *watch.FakeWatcher
failureType FailureType
}
func (c *fakeClient) Create(*certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error) {
if c.failureType == createError {
return nil, fmt.Errorf("fakeClient failed creating request")
}
csr := certificates.CertificateSigningRequest{
ObjectMeta: metav1.ObjectMeta{
UID: "fake-uid",
Name: "fake-certificate-signing-request-name",
},
}
return &csr, nil
}
func (c *fakeClient) List(opts metav1.ListOptions) (*certificates.CertificateSigningRequestList, error) {
return &certificates.CertificateSigningRequestList{}, nil
}
func (c *fakeClient) Watch(opts metav1.ListOptions) (watch.Interface, error) {
c.watch = watch.NewFakeWithChanSize(1, false)
c.watch.Add(c.generateCSR())
c.watch.Stop()
return c.watch, nil
}
func (c *fakeClient) generateCSR() *certificates.CertificateSigningRequest {
var condition certificates.CertificateSigningRequestCondition
if c.failureType == certificateSigningRequestDenied {
condition = certificates.CertificateSigningRequestCondition{
Type: certificates.CertificateDenied,
}
} else {
condition = certificates.CertificateSigningRequestCondition{
Type: certificates.CertificateApproved,
}
}
csr := certificates.CertificateSigningRequest{
ObjectMeta: metav1.ObjectMeta{
UID: "fake-uid",
},
Status: certificates.CertificateSigningRequestStatus{
Conditions: []certificates.CertificateSigningRequestCondition{
condition,
},
Certificate: []byte{},
},
}
return &csr
}

View File

@@ -19,6 +19,8 @@ package workqueue
import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
// This file provides abstractions for setting the provider (e.g., prometheus)
@@ -28,6 +30,7 @@ type queueMetrics interface {
add(item t)
get(item t)
done(item t)
updateUnfinishedWork()
}
// GaugeMetric represents a single numerical value that can arbitrarily go up
@@ -37,6 +40,12 @@ type GaugeMetric interface {
Dec()
}
// SettableGaugeMetric represents a single numerical value that can arbitrarily go up
// and down. (Separate from GaugeMetric to preserve backwards compatibility.)
type SettableGaugeMetric interface {
Set(float64)
}
// CounterMetric represents a single numerical value that only ever
// goes up.
type CounterMetric interface {
@@ -52,9 +61,13 @@ type noopMetric struct{}
func (noopMetric) Inc() {}
func (noopMetric) Dec() {}
func (noopMetric) Set(float64) {}
func (noopMetric) Observe(float64) {}
// defaultQueueMetrics expects the caller to lock before setting any metrics.
type defaultQueueMetrics struct {
clock clock.Clock
// current depth of a workqueue
depth GaugeMetric
// total number of adds handled by a workqueue
@@ -65,6 +78,10 @@ type defaultQueueMetrics struct {
workDuration SummaryMetric
addTimes map[t]time.Time
processingStartTimes map[t]time.Time
// how long have current threads been working?
unfinishedWorkSeconds SettableGaugeMetric
longestRunningProcessor SettableGaugeMetric
}
func (m *defaultQueueMetrics) add(item t) {
@@ -75,7 +92,7 @@ func (m *defaultQueueMetrics) add(item t) {
m.adds.Inc()
m.depth.Inc()
if _, exists := m.addTimes[item]; !exists {
m.addTimes[item] = time.Now()
m.addTimes[item] = m.clock.Now()
}
}
@@ -85,9 +102,9 @@ func (m *defaultQueueMetrics) get(item t) {
}
m.depth.Dec()
m.processingStartTimes[item] = time.Now()
m.processingStartTimes[item] = m.clock.Now()
if startTime, exists := m.addTimes[item]; exists {
m.latency.Observe(sinceInMicroseconds(startTime))
m.latency.Observe(m.sinceInMicroseconds(startTime))
delete(m.addTimes, item)
}
}
@@ -98,14 +115,39 @@ func (m *defaultQueueMetrics) done(item t) {
}
if startTime, exists := m.processingStartTimes[item]; exists {
m.workDuration.Observe(sinceInMicroseconds(startTime))
m.workDuration.Observe(m.sinceInMicroseconds(startTime))
delete(m.processingStartTimes, item)
}
}
func (m *defaultQueueMetrics) updateUnfinishedWork() {
// Note that a summary metric would be better for this, but prometheus
// doesn't seem to have non-hacky ways to reset the summary metrics.
var total float64
var oldest float64
for _, t := range m.processingStartTimes {
age := m.sinceInMicroseconds(t)
total += age
if age > oldest {
oldest = age
}
}
// Convert to seconds; microseconds is unhelpfully granular for this.
total /= 1000000
m.unfinishedWorkSeconds.Set(total)
m.longestRunningProcessor.Set(oldest) // in microseconds.
}
type noMetrics struct{}
func (noMetrics) add(item t) {}
func (noMetrics) get(item t) {}
func (noMetrics) done(item t) {}
func (noMetrics) updateUnfinishedWork() {}
// Gets the time since the specified start in microseconds.
func sinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
func (m *defaultQueueMetrics) sinceInMicroseconds(start time.Time) float64 {
return float64(m.clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}
type retryMetrics interface {
@@ -130,6 +172,8 @@ type MetricsProvider interface {
NewAddsMetric(name string) CounterMetric
NewLatencyMetric(name string) SummaryMetric
NewWorkDurationMetric(name string) SummaryMetric
NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric
NewRetriesMetric(name string) CounterMetric
}
@@ -151,29 +195,49 @@ func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{}
}
var metricsFactory = struct {
metricsProvider MetricsProvider
setProviders sync.Once
}{
var globalMetricsFactory = queueMetricsFactory{
metricsProvider: noopMetricsProvider{},
}
func newQueueMetrics(name string) queueMetrics {
var ret *defaultQueueMetrics
if len(name) == 0 {
return ret
type queueMetricsFactory struct {
metricsProvider MetricsProvider
onlyOnce sync.Once
}
func (f *queueMetricsFactory) setProvider(mp MetricsProvider) {
f.onlyOnce.Do(func() {
f.metricsProvider = mp
})
}
func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics {
mp := f.metricsProvider
if len(name) == 0 || mp == (noopMetricsProvider{}) {
return noMetrics{}
}
return &defaultQueueMetrics{
depth: metricsFactory.metricsProvider.NewDepthMetric(name),
adds: metricsFactory.metricsProvider.NewAddsMetric(name),
latency: metricsFactory.metricsProvider.NewLatencyMetric(name),
workDuration: metricsFactory.metricsProvider.NewWorkDurationMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
clock: clock,
depth: mp.NewDepthMetric(name),
adds: mp.NewAddsMetric(name),
latency: mp.NewLatencyMetric(name),
workDuration: mp.NewWorkDurationMetric(name),
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
longestRunningProcessor: mp.NewLongestRunningProcessorMicrosecondsMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
}
}
@@ -183,13 +247,12 @@ func newRetryMetrics(name string) retryMetrics {
return ret
}
return &defaultRetryMetrics{
retries: metricsFactory.metricsProvider.NewRetriesMetric(name),
retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name),
}
}
// SetProvider sets the metrics provider of the metricsFactory.
// SetProvider sets the metrics provider for all subsequently created work
// queues. Only the first call has an effect.
func SetProvider(metricsProvider MetricsProvider) {
metricsFactory.setProviders.Do(func() {
metricsFactory.metricsProvider = metricsProvider
})
globalMetricsFactory.setProvider(metricsProvider)
}

293
vendor/k8s.io/client-go/util/workqueue/metrics_test.go generated vendored Normal file
View File

@@ -0,0 +1,293 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"sync"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
type testMetrics struct {
added, gotten, finished int64
updateCalled chan<- struct{}
}
func (m *testMetrics) add(item t) { m.added++ }
func (m *testMetrics) get(item t) { m.gotten++ }
func (m *testMetrics) done(item t) { m.finished++ }
func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} }
func TestMetricShutdown(t *testing.T) {
ch := make(chan struct{})
m := &testMetrics{
updateCalled: ch,
}
c := clock.NewFakeClock(time.Now())
q := newQueue(c, m, time.Millisecond)
for !c.HasWaiters() {
// Wait for the go routine to call NewTicker()
time.Sleep(time.Millisecond)
}
c.Step(time.Millisecond)
<-ch
q.ShutDown()
c.Step(time.Hour)
select {
default:
return
case <-ch:
t.Errorf("Unexpected update after shutdown was called.")
}
}
type testMetric struct {
inc int64
dec int64
set float64
observedValue float64
observedCount int
notifyCh chan<- struct{}
lock sync.Mutex
}
func (m *testMetric) Inc() {
m.lock.Lock()
defer m.lock.Unlock()
m.inc++
m.notify()
}
func (m *testMetric) Dec() {
m.lock.Lock()
defer m.lock.Unlock()
m.dec++
m.notify()
}
func (m *testMetric) Set(f float64) {
m.lock.Lock()
defer m.lock.Unlock()
m.set = f
m.notify()
}
func (m *testMetric) Observe(f float64) {
m.lock.Lock()
defer m.lock.Unlock()
m.observedValue = f
m.observedCount++
m.notify()
}
func (m *testMetric) gaugeValue() float64 {
m.lock.Lock()
defer m.lock.Unlock()
if m.set != 0 {
return m.set
}
return float64(m.inc - m.dec)
}
func (m *testMetric) observationValue() float64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.observedValue
}
func (m *testMetric) observationCount() int {
m.lock.Lock()
defer m.lock.Unlock()
return m.observedCount
}
func (m *testMetric) notify() {
if m.notifyCh != nil {
m.notifyCh <- struct{}{}
}
}
type testMetricsProvider struct {
depth testMetric
adds testMetric
latency testMetric
duration testMetric
unfinished testMetric
longest testMetric
retries testMetric
}
func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric {
return &m.depth
}
func (m *testMetricsProvider) NewAddsMetric(name string) CounterMetric {
return &m.adds
}
func (m *testMetricsProvider) NewLatencyMetric(name string) SummaryMetric {
return &m.latency
}
func (m *testMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
return &m.duration
}
func (m *testMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
return &m.unfinished
}
func (m *testMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
return &m.longest
}
func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return &m.retries
}
func TestSinceInMicroseconds(t *testing.T) {
mp := testMetricsProvider{}
c := clock.NewFakeClock(time.Now())
mf := queueMetricsFactory{metricsProvider: &mp}
m := mf.newQueueMetrics("test", c)
dqm := m.(*defaultQueueMetrics)
for _, i := range []int{1, 50, 100, 500, 1000, 10000, 100000, 1000000} {
n := c.Now()
c.Step(time.Duration(i) * time.Microsecond)
if e, a := float64(i), dqm.sinceInMicroseconds(n); e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
}
func TestMetrics(t *testing.T) {
mp := testMetricsProvider{}
t0 := time.Unix(0, 0)
c := clock.NewFakeClock(t0)
mf := queueMetricsFactory{metricsProvider: &mp}
m := mf.newQueueMetrics("test", c)
q := newQueue(c, m, time.Millisecond)
defer q.ShutDown()
for !c.HasWaiters() {
// Wait for the go routine to call NewTicker()
time.Sleep(time.Millisecond)
}
q.Add("foo")
if e, a := 1.0, mp.adds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
c.Step(50 * time.Microsecond)
// Start processing
i, _ := q.Get()
if i != "foo" {
t.Errorf("Expected %v, got %v", "foo", i)
}
if e, a := 50.0, mp.latency.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.latency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 0.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// Add it back while processing; multiple adds of the same item are
// de-duped.
q.Add(i)
q.Add(i)
q.Add(i)
q.Add(i)
q.Add(i)
if e, a := 2.0, mp.adds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
c.Step(25 * time.Microsecond)
// Finish it up
q.Done(i)
if e, a := 25.0, mp.duration.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.duration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// It should be back on the queue
i, _ = q.Get()
if i != "foo" {
t.Errorf("Expected %v, got %v", "foo", i)
}
if e, a := 25.0, mp.latency.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.latency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// use a channel to ensure we don't look at the metric before it's
// been set.
ch := make(chan struct{}, 1)
mp.unfinished.notifyCh = ch
c.Step(time.Millisecond)
<-ch
mp.unfinished.notifyCh = nil
if e, a := .001, mp.unfinished.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1000.0, mp.longest.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// Finish that one up
q.Done(i)
if e, a := 1000.0, mp.duration.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.duration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}

View File

@@ -18,6 +18,9 @@ package workqueue
import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
type Interface interface {
@@ -35,14 +38,29 @@ func New() *Type {
}
func NewNamed(name string) *Type {
return &Type{
dirty: set{},
processing: set{},
cond: sync.NewCond(&sync.Mutex{}),
metrics: newQueueMetrics(name),
}
rc := clock.RealClock{}
return newQueue(
rc,
globalMetricsFactory.newQueueMetrics(name, rc),
defaultUnfinishedWorkUpdatePeriod,
)
}
func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
t := &Type{
clock: c,
dirty: set{},
processing: set{},
cond: sync.NewCond(&sync.Mutex{}),
metrics: metrics,
unfinishedWorkUpdatePeriod: updatePeriod,
}
go t.updateUnfinishedWorkLoop()
return t
}
const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond
// Type is a work queue (see the package comment).
type Type struct {
// queue defines the order in which we will work on items. Every
@@ -64,6 +82,9 @@ type Type struct {
shuttingDown bool
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
type empty struct{}
@@ -170,3 +191,22 @@ func (q *Type) ShuttingDown() bool {
return q.shuttingDown
}
func (q *Type) updateUnfinishedWorkLoop() {
t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
defer t.Stop()
for range t.C() {
if !func() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if !q.shuttingDown {
q.metrics.updateUnfinishedWork()
return true
}
return false
}() {
return
}
}
}