Update go mod and vendor in client dir to 0.28.0

This commit is contained in:
Raunak Pradip Shah
2023-11-08 10:57:27 +05:30
parent dad8b28c35
commit 1bf2305d28
628 changed files with 61667 additions and 17445 deletions

View File

@@ -24,19 +24,36 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
)
// StaleGroupVersionError encasulates failed GroupVersion marked "stale"
// in the returned AggregatedDiscovery format.
type StaleGroupVersionError struct {
gv schema.GroupVersion
}
func (s StaleGroupVersionError) Error() string {
return fmt.Sprintf("stale GroupVersion discovery: %v", s.gv)
}
// SplitGroupsAndResources transforms "aggregated" discovery top-level structure into
// the previous "unaggregated" discovery groups and resources.
func SplitGroupsAndResources(aggregatedGroups apidiscovery.APIGroupDiscoveryList) (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList) {
func SplitGroupsAndResources(aggregatedGroups apidiscovery.APIGroupDiscoveryList) (
*metav1.APIGroupList,
map[schema.GroupVersion]*metav1.APIResourceList,
map[schema.GroupVersion]error) {
// Aggregated group list will contain the entirety of discovery, including
// groups, versions, and resources.
// groups, versions, and resources. GroupVersions marked "stale" are failed.
groups := []*metav1.APIGroup{}
failedGVs := map[schema.GroupVersion]error{}
resourcesByGV := map[schema.GroupVersion]*metav1.APIResourceList{}
for _, aggGroup := range aggregatedGroups.Items {
group, resources := convertAPIGroup(aggGroup)
group, resources, failed := convertAPIGroup(aggGroup)
groups = append(groups, group)
for gv, resourceList := range resources {
resourcesByGV[gv] = resourceList
}
for gv, err := range failed {
failedGVs[gv] = err
}
}
// Transform slice of groups to group list before returning.
groupList := &metav1.APIGroupList{}
@@ -44,65 +61,96 @@ func SplitGroupsAndResources(aggregatedGroups apidiscovery.APIGroupDiscoveryList
for _, group := range groups {
groupList.Groups = append(groupList.Groups, *group)
}
return groupList, resourcesByGV
return groupList, resourcesByGV, failedGVs
}
// convertAPIGroup tranforms an "aggregated" APIGroupDiscovery to an "legacy" APIGroup,
// also returning the map of APIResourceList for resources within GroupVersions.
func convertAPIGroup(g apidiscovery.APIGroupDiscovery) (*metav1.APIGroup, map[schema.GroupVersion]*metav1.APIResourceList) {
func convertAPIGroup(g apidiscovery.APIGroupDiscovery) (
*metav1.APIGroup,
map[schema.GroupVersion]*metav1.APIResourceList,
map[schema.GroupVersion]error) {
// Iterate through versions to convert to group and resources.
group := &metav1.APIGroup{}
gvResources := map[schema.GroupVersion]*metav1.APIResourceList{}
failedGVs := map[schema.GroupVersion]error{}
group.Name = g.ObjectMeta.Name
for i, v := range g.Versions {
version := metav1.GroupVersionForDiscovery{}
for _, v := range g.Versions {
gv := schema.GroupVersion{Group: g.Name, Version: v.Version}
if v.Freshness == apidiscovery.DiscoveryFreshnessStale {
failedGVs[gv] = StaleGroupVersionError{gv: gv}
continue
}
version := metav1.GroupVersionForDiscovery{}
version.GroupVersion = gv.String()
version.Version = v.Version
group.Versions = append(group.Versions, version)
if i == 0 {
// PreferredVersion is first non-stale Version
if group.PreferredVersion == (metav1.GroupVersionForDiscovery{}) {
group.PreferredVersion = version
}
resourceList := &metav1.APIResourceList{}
resourceList.GroupVersion = gv.String()
for _, r := range v.Resources {
resource := convertAPIResource(r)
resourceList.APIResources = append(resourceList.APIResources, resource)
resource, err := convertAPIResource(r)
if err == nil {
resourceList.APIResources = append(resourceList.APIResources, resource)
}
// Subresources field in new format get transformed into full APIResources.
// It is possible a partial result with an error was returned to be used
// as the parent resource for the subresource.
for _, subresource := range r.Subresources {
sr := convertAPISubresource(resource, subresource)
resourceList.APIResources = append(resourceList.APIResources, sr)
sr, err := convertAPISubresource(resource, subresource)
if err == nil {
resourceList.APIResources = append(resourceList.APIResources, sr)
}
}
}
gvResources[gv] = resourceList
}
return group, gvResources
return group, gvResources, failedGVs
}
// convertAPIResource tranforms a APIResourceDiscovery to an APIResource.
func convertAPIResource(in apidiscovery.APIResourceDiscovery) metav1.APIResource {
return metav1.APIResource{
var emptyKind = metav1.GroupVersionKind{}
// convertAPIResource tranforms a APIResourceDiscovery to an APIResource. We are
// resilient to missing GVK, since this resource might be the parent resource
// for a subresource. If the parent is missing a GVK, it is not returned in
// discovery, and the subresource MUST have the GVK.
func convertAPIResource(in apidiscovery.APIResourceDiscovery) (metav1.APIResource, error) {
result := metav1.APIResource{
Name: in.Resource,
SingularName: in.SingularResource,
Namespaced: in.Scope == apidiscovery.ScopeNamespace,
Group: in.ResponseKind.Group,
Version: in.ResponseKind.Version,
Kind: in.ResponseKind.Kind,
Verbs: in.Verbs,
ShortNames: in.ShortNames,
Categories: in.Categories,
}
var err error
if in.ResponseKind != nil && (*in.ResponseKind) != emptyKind {
result.Group = in.ResponseKind.Group
result.Version = in.ResponseKind.Version
result.Kind = in.ResponseKind.Kind
} else {
err = fmt.Errorf("discovery resource %s missing GVK", in.Resource)
}
// Can return partial result with error, which can be the parent for a
// subresource. Do not add this result to the returned discovery resources.
return result, err
}
// convertAPISubresource tranforms a APISubresourceDiscovery to an APIResource.
func convertAPISubresource(parent metav1.APIResource, in apidiscovery.APISubresourceDiscovery) metav1.APIResource {
return metav1.APIResource{
Name: fmt.Sprintf("%s/%s", parent.Name, in.Subresource),
SingularName: parent.SingularName,
Namespaced: parent.Namespaced,
Group: in.ResponseKind.Group,
Version: in.ResponseKind.Version,
Kind: in.ResponseKind.Kind,
Verbs: in.Verbs,
func convertAPISubresource(parent metav1.APIResource, in apidiscovery.APISubresourceDiscovery) (metav1.APIResource, error) {
result := metav1.APIResource{}
if in.ResponseKind == nil || (*in.ResponseKind) == emptyKind {
return result, fmt.Errorf("subresource %s/%s missing GVK", parent.Name, in.Subresource)
}
result.Name = fmt.Sprintf("%s/%s", parent.Name, in.Subresource)
result.SingularName = parent.SingularName
result.Namespaced = parent.Namespaced
result.Group = in.ResponseKind.Group
result.Version = in.ResponseKind.Version
result.Kind = in.ResponseKind.Kind
result.Verbs = in.Verbs
return result, nil
}

View File

@@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"mime"
"net/http"
"net/url"
"sort"
@@ -29,7 +30,7 @@ import (
//nolint:staticcheck // SA1019 Keep using module since it's still being maintained and the api of google.golang.org/protobuf/proto differs
"github.com/golang/protobuf/proto"
openapi_v2 "github.com/google/gnostic/openapiv2"
openapi_v2 "github.com/google/gnostic-models/openapiv2"
apidiscovery "k8s.io/api/apidiscovery/v2beta1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -58,8 +59,9 @@ const (
defaultBurst = 300
AcceptV1 = runtime.ContentTypeJSON
// Aggregated discovery content-type (currently v2beta1). NOTE: Currently, we are assuming the order
// for "g", "v", and "as" from the server. We can only compare this string if we can make that assumption.
// Aggregated discovery content-type (v2beta1). NOTE: content-type parameters
// MUST be ordered (g, v, as) for server in "Accept" header (BUT we are resilient
// to ordering when comparing returned values in "Content-Type" header).
AcceptV2Beta1 = runtime.ContentTypeJSON + ";" + "g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList"
// Prioritize aggregated discovery by placing first in the order of discovery accept types.
acceptDiscoveryFormats = AcceptV2Beta1 + "," + AcceptV1
@@ -86,7 +88,7 @@ type DiscoveryInterface interface {
type AggregatedDiscoveryInterface interface {
DiscoveryInterface
GroupsAndMaybeResources() (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList, error)
GroupsAndMaybeResources() (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error, error)
}
// CachedDiscoveryInterface is a DiscoveryInterface with cache invalidation and freshness.
@@ -186,18 +188,23 @@ func apiVersionsToAPIGroup(apiVersions *metav1.APIVersions) (apiGroup metav1.API
// and resources from /api and /apis (either aggregated or not). Legacy groups
// must be ordered first. The server will either return both endpoints (/api, /apis)
// as aggregated discovery format or legacy format. For safety, resources will only
// be returned if both endpoints returned resources.
func (d *DiscoveryClient) GroupsAndMaybeResources() (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList, error) {
// be returned if both endpoints returned resources. Returned "failedGVs" can be
// empty, but will only be nil in the case an error is returned.
func (d *DiscoveryClient) GroupsAndMaybeResources() (
*metav1.APIGroupList,
map[schema.GroupVersion]*metav1.APIResourceList,
map[schema.GroupVersion]error,
error) {
// Legacy group ordered first (there is only one -- core/v1 group). Returned groups must
// be non-nil, but it could be empty. Returned resources, apiResources map could be nil.
groups, resources, err := d.downloadLegacy()
groups, resources, failedGVs, err := d.downloadLegacy()
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
// Discovery groups and (possibly) resources downloaded from /apis.
apiGroups, apiResources, aerr := d.downloadAPIs()
if err != nil {
return nil, nil, aerr
apiGroups, apiResources, failedApisGVs, aerr := d.downloadAPIs()
if aerr != nil {
return nil, nil, nil, aerr
}
// Merge apis groups into the legacy groups.
for _, group := range apiGroups.Groups {
@@ -211,14 +218,23 @@ func (d *DiscoveryClient) GroupsAndMaybeResources() (*metav1.APIGroupList, map[s
} else if resources != nil {
resources = nil
}
return groups, resources, err
// Merge failed GroupVersions from /api and /apis
for gv, err := range failedApisGVs {
failedGVs[gv] = err
}
return groups, resources, failedGVs, err
}
// downloadLegacy returns the discovery groups and possibly resources
// for the legacy v1 GVR at /api, or an error if one occurred. It is
// possible for the resource map to be nil if the server returned
// the unaggregated discovery.
func (d *DiscoveryClient) downloadLegacy() (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList, error) {
// the unaggregated discovery. Returned "failedGVs" can be empty, but
// will only be nil in the case of a returned error.
func (d *DiscoveryClient) downloadLegacy() (
*metav1.APIGroupList,
map[schema.GroupVersion]*metav1.APIResourceList,
map[schema.GroupVersion]error,
error) {
accept := acceptDiscoveryFormats
if d.UseLegacyDiscovery {
accept = AcceptV1
@@ -230,48 +246,55 @@ func (d *DiscoveryClient) downloadLegacy() (*metav1.APIGroupList, map[schema.Gro
Do(context.TODO()).
ContentType(&responseContentType).
Raw()
// Special error handling for 403 or 404 to be compatible with older v1.0 servers.
// Return empty group list to be merged with /apis.
if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {
return nil, nil, err
}
if err != nil && (errors.IsNotFound(err) || errors.IsForbidden(err)) {
return &metav1.APIGroupList{}, nil, nil
apiGroupList := &metav1.APIGroupList{}
failedGVs := map[schema.GroupVersion]error{}
if err != nil {
// Tolerate 404, since aggregated api servers can return it.
if errors.IsNotFound(err) {
// Return empty structures and no error.
emptyGVMap := map[schema.GroupVersion]*metav1.APIResourceList{}
return apiGroupList, emptyGVMap, failedGVs, nil
} else {
return nil, nil, nil, err
}
}
apiGroupList := &metav1.APIGroupList{}
var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
// Switch on content-type server responded with: aggregated or unaggregated.
switch responseContentType {
case AcceptV1:
switch {
case isV2Beta1ContentType(responseContentType):
var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
err = json.Unmarshal(body, &aggregatedDiscovery)
if err != nil {
return nil, nil, nil, err
}
apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
default:
// Default is unaggregated discovery v1.
var v metav1.APIVersions
err = json.Unmarshal(body, &v)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
apiGroup := metav1.APIGroup{}
if len(v.Versions) != 0 {
apiGroup = apiVersionsToAPIGroup(&v)
}
apiGroupList.Groups = []metav1.APIGroup{apiGroup}
case AcceptV2Beta1:
var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
err = json.Unmarshal(body, &aggregatedDiscovery)
if err != nil {
return nil, nil, err
}
apiGroupList, resourcesByGV = SplitGroupsAndResources(aggregatedDiscovery)
default:
return nil, nil, fmt.Errorf("Unknown discovery response content-type: %s", responseContentType)
}
return apiGroupList, resourcesByGV, nil
return apiGroupList, resourcesByGV, failedGVs, nil
}
// downloadAPIs returns the discovery groups and (if aggregated format) the
// discovery resources. The returned groups will always exist, but the
// resources map may be nil.
func (d *DiscoveryClient) downloadAPIs() (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList, error) {
// resources map may be nil. Returned "failedGVs" can be empty, but will
// only be nil in the case of a returned error.
func (d *DiscoveryClient) downloadAPIs() (
*metav1.APIGroupList,
map[schema.GroupVersion]*metav1.APIResourceList,
map[schema.GroupVersion]error,
error) {
accept := acceptDiscoveryFormats
if d.UseLegacyDiscovery {
accept = AcceptV1
@@ -283,42 +306,59 @@ func (d *DiscoveryClient) downloadAPIs() (*metav1.APIGroupList, map[schema.Group
Do(context.TODO()).
ContentType(&responseContentType).
Raw()
// Special error handling for 403 or 404 to be compatible with older v1.0 servers.
// Return empty group list to be merged with /api.
if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {
return nil, nil, err
}
if err != nil && (errors.IsNotFound(err) || errors.IsForbidden(err)) {
return &metav1.APIGroupList{}, nil, nil
if err != nil {
return nil, nil, nil, err
}
apiGroupList := &metav1.APIGroupList{}
failedGVs := map[schema.GroupVersion]error{}
var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
// Switch on content-type server responded with: aggregated or unaggregated.
switch responseContentType {
case AcceptV1:
err = json.Unmarshal(body, apiGroupList)
if err != nil {
return nil, nil, err
}
case AcceptV2Beta1:
switch {
case isV2Beta1ContentType(responseContentType):
var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
err = json.Unmarshal(body, &aggregatedDiscovery)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
apiGroupList, resourcesByGV = SplitGroupsAndResources(aggregatedDiscovery)
apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
default:
return nil, nil, fmt.Errorf("Unknown discovery response content-type: %s", responseContentType)
// Default is unaggregated discovery v1.
err = json.Unmarshal(body, apiGroupList)
if err != nil {
return nil, nil, nil, err
}
}
return apiGroupList, resourcesByGV, nil
return apiGroupList, resourcesByGV, failedGVs, nil
}
// isV2Beta1ContentType checks of the content-type string is both
// "application/json" and contains the v2beta1 content-type params.
// NOTE: This function is resilient to the ordering of the
// content-type parameters, as well as parameters added by
// intermediaries such as proxies or gateways. Examples:
//
// "application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList" = true
// "application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io" = true
// "application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io;charset=utf-8" = true
// "application/json" = false
// "application/json; charset=UTF-8" = false
func isV2Beta1ContentType(contentType string) bool {
base, params, err := mime.ParseMediaType(contentType)
if err != nil {
return false
}
return runtime.ContentTypeJSON == base &&
params["g"] == "apidiscovery.k8s.io" &&
params["v"] == "v2beta1" &&
params["as"] == "APIGroupDiscoveryList"
}
// ServerGroups returns the supported groups, with information like supported versions and the
// preferred version.
func (d *DiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {
groups, _, err := d.GroupsAndMaybeResources()
groups, _, _, err := d.GroupsAndMaybeResources()
if err != nil {
return nil, err
}
@@ -341,8 +381,10 @@ func (d *DiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (r
}
err = d.restClient.Get().AbsPath(url.String()).Do(context.TODO()).Into(resources)
if err != nil {
// ignore 403 or 404 error to be compatible with an v1.0 server.
if groupVersion == "v1" && (errors.IsNotFound(err) || errors.IsForbidden(err)) {
// Tolerate core/v1 not found response by returning empty resource list;
// this probably should not happen. But we should verify all callers are
// not depending on this toleration before removal.
if groupVersion == "v1" && errors.IsNotFound(err) {
return resources, nil
}
return nil, err
@@ -383,13 +425,14 @@ func IsGroupDiscoveryFailedError(err error) bool {
func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
var sgs *metav1.APIGroupList
var resources []*metav1.APIResourceList
var failedGVs map[schema.GroupVersion]error
var err error
// If the passed discovery object implements the wider AggregatedDiscoveryInterface,
// then attempt to retrieve aggregated discovery with both groups and the resources.
if ad, ok := d.(AggregatedDiscoveryInterface); ok {
var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
sgs, resourcesByGV, err = ad.GroupsAndMaybeResources()
sgs, resourcesByGV, failedGVs, err = ad.GroupsAndMaybeResources()
for _, resourceList := range resourcesByGV {
resources = append(resources, resourceList)
}
@@ -404,8 +447,15 @@ func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*meta
for i := range sgs.Groups {
resultGroups = append(resultGroups, &sgs.Groups[i])
}
// resources is non-nil if aggregated discovery succeeded.
if resources != nil {
return resultGroups, resources, nil
// Any stale Group/Versions returned by aggregated discovery
// must be surfaced to the caller as failed Group/Versions.
var ferr error
if len(failedGVs) > 0 {
ferr = &ErrGroupDiscoveryFailed{Groups: failedGVs}
}
return resultGroups, resources, ferr
}
groupVersionResources, failedGroups := fetchGroupVersionResources(d, sgs)
@@ -436,16 +486,18 @@ func ServerPreferredResources(d DiscoveryInterface) ([]*metav1.APIResourceList,
var err error
// If the passed discovery object implements the wider AggregatedDiscoveryInterface,
// then it is attempt to retrieve both the groups and the resources.
// then it is attempt to retrieve both the groups and the resources. "failedGroups"
// are Group/Versions returned as stale in AggregatedDiscovery format.
ad, ok := d.(AggregatedDiscoveryInterface)
if ok {
serverGroupList, groupVersionResources, err = ad.GroupsAndMaybeResources()
serverGroupList, groupVersionResources, failedGroups, err = ad.GroupsAndMaybeResources()
} else {
serverGroupList, err = d.ServerGroups()
}
if err != nil {
return nil, err
}
// Non-aggregated discovery must fetch resources from Groups.
if groupVersionResources == nil {
groupVersionResources, failedGroups = fetchGroupVersionResources(d, serverGroupList)
}

View File

@@ -20,7 +20,7 @@ import (
"fmt"
"net/http"
openapi_v2 "github.com/google/gnostic/openapiv2"
openapi_v2 "github.com/google/gnostic-models/openapiv2"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -141,7 +141,10 @@ func (c *FakeDiscovery) ServerVersion() (*version.Info, error) {
action := testing.ActionImpl{}
action.Verb = "get"
action.Resource = schema.GroupVersionResource{Resource: "version"}
c.Invokes(action, nil)
_, err := c.Invokes(action, nil)
if err != nil {
return nil, err
}
if c.FakedServerVersion != nil {
return c.FakedServerVersion, nil

View File

@@ -38,6 +38,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
certificatesv1 "k8s.io/api/certificates/v1"
certificatesv1alpha1 "k8s.io/api/certificates/v1alpha1"
certificatesv1beta1 "k8s.io/api/certificates/v1beta1"
coordinationv1 "k8s.io/api/coordination/v1"
coordinationv1beta1 "k8s.io/api/coordination/v1beta1"
@@ -62,7 +63,7 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
rbacv1alpha1 "k8s.io/api/rbac/v1alpha1"
rbacv1beta1 "k8s.io/api/rbac/v1beta1"
resourcev1alpha1 "k8s.io/api/resource/v1alpha1"
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
schedulingv1 "k8s.io/api/scheduling/v1"
schedulingv1alpha1 "k8s.io/api/scheduling/v1alpha1"
schedulingv1beta1 "k8s.io/api/scheduling/v1beta1"
@@ -100,6 +101,7 @@ var localSchemeBuilder = runtime.SchemeBuilder{
batchv1beta1.AddToScheme,
certificatesv1.AddToScheme,
certificatesv1beta1.AddToScheme,
certificatesv1alpha1.AddToScheme,
coordinationv1beta1.AddToScheme,
coordinationv1.AddToScheme,
corev1.AddToScheme,
@@ -123,7 +125,7 @@ var localSchemeBuilder = runtime.SchemeBuilder{
rbacv1.AddToScheme,
rbacv1beta1.AddToScheme,
rbacv1alpha1.AddToScheme,
resourcev1alpha1.AddToScheme,
resourcev1alpha2.AddToScheme,
schedulingv1alpha1.AddToScheme,
schedulingv1beta1.AddToScheme,
schedulingv1.AddToScheme,

4
client/vendor/k8s.io/client-go/openapi/OWNERS generated vendored Normal file
View File

@@ -0,0 +1,4 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- apelisse

View File

@@ -19,6 +19,7 @@ package openapi
import (
"context"
"encoding/json"
"strings"
"k8s.io/client-go/rest"
"k8s.io/kube-openapi/pkg/handler3"
@@ -58,7 +59,11 @@ func (c *client) Paths() (map[string]GroupVersion, error) {
// Create GroupVersions for each element of the result
result := map[string]GroupVersion{}
for k, v := range discoMap.Paths {
result[k] = newGroupVersion(c, v)
// If the server returned a URL rooted at /openapi/v3, preserve any additional client-side prefix.
// If the server returned a URL not rooted at /openapi/v3, treat it as an actual server-relative URL.
// See https://github.com/kubernetes/kubernetes/issues/117463 for details
useClientPrefix := strings.HasPrefix(v.ServerRelativeURL, "/openapi/v3")
result[k] = newGroupVersion(c, v, useClientPrefix)
}
return result, nil
}

View File

@@ -18,6 +18,7 @@ package openapi
import (
"context"
"net/url"
"k8s.io/kube-openapi/pkg/handler3"
)
@@ -29,18 +30,41 @@ type GroupVersion interface {
}
type groupversion struct {
client *client
item handler3.OpenAPIV3DiscoveryGroupVersion
client *client
item handler3.OpenAPIV3DiscoveryGroupVersion
useClientPrefix bool
}
func newGroupVersion(client *client, item handler3.OpenAPIV3DiscoveryGroupVersion) *groupversion {
return &groupversion{client: client, item: item}
func newGroupVersion(client *client, item handler3.OpenAPIV3DiscoveryGroupVersion, useClientPrefix bool) *groupversion {
return &groupversion{client: client, item: item, useClientPrefix: useClientPrefix}
}
func (g *groupversion) Schema(contentType string) ([]byte, error) {
return g.client.restClient.Get().
RequestURI(g.item.ServerRelativeURL).
SetHeader("Accept", contentType).
Do(context.TODO()).
Raw()
if !g.useClientPrefix {
return g.client.restClient.Get().
RequestURI(g.item.ServerRelativeURL).
SetHeader("Accept", contentType).
Do(context.TODO()).
Raw()
}
locator, err := url.Parse(g.item.ServerRelativeURL)
if err != nil {
return nil, err
}
path := g.client.restClient.Get().
AbsPath(locator.Path).
SetHeader("Accept", contentType)
// Other than root endpoints(openapiv3/apis), resources have hash query parameter to support etags.
// However, absPath does not support handling query parameters internally,
// so that hash query parameter is added manually
for k, value := range locator.Query() {
for _, v := range value {
path.Param(k, v)
}
}
return path.Do(context.TODO()).Raw()
}

View File

@@ -0,0 +1,48 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapi
import (
"encoding/json"
"fmt"
"k8s.io/apimachinery/pkg/util/managedfields"
"k8s.io/kube-openapi/pkg/spec3"
"k8s.io/kube-openapi/pkg/validation/spec"
)
func NewTypeConverter(client Client, preserveUnknownFields bool) (managedfields.TypeConverter, error) {
spec := map[string]*spec.Schema{}
paths, err := client.Paths()
if err != nil {
return nil, fmt.Errorf("failed to list paths: %w", err)
}
for _, gv := range paths {
s, err := gv.Schema("application/json")
if err != nil {
return nil, fmt.Errorf("failed to download schema: %w", err)
}
var openapi spec3.OpenAPI
if err := json.Unmarshal(s, &openapi); err != nil {
return nil, fmt.Errorf("failed to parse schema: %w", err)
}
for k, v := range openapi.Components.Schemas {
spec[k] = v
}
}
return managedfields.NewTypeConverter(spec, preserveUnknownFields)
}

View File

@@ -43,7 +43,8 @@ var (
gitMinor string = "" // minor version, numeric possibly followed by "+"
// semantic version, derived by build scripts (see
// https://git.k8s.io/community/contributors/design-proposals/release/versioning.md
// https://github.com/kubernetes/sig-release/blob/master/release-engineering/versioning.md#kubernetes-release-versioning
// https://kubernetes.io/releases/version-skew-policy/
// for a detailed discussion of this field)
//
// TODO: This field is still called "gitVersion" for legacy

View File

@@ -32,12 +32,12 @@ import (
"sync"
"time"
"github.com/davecgh/go-spew/spew"
"golang.org/x/term"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/dump"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/pkg/apis/clientauthentication"
"k8s.io/client-go/pkg/apis/clientauthentication/install"
@@ -81,8 +81,6 @@ func newCache() *cache {
return &cache{m: make(map[string]*Authenticator)}
}
var spewConfig = &spew.ConfigState{DisableMethods: true, Indent: " "}
func cacheKey(conf *api.ExecConfig, cluster *clientauthentication.Cluster) string {
key := struct {
conf *api.ExecConfig
@@ -91,7 +89,7 @@ func cacheKey(conf *api.ExecConfig, cluster *clientauthentication.Cluster) strin
conf: conf,
cluster: cluster,
}
return spewConfig.Sprint(key)
return dump.Pretty(key)
}
type cache struct {

View File

@@ -52,8 +52,7 @@ type Interface interface {
// ClientContentConfig controls how RESTClient communicates with the server.
//
// TODO: ContentConfig will be updated to accept a Negotiator instead of a
//
// NegotiatedSerializer and NegotiatedSerializer will be removed.
// NegotiatedSerializer and NegotiatedSerializer will be removed.
type ClientContentConfig struct {
// AcceptContentTypes specifies the types the client will accept and is optional.
// If not set, ContentType will be used to define the Accept header

View File

@@ -316,7 +316,7 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
// Validate config.Host before constructing the transport/client so we can fail fast.
// ServerURL will be obtained later in RESTClientForConfigAndClient()
_, _, err := defaultServerUrlFor(config)
_, _, err := DefaultServerUrlFor(config)
if err != nil {
return nil, err
}
@@ -343,7 +343,7 @@ func RESTClientForConfigAndClient(config *Config, httpClient *http.Client) (*RES
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
}
baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
baseURL, versionedAPIPath, err := DefaultServerUrlFor(config)
if err != nil {
return nil, err
}
@@ -390,7 +390,7 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
// Validate config.Host before constructing the transport/client so we can fail fast.
// ServerURL will be obtained later in UnversionedRESTClientForConfigAndClient()
_, _, err := defaultServerUrlFor(config)
_, _, err := DefaultServerUrlFor(config)
if err != nil {
return nil, err
}
@@ -410,7 +410,7 @@ func UnversionedRESTClientForConfigAndClient(config *Config, httpClient *http.Cl
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
}
baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
baseURL, versionedAPIPath, err := DefaultServerUrlFor(config)
if err != nil {
return nil, err
}
@@ -548,7 +548,7 @@ func InClusterConfig() (*Config, error) {
// Note: the Insecure flag is ignored when testing for this value, so MITM attacks are
// still possible.
func IsConfigTransportTLS(config Config) bool {
baseURL, _, err := defaultServerUrlFor(&config)
baseURL, _, err := DefaultServerUrlFor(&config)
if err != nil {
return false
}

View File

@@ -24,6 +24,7 @@ import (
"io"
"mime"
"net/http"
"net/http/httptrace"
"net/url"
"os"
"path"
@@ -34,6 +35,7 @@ import (
"time"
"golang.org/x/net/http2"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -116,8 +118,11 @@ type Request struct {
subresource string
// output
err error
body io.Reader
err error
// only one of body / bodyBytes may be set. requests using body are not retriable.
body io.Reader
bodyBytes []byte
retryFn requestRetryFunc
}
@@ -443,12 +448,15 @@ func (r *Request) Body(obj interface{}) *Request {
return r
}
glogBody("Request Body", data)
r.body = bytes.NewReader(data)
r.body = nil
r.bodyBytes = data
case []byte:
glogBody("Request Body", t)
r.body = bytes.NewReader(t)
r.body = nil
r.bodyBytes = t
case io.Reader:
r.body = t
r.bodyBytes = nil
case runtime.Object:
// callers may pass typed interface pointers, therefore we must check nil with reflection
if reflect.ValueOf(t).IsNil() {
@@ -465,7 +473,8 @@ func (r *Request) Body(obj interface{}) *Request {
return r
}
glogBody("Request Body", data)
r.body = bytes.NewReader(data)
r.body = nil
r.bodyBytes = data
r.SetHeader("Content-Type", r.c.content.ContentType)
default:
r.err = fmt.Errorf("unknown type used for body: %+v", obj)
@@ -473,7 +482,13 @@ func (r *Request) Body(obj interface{}) *Request {
return r
}
// URL returns the current working URL.
// Error returns any error encountered constructing the request, if any.
func (r *Request) Error() error {
return r.err
}
// URL returns the current working URL. Check the result of Error() to ensure
// that the returned URL is valid.
func (r *Request) URL() *url.URL {
p := r.pathPrefix
if r.namespaceSet && len(r.namespace) > 0 {
@@ -718,7 +733,6 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
retry.After(ctx, r, resp, err)
if err == nil && resp.StatusCode == http.StatusOK {
return r.newStreamWatcher(resp)
@@ -778,22 +792,36 @@ func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error)
), nil
}
// updateURLMetrics is a convenience function for pushing metrics.
// It also handles corner cases for incomplete/invalid request data.
func updateURLMetrics(ctx context.Context, req *Request, resp *http.Response, err error) {
url := "none"
// updateRequestResultMetric increments the RequestResult metric counter,
// it should be called with the (response, err) tuple from the final
// reply from the server.
func updateRequestResultMetric(ctx context.Context, req *Request, resp *http.Response, err error) {
code, host := sanitize(req, resp, err)
metrics.RequestResult.Increment(ctx, code, req.verb, host)
}
// updateRequestRetryMetric increments the RequestRetry metric counter,
// it should be called with the (response, err) tuple for each retry
// except for the final attempt.
func updateRequestRetryMetric(ctx context.Context, req *Request, resp *http.Response, err error) {
code, host := sanitize(req, resp, err)
metrics.RequestRetry.IncrementRetry(ctx, code, req.verb, host)
}
func sanitize(req *Request, resp *http.Response, err error) (string, string) {
host := "none"
if req.c.base != nil {
url = req.c.base.Host
host = req.c.base.Host
}
// Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
// system so we just report them as `<error>`.
if err != nil {
metrics.RequestResult.Increment(ctx, "<error>", req.verb, url)
} else {
// Metrics for failure codes
metrics.RequestResult.Increment(ctx, strconv.Itoa(resp.StatusCode), req.verb, url)
code := "<error>"
if resp != nil {
code = strconv.Itoa(resp.StatusCode)
}
return code, host
}
// Stream formats and executes the request, and offers streaming of the response.
@@ -825,11 +853,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
if err != nil {
return nil, err
}
if r.body != nil {
req.Body = io.NopCloser(r.body)
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
retry.After(ctx, r, resp, err)
if err != nil {
// we only retry on an HTTP response with 'Retry-After' header
@@ -889,16 +913,51 @@ func (r *Request) requestPreflightCheck() error {
}
func (r *Request) newHTTPRequest(ctx context.Context) (*http.Request, error) {
var body io.Reader
switch {
case r.body != nil && r.bodyBytes != nil:
return nil, fmt.Errorf("cannot set both body and bodyBytes")
case r.body != nil:
body = r.body
case r.bodyBytes != nil:
// Create a new reader specifically for this request.
// Giving each request a dedicated reader allows retries to avoid races resetting the request body.
body = bytes.NewReader(r.bodyBytes)
}
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
req, err := http.NewRequestWithContext(httptrace.WithClientTrace(ctx, newDNSMetricsTrace(ctx)), r.verb, url, body)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
req.Header = r.headers
return req, nil
}
// newDNSMetricsTrace returns an HTTP trace that tracks time spent on DNS lookups per host.
// This metric is available in client as "rest_client_dns_resolution_duration_seconds".
func newDNSMetricsTrace(ctx context.Context) *httptrace.ClientTrace {
type dnsMetric struct {
start time.Time
host string
sync.Mutex
}
dns := &dnsMetric{}
return &httptrace.ClientTrace{
DNSStart: func(info httptrace.DNSStartInfo) {
dns.Lock()
defer dns.Unlock()
dns.start = time.Now()
dns.host = info.Host
},
DNSDone: func(info httptrace.DNSDoneInfo) {
dns.Lock()
defer dns.Unlock()
metrics.ResolverLatency.Observe(ctx, dns.host, time.Since(dns.start))
},
}
}
// request connects to the server and invokes the provided function when a server response is
// received. It handles retry behavior and up front validation of requests. It will invoke
// fn at most once. It will return an error if a problem occurred prior to connecting to the
@@ -962,7 +1021,6 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
return err
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
// https://pkg.go.dev/net/http#Request
if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {

View File

@@ -77,9 +77,9 @@ func DefaultVersionedAPIPath(apiPath string, groupVersion schema.GroupVersion) s
return versionedAPIPath
}
// defaultServerUrlFor is shared between IsConfigTransportTLS and RESTClientFor. It
// DefaultServerUrlFor is shared between IsConfigTransportTLS and RESTClientFor. It
// requires Host and Version to be set prior to being called.
func defaultServerUrlFor(config *Config) (*url.URL, string, error) {
func DefaultServerUrlFor(config *Config) (*url.URL, string, error) {
// TODO: move the default to secure when the apiserver supports TLS by default
// config.Insecure is taken to mean "I want HTTPS but don't bother checking the certs against a CA."
hasCA := len(config.CAFile) != 0 || len(config.CAData) != 0

View File

@@ -153,6 +153,11 @@ func (r *withRetry) IsNextRetry(ctx context.Context, restReq *Request, httpReq *
return false
}
if restReq.body != nil {
// we have an opaque reader, we can't safely reset it
return false
}
r.attempts++
r.retryAfter = &RetryAfter{Attempt: r.attempts}
if r.attempts > r.maxRetries {
@@ -209,18 +214,6 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error {
return nil
}
// At this point we've made atleast one attempt, post which the response
// body should have been fully read and closed in order for it to be safe
// to reset the request body before we reconnect, in order for us to reuse
// the same TCP connection.
if seeker, ok := request.body.(io.Seeker); ok && request.body != nil {
if _, err := seeker.Seek(0, io.SeekStart); err != nil {
err = fmt.Errorf("failed to reset the request body while retrying a request: %v", err)
r.trackPreviousError(err)
return err
}
}
// if we are here, we have made attempt(s) at least once before.
if request.backoff != nil {
delay := request.backoff.CalculateBackoff(url)
@@ -249,8 +242,20 @@ func (r *withRetry) After(ctx context.Context, request *Request, resp *http.Resp
// parameters calculated from the (response, err) tuple from
// attempt N-1, so r.retryAfter is outdated and should not be
// referred to here.
isRetry := r.retryAfter != nil
r.retryAfter = nil
// the client finishes a single request after N attempts (1..N)
// - all attempts (1..N) are counted to the rest_client_requests_total
// metric (current behavior).
// - every attempt after the first (2..N) are counted to the
// rest_client_request_retries_total metric.
updateRequestResultMetric(ctx, request, resp, err)
if isRetry {
// this is attempt 2 or later
updateRequestRetryMetric(ctx, request, resp, err)
}
if request.c.base != nil {
if err != nil {
request.backoff.UpdateBackoff(request.URL(), err, 0)
@@ -353,8 +358,12 @@ func retryAfterResponse() *http.Response {
}
func retryAfterResponseWithDelay(delay string) *http.Response {
return retryAfterResponseWithCodeAndDelay(http.StatusInternalServerError, delay)
}
func retryAfterResponseWithCodeAndDelay(code int, delay string) *http.Response {
return &http.Response{
StatusCode: http.StatusInternalServerError,
StatusCode: code,
Header: http.Header{"Retry-After": []string{delay}},
}
}

View File

@@ -2,7 +2,6 @@
approvers:
- thockin
- lavalamp
- smarterclayton
- wojtek-t
- deads2k
@@ -11,7 +10,6 @@ approvers:
- ncdc
reviewers:
- thockin
- lavalamp
- smarterclayton
- wojtek-t
- deads2k
@@ -26,3 +24,5 @@ reviewers:
- dims
- ingvagabund
- ncdc
emeritus_approvers:
- lavalamp

View File

@@ -50,11 +50,12 @@ type Config struct {
Process ProcessFunc
// ObjectType is an example object of the type this controller is
// expected to handle. Only the type needs to be right, except
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` and `"kind"` must also be right.
// expected to handle.
ObjectType runtime.Object
// ObjectDescription is the description to use when logging type-specific information about this controller.
ObjectDescription string
// FullResyncPeriod is the period at which ShouldResync is considered.
FullResyncPeriod time.Duration
@@ -84,7 +85,7 @@ type Config struct {
type ShouldResyncFunc func() bool
// ProcessFunc processes a single object.
type ProcessFunc func(obj interface{}) error
type ProcessFunc func(obj interface{}, isInInitialList bool) error
// `*controller` implements Controller
type controller struct {
@@ -131,15 +132,18 @@ func (c *controller) Run(stopCh <-chan struct{}) {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
r := NewReflectorWithOptions(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
ReflectorOptions{
ResyncPeriod: c.config.FullResyncPeriod,
TypeDescription: c.config.ObjectDescription,
Clock: c.clock,
},
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
@@ -211,7 +215,7 @@ func (c *controller) processLoop() {
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnAdd(obj interface{}, isInInitialList bool)
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
@@ -220,6 +224,9 @@ type ResourceEventHandler interface {
// as few of the notification functions as you want while still implementing
// ResourceEventHandler. This adapter does not remove the prohibition against
// modifying the objects.
//
// See ResourceEventHandlerDetailedFuncs if your use needs to propagate
// HasSynced.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
@@ -227,7 +234,7 @@ type ResourceEventHandlerFuncs struct {
}
// OnAdd calls AddFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}, isInInitialList bool) {
if r.AddFunc != nil {
r.AddFunc(obj)
}
@@ -247,6 +254,36 @@ func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
}
}
// ResourceEventHandlerDetailedFuncs is exactly like ResourceEventHandlerFuncs
// except its AddFunc accepts the isInInitialList parameter, for propagating
// HasSynced.
type ResourceEventHandlerDetailedFuncs struct {
AddFunc func(obj interface{}, isInInitialList bool)
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}
// OnAdd calls AddFunc if it's not nil.
func (r ResourceEventHandlerDetailedFuncs) OnAdd(obj interface{}, isInInitialList bool) {
if r.AddFunc != nil {
r.AddFunc(obj, isInInitialList)
}
}
// OnUpdate calls UpdateFunc if it's not nil.
func (r ResourceEventHandlerDetailedFuncs) OnUpdate(oldObj, newObj interface{}) {
if r.UpdateFunc != nil {
r.UpdateFunc(oldObj, newObj)
}
}
// OnDelete calls DeleteFunc if it's not nil.
func (r ResourceEventHandlerDetailedFuncs) OnDelete(obj interface{}) {
if r.DeleteFunc != nil {
r.DeleteFunc(obj)
}
}
// FilteringResourceEventHandler applies the provided filter to all events coming
// in, ensuring the appropriate nested handler method is invoked. An object
// that starts passing the filter after an update is considered an add, and an
@@ -258,11 +295,11 @@ type FilteringResourceEventHandler struct {
}
// OnAdd calls the nested handler only if the filter succeeds
func (r FilteringResourceEventHandler) OnAdd(obj interface{}) {
func (r FilteringResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool) {
if !r.FilterFunc(obj) {
return
}
r.Handler.OnAdd(obj)
r.Handler.OnAdd(obj, isInInitialList)
}
// OnUpdate ensures the proper handler is called depending on whether the filter matches
@@ -273,7 +310,7 @@ func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
case newer && older:
r.Handler.OnUpdate(oldObj, newObj)
case newer && !older:
r.Handler.OnAdd(newObj)
r.Handler.OnAdd(newObj, false)
case !newer && older:
r.Handler.OnDelete(oldObj)
default:
@@ -353,17 +390,6 @@ func NewIndexerInformer(
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
}
// TransformFunc allows for transforming an object before it will be processed
// and put into the controller cache and before the corresponding handlers will
// be called on it.
// TransformFunc (similarly to ResourceEventHandler functions) should be able
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown
//
// The most common usage pattern is to clean-up some parts of the object to
// reduce component memory usage if a given component doesn't care about them.
// given controller doesn't care for them
type TransformFunc func(interface{}) (interface{}, error)
// NewTransformingInformer returns a Store and a controller for populating
// the store while also providing event notifications. You should only used
// the returned Store for Get/List operations; Add/Modify/Deletes will cause
@@ -411,19 +437,12 @@ func processDeltas(
// Object which receives event notifications from the given deltas
handler ResourceEventHandler,
clientState Store,
transformer TransformFunc,
deltas Deltas,
isInInitialList bool,
) error {
// from oldest to newest
for _, d := range deltas {
obj := d.Object
if transformer != nil {
var err error
obj, err = transformer(obj)
if err != nil {
return err
}
}
switch d.Type {
case Sync, Replaced, Added, Updated:
@@ -436,7 +455,7 @@ func processDeltas(
if err := clientState.Add(obj); err != nil {
return err
}
handler.OnAdd(obj)
handler.OnAdd(obj, isInInitialList)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
@@ -475,6 +494,7 @@ func newInformer(
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: transformer,
})
cfg := &Config{
@@ -484,9 +504,9 @@ func newInformer(
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}) error {
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, transformer, deltas)
return processDeltas(h, clientState, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
},

View File

@@ -51,6 +51,10 @@ type DeltaFIFOOptions struct {
// When true, `Replaced` events will be sent for items passed to a Replace() call.
// When false, `Sync` events will be sent instead.
EmitDeltaTypeReplaced bool
// If set, will be called for objects before enqueueing them. Please
// see the comment on TransformFunc for details.
Transformer TransformFunc
}
// DeltaFIFO is like FIFO, but differs in two ways. One is that the
@@ -129,8 +133,32 @@ type DeltaFIFO struct {
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat).
emitDeltaTypeReplaced bool
// Called with every object if non-nil.
transformer TransformFunc
}
// TransformFunc allows for transforming an object before it will be processed.
// TransformFunc (similarly to ResourceEventHandler functions) should be able
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
//
// New in v1.27: In such cases, the contained object will already have gone
// through the transform object separately (when it was added / updated prior
// to the delete), so the TransformFunc can likely safely ignore such objects
// (i.e., just return the input object).
//
// The most common usage pattern is to clean-up some parts of the object to
// reduce component memory usage if a given component doesn't care about them.
//
// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
// sees the object before any other actor, and it is now safe to mutate the
// object in place instead of making a copy.
//
// Note that TransformFunc is called while inserting objects into the
// notification queue and is therefore extremely performance sensitive; please
// do not do anything that will take a long time.
type TransformFunc func(interface{}) (interface{}, error)
// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string
@@ -227,6 +255,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
knownObjects: opts.KnownObjects,
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
transformer: opts.Transformer,
}
f.cond.L = &f.lock
return f
@@ -271,6 +300,10 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.hasSynced_locked()
}
func (f *DeltaFIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
}
@@ -411,6 +444,21 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
if err != nil {
return KeyError{obj, err}
}
// Every object comes through this code path once, so this is a good
// place to call the transform func. If obj is a
// DeletedFinalStateUnknown tombstone, then the containted inner object
// will already have gone through the transformer, but we document that
// this can happen. In cases involving Replace(), such an object can
// come through multiple times.
if f.transformer != nil {
var err error
obj, err = f.transformer(obj)
if err != nil {
return err
}
}
oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
@@ -526,6 +574,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.cond.Wait()
}
isInInitialList := !f.hasSynced_locked()
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
@@ -551,7 +600,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
}
err := process(item)
err := process(item, isInInitialList)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
@@ -566,12 +615,11 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
// using the Sync or Replace DeltaType and then (2) it does some deletions.
// In particular: for every pre-existing key K that is not the key of
// an object in `list` there is the effect of
// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
// of K. If `f.knownObjects == nil` then the pre-existing keys are
// those in `f.items` and the current object of K is the `.Newest()`
// of the Deltas associated with K. Otherwise the pre-existing keys
// are those listed by `f.knownObjects` and the current object of K is
// what `f.knownObjects.GetByKey(K)` returns.
// `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known
// object of K. The pre-existing keys are those in the union set of the keys in
// `f.items` and `f.knownObjects` (if not nil). The last known object for key K is
// the one present in the last delta in `f.items`. If there is no delta for K
// in `f.items`, it is the object in `f.knownObjects`
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
f.lock.Lock()
defer f.lock.Unlock()
@@ -595,56 +643,54 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
}
}
if f.knownObjects == nil {
// Do deletion detection against our own list.
queuedDeletions := 0
for k, oldItem := range f.items {
// Do deletion detection against objects in the queue
queuedDeletions := 0
for k, oldItem := range f.items {
if keys.Has(k) {
continue
}
// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
// if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object
if d, ok := deletedObj.(DeletedFinalStateUnknown); ok {
deletedObj = d.Obj
}
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if f.knownObjects != nil {
// Detect deletions for objects not present in the queue, but present in KnownObjects
knownKeys := f.knownObjects.ListKeys()
for _, k := range knownKeys {
if keys.Has(k) {
continue
}
// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
if len(f.items[k]) > 0 {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
// While there shouldn't be any queued deletions in the initial
// population of the queue, it's better to be on the safe side.
f.initialPopulationCount = keys.Len() + queuedDeletions
}
return nil
}
// Detect deletions not already in the queue.
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
if keys.Has(k) {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {

View File

@@ -25,7 +25,7 @@ import (
// PopProcessFunc is passed to Pop() method of Queue interface.
// It is supposed to process the accumulator popped from the queue.
type PopProcessFunc func(interface{}) error
type PopProcessFunc func(obj interface{}, isInInitialList bool) error
// ErrRequeue may be returned by a PopProcessFunc to safely requeue
// the current item. The value of Err will be returned from Pop.
@@ -82,9 +82,12 @@ type Queue interface {
// Pop is helper function for popping from Queue.
// WARNING: Do NOT use this function in non-test code to avoid races
// unless you really really really really know what you are doing.
//
// NOTE: This function is deprecated and may be removed in the future without
// additional warning.
func Pop(queue Queue) interface{} {
var result interface{}
queue.Pop(func(obj interface{}) error {
queue.Pop(func(obj interface{}, isInInitialList bool) error {
result = obj
return nil
})
@@ -149,6 +152,10 @@ func (f *FIFO) Close() {
func (f *FIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.hasSynced_locked()
}
func (f *FIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
}
@@ -287,6 +294,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.cond.Wait()
}
isInInitialList := !f.hasSynced_locked()
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
@@ -298,7 +306,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
continue
}
delete(f.items, id)
err := process(item)
err := process(item, isInInitialList)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err

View File

@@ -0,0 +1,65 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"k8s.io/apimachinery/pkg/types"
)
// ObjectName is a reference to an object of some implicit kind
type ObjectName struct {
Namespace string
Name string
}
// NewObjectName constructs a new one
func NewObjectName(namespace, name string) ObjectName {
return ObjectName{Namespace: namespace, Name: name}
}
// Parts is the inverse of the constructor
func (objName ObjectName) Parts() (namespace, name string) {
return objName.Namespace, objName.Name
}
// String returns the standard string encoding,
// which is designed to match the historical behavior of MetaNamespaceKeyFunc.
// Note this behavior is different from the String method of types.NamespacedName.
func (objName ObjectName) String() string {
if len(objName.Namespace) > 0 {
return objName.Namespace + "/" + objName.Name
}
return objName.Name
}
// ParseObjectName tries to parse the standard encoding
func ParseObjectName(str string) (ObjectName, error) {
var objName ObjectName
var err error
objName.Namespace, objName.Name, err = SplitMetaNamespaceKey(str)
return objName, err
}
// NamespacedNameAsObjectName rebrands the given NamespacedName as an ObjectName
func NamespacedNameAsObjectName(nn types.NamespacedName) ObjectName {
return NewObjectName(nn.Namespace, nn.Name)
}
// AsNamespacedName rebrands as a NamespacedName
func (objName ObjectName) AsNamespacedName() types.NamespacedName {
return types.NamespacedName{Namespace: objName.Namespace, Name: objName.Name}
}

View File

@@ -22,7 +22,9 @@ import (
"fmt"
"io"
"math/rand"
"os"
"reflect"
"strings"
"sync"
"time"
@@ -40,6 +42,7 @@ import (
"k8s.io/client-go/tools/pager"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
"k8s.io/utils/trace"
)
@@ -49,12 +52,11 @@ const defaultExpectedTypeName = "<unspecified>"
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
name string
// The name of the type we expect to place in the store. The name
// will be the stringification of expectedGVK if provided, and the
// stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison.
expectedTypeName string
typeDescription string
// An example object of the type we expect to place in the store.
// Only the type needs to be right, except that when that is
// `unstructured.Unstructured` the object's `"apiVersion"` and
@@ -66,17 +68,9 @@ type Reflector struct {
store Store
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher
// backoff manages backoff of ListWatch
backoffManager wait.BackoffManager
// initConnBackoffManager manages backoff the initial connection with the Watch call of ListAndWatch.
initConnBackoffManager wait.BackoffManager
// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
MaxInternalErrorRetryDuration time.Duration
resyncPeriod time.Duration
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
ShouldResync func() bool
resyncPeriod time.Duration
// clock allows tests to manipulate time
clock clock.Clock
// paginatedResult defines whether pagination should be forced for list calls.
@@ -91,6 +85,8 @@ type Reflector struct {
isLastSyncResourceVersionUnavailable bool
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
@@ -99,8 +95,19 @@ type Reflector struct {
// etcd, which is significantly less efficient and may lead to serious performance and
// scalability problems.
WatchListPageSize int64
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
ShouldResync func() bool
// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
MaxInternalErrorRetryDuration time.Duration
// UseWatchList if turned on instructs the reflector to open a stream to bring data from the API server.
// Streaming has the primary advantage of using fewer server's resources to fetch data.
//
// The old behaviour establishes a LIST request which gets data in chunks.
// Paginated list is less efficient and depending on the actual size of objects
// might result in an increased memory consumption of the APIServer.
//
// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
UseWatchList bool
}
// ResourceVersionUpdater is an interface that allows store implementation to
@@ -131,13 +138,13 @@ func DefaultWatchErrorHandler(r *Reflector, err error) {
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
case err == io.EOF:
// watch closed normally
case err == io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.typeDescription, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.typeDescription, err))
}
}
@@ -155,7 +162,40 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa
return indexer, reflector
}
// NewReflector creates a new Reflector object which will keep the
// NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack
// that is outside this package. See NewReflectorWithOptions for further information.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod})
}
// NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further
// information.
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{Name: name, ResyncPeriod: resyncPeriod})
}
// ReflectorOptions configures a Reflector.
type ReflectorOptions struct {
// Name is the Reflector's name. If unset/unspecified, the name defaults to the closest source_file.go:line
// in the call stack that is outside this package.
Name string
// TypeDescription is the Reflector's type description. If unset/unspecified, the type description is defaulted
// using the following rules: if the expectedType passed to NewReflectorWithOptions was nil, the type description is
// "<unspecified>". If the expectedType is an instance of *unstructured.Unstructured and its apiVersion and kind fields
// are set, the type description is the string encoding of those. Otherwise, the type description is set to the
// go type of expectedType..
TypeDescription string
// ResyncPeriod is the Reflector's resync period. If unset/unspecified, the resync period defaults to 0
// (do not resync).
ResyncPeriod time.Duration
// Clock allows tests to control time. If unset defaults to clock.RealClock{}
Clock clock.Clock
}
// NewReflectorWithOptions creates a new Reflector object which will keep the
// given store up to date with the server's contents for the given
// resource. Reflector promises to only put things in the store that
// have the type of expectedType, unless expectedType is nil. If
@@ -165,49 +205,77 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa
// "yes". This enables you to use reflectors to periodically process
// everything as well as incrementally processing the things that
// change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
realClock := &clock.RealClock{}
func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
reflectorClock := options.Clock
if reflectorClock == nil {
reflectorClock = clock.RealClock{}
}
r := &Reflector{
name: name,
listerWatcher: lw,
store: store,
name: options.Name,
resyncPeriod: options.ResyncPeriod,
typeDescription: options.TypeDescription,
listerWatcher: lw,
store: store,
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
resyncPeriod: resyncPeriod,
clock: realClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock),
clock: reflectorClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
expectedType: reflect.TypeOf(expectedType),
}
r.setExpectedType(expectedType)
if r.name == "" {
r.name = naming.GetNameFromCallsite(internalPackages...)
}
if r.typeDescription == "" {
r.typeDescription = getTypeDescriptionFromObject(expectedType)
}
if r.expectedGVK == nil {
r.expectedGVK = getExpectedGVKFromObject(expectedType)
}
if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 {
r.UseWatchList = true
}
return r
}
func (r *Reflector) setExpectedType(expectedType interface{}) {
r.expectedType = reflect.TypeOf(expectedType)
if r.expectedType == nil {
r.expectedTypeName = defaultExpectedTypeName
return
func getTypeDescriptionFromObject(expectedType interface{}) string {
if expectedType == nil {
return defaultExpectedTypeName
}
r.expectedTypeName = r.expectedType.String()
reflectDescription := reflect.TypeOf(expectedType).String()
if obj, ok := expectedType.(*unstructured.Unstructured); ok {
// Use gvk to check that watch event objects are of the desired type.
gvk := obj.GroupVersionKind()
if gvk.Empty() {
klog.V(4).Infof("Reflector from %s configured with expectedType of *unstructured.Unstructured with empty GroupVersionKind.", r.name)
return
}
r.expectedGVK = &gvk
r.expectedTypeName = gvk.String()
obj, ok := expectedType.(*unstructured.Unstructured)
if !ok {
return reflectDescription
}
gvk := obj.GroupVersionKind()
if gvk.Empty() {
return reflectDescription
}
return gvk.String()
}
func getExpectedGVKFromObject(expectedType interface{}) *schema.GroupVersionKind {
obj, ok := expectedType.(*unstructured.Unstructured)
if !ok {
return nil
}
gvk := obj.GroupVersionKind()
if gvk.Empty() {
return nil
}
return &gvk
}
// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
@@ -218,13 +286,13 @@ var internalPackages = []string{"client-go/tools/cache/"}
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
}
var (
@@ -254,42 +322,75 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
var err error
var w watch.Interface
fallbackToList := !r.UseWatchList
err := r.list(stopCh)
if err != nil {
return err
if r.UseWatchList {
w, err = r.watchList(stopCh)
if w == nil && err == nil {
// stopCh was closed
return nil
}
if err != nil {
if !apierrors.IsInvalid(err) {
return err
}
klog.Warning("the watch-list feature is not supported by the server, falling back to the previous LIST/WATCH semantic")
fallbackToList = true
// Ensure that we won't accidentally pass some garbage down the watch.
w = nil
}
}
if fallbackToList {
err = r.list(stopCh)
if err != nil {
return err
}
}
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
go r.startResync(stopCh, cancelCh, resyncerrc)
return r.watch(w, stopCh, resyncerrc)
}
// startResync periodically calls r.store.Resync() method.
// Note that this method is blocking and should be
// called in a separate goroutine.
func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}, resyncerrc chan error) {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}
// watch simply starts a watch request with the server.
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
var err error
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
@@ -298,35 +399,41 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
default:
}
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{
ResourceVersion: r.LastSyncResourceVersion(),
// We want to avoid situations of hanging watchers. Stop any watchers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
w, err := r.listerWatcher.Watch(options)
if err != nil {
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case begin exponentially backing off and resend watch request.
// Do the same for "429" errors.
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
<-r.initConnBackoffManager.Backoff().C()
continue
if w == nil {
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{
ResourceVersion: r.LastSyncResourceVersion(),
// We want to avoid situations of hanging watchers. Stop any watchers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}
w, err = r.listerWatcher.Watch(options)
if err != nil {
if canRetry := isWatchErrorRetriable(err); canRetry {
klog.V(4).Infof("%s: watch of %v returned %v - backing off", r.name, r.typeDescription, err)
select {
case <-stopCh:
return nil
case <-r.backoffManager.Backoff().C():
continue
}
}
return err
}
return err
}
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
// Ensure that watch will not be reused across iterations.
w.Stop()
w = nil
retry.After(err)
if err != nil {
if err != errorStopRequested {
@@ -335,16 +442,20 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
case apierrors.IsTooManyRequests(err):
klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
<-r.initConnBackoffManager.Backoff().C()
continue
klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription)
select {
case <-stopCh:
return nil
case <-r.backoffManager.Backoff().C():
continue
}
case apierrors.IsInternalError(err) && retry.ShouldRetry():
klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.expectedTypeName, err)
klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err)
continue
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.typeDescription, err)
}
}
return nil
@@ -399,7 +510,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
pager.PageSize = 0
}
list, paginatedResult, err = pager.List(context.Background(), options)
list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionUnavailable(true)
// Retry immediately if the resource version used to list is unavailable.
@@ -408,7 +519,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
// resource version it is listing at is expired or the cache may not yet be synced to the provided
// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
// the reflector makes forward progress.
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
list, paginatedResult, err = pager.ListWithAlloc(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
}()
@@ -421,8 +532,8 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
}
initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
if err != nil {
klog.Warningf("%s: failed to list %v: %v", r.name, r.expectedTypeName, err)
return fmt.Errorf("failed to list %v: %w", r.expectedTypeName, err)
klog.Warningf("%s: failed to list %v: %v", r.name, r.typeDescription, err)
return fmt.Errorf("failed to list %v: %w", r.typeDescription, err)
}
// We check if the list was paginated and if so set the paginatedResult based on that.
@@ -446,7 +557,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
}
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
items, err := meta.ExtractList(list)
items, err := meta.ExtractListWithAlloc(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
}
@@ -460,6 +571,114 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
return nil
}
// watchList establishes a stream to get a consistent snapshot of data
// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
//
// case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a consistent stream with the server.
// That means the returned data is consistent, as if, served directly from etcd via a quorum read.
// It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion.
// It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
//
// case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a stream with the server at the provided resource version.
// To establish the initial state the server begins with synthetic "Added" events.
// It ends with a synthetic "Bookmark" event containing the provided or newer resource version.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
var w watch.Interface
var err error
var temporaryStore Store
var resourceVersion string
// TODO(#115478): see if this function could be turned
// into a method and see if error handling
// could be unified with the r.watch method
isErrorRetriableWithSideEffectsFn := func(err error) bool {
if canRetry := isWatchErrorRetriable(err); canRetry {
klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err)
<-r.backoffManager.Backoff().C()
return true
}
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
// we tried to re-establish a watch request but the provided RV
// has either expired or it is greater than the server knows about.
// In that case we reset the RV and
// try to get a consistent snapshot from the watch cache (case 1)
r.setIsLastSyncResourceVersionUnavailable(true)
return true
}
return false
}
initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
defer initTrace.LogIfLong(10 * time.Second)
for {
select {
case <-stopCh:
return nil, nil
default:
}
resourceVersion = ""
lastKnownRV := r.rewatchResourceVersion()
temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
// TODO(#115478): large "list", slow clients, slow network, p&f
// might slow down streaming and eventually fail.
// maybe in such a case we should retry with an increased timeout?
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{
ResourceVersion: lastKnownRV,
AllowWatchBookmarks: true,
SendInitialEvents: pointer.Bool(true),
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: &timeoutSeconds,
}
start := r.clock.Now()
w, err = r.listerWatcher.Watch(options)
if err != nil {
if isErrorRetriableWithSideEffectsFn(err) {
continue
}
return nil, err
}
bookmarkReceived := pointer.Bool(false)
err = watchHandler(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
func(rv string) { resourceVersion = rv },
bookmarkReceived,
r.clock, make(chan error), stopCh)
if err != nil {
w.Stop() // stop and retry with clean state
if err == errorStopRequested {
return nil, nil
}
if isErrorRetriableWithSideEffectsFn(err) {
continue
}
return nil, err
}
if *bookmarkReceived {
break
}
}
// We successfully got initial state from watch-list confirmed by the
// "k8s.io/initial-events-end" bookmark.
initTrace.Step("Objects streamed", trace.Field{Key: "count", Value: len(temporaryStore.List())})
r.setIsLastSyncResourceVersionUnavailable(false)
if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
return nil, fmt.Errorf("unable to sync watch-list result: %v", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
return w, nil
}
// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
@@ -478,15 +697,17 @@ func watchHandler(start time.Time,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
exitOnInitialEventsEndBookmark *bool,
clock clock.Clock,
errc chan error,
stopCh <-chan struct{},
) error {
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
if exitOnInitialEventsEndBookmark != nil {
// set it to false just in case somebody
// made it positive
*exitOnInitialEventsEndBookmark = false
}
loop:
for {
@@ -541,6 +762,11 @@ loop:
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok {
if exitOnInitialEventsEndBookmark != nil {
*exitOnInitialEventsEndBookmark = true
}
}
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
}
@@ -549,6 +775,11 @@ loop:
rvu.UpdateResourceVersion(resourceVersion)
}
eventCount++
if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark {
watchDuration := clock.Since(start)
klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration)
return nil
}
}
}
@@ -597,6 +828,18 @@ func (r *Reflector) relistResourceVersion() string {
return r.lastSyncResourceVersion
}
// rewatchResourceVersion determines the resource version the reflector should start streaming from.
func (r *Reflector) rewatchResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock()
defer r.lastSyncResourceVersionMutex.RUnlock()
if r.isLastSyncResourceVersionUnavailable {
// initial stream should return data at the most recent resource version.
// the returned data must be consistent i.e. as if served from etcd via a quorum read
return ""
}
return r.lastSyncResourceVersion
}
// setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
// "expired" or "too large resource version" error.
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
@@ -635,5 +878,25 @@ func isTooLargeResourceVersionError(err error) bool {
return true
}
}
// Matches the message returned by api server before 1.17.0
if strings.Contains(apierr.Status().Message, "Too large resource version") {
return true
}
return false
}
// isWatchErrorRetriable determines if it is safe to retry
// a watch error retrieved from the server.
func isWatchErrorRetriable(err error) bool {
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case begin exponentially backing off and resend watch request.
// Do the same for "429" errors.
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
return true
}
return false
}

View File

@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache/synctrack"
"k8s.io/utils/buffer"
"k8s.io/utils/clock"
@@ -132,11 +133,13 @@ import (
// state, except that its ResourceVersion is replaced with a
// ResourceVersion in which the object is actually absent.
type SharedInformer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination
// between different handlers.
// It returns a registration handle for the handler that can be used to remove
// the handler again.
// AddEventHandler adds an event handler to the shared informer using
// the shared informer's resync period. Events to a single handler are
// delivered sequentially, but there is no coordination between
// different handlers.
// It returns a registration handle for the handler that can be used to
// remove the handler again, or to tell if the handler is synced (has
// seen every item in the initial list).
AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
// AddEventHandlerWithResyncPeriod adds an event handler to the
// shared informer with the requested resync period; zero means
@@ -169,6 +172,10 @@ type SharedInformer interface {
// HasSynced returns true if the shared informer's store has been
// informed by at least one full LIST of the authoritative state
// of the informer's object collection. This is unrelated to "resync".
//
// Note that this doesn't tell you if an individual handler is synced!!
// For that, please call HasSynced on the handle returned by
// AddEventHandler.
HasSynced() bool
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
// store. The value returned is not synchronized with access to the underlying store and is not
@@ -198,10 +205,7 @@ type SharedInformer interface {
//
// Must be set before starting the informer.
//
// Note: Since the object given to the handler may be already shared with
// other goroutines, it is advisable to copy the object being
// transform before mutating it at all and returning the copy to prevent
// data races.
// Please see the comment on TransformFunc for more details.
SetTransform(handler TransformFunc) error
// IsStopped reports whether the informer has already been stopped.
@@ -213,7 +217,14 @@ type SharedInformer interface {
// Opaque interface representing the registration of ResourceEventHandler for
// a SharedInformer. Must be supplied back to the same SharedInformer's
// `RemoveEventHandler` to unregister the handlers.
type ResourceEventHandlerRegistration interface{}
//
// Also used to tell if the handler is synced (has had all items in the initial
// list delivered).
type ResourceEventHandlerRegistration interface {
// HasSynced reports if both the parent has synced and all pre-sync
// events have been delivered.
HasSynced() bool
}
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface {
@@ -223,14 +234,26 @@ type SharedIndexInformer interface {
GetIndexer() Indexer
}
// NewSharedInformer creates a new instance for the listwatcher.
// NewSharedInformer creates a new instance for the ListerWatcher. See NewSharedIndexInformerWithOptions for full details.
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
}
// NewSharedIndexInformer creates a new instance for the listwatcher.
// The created informer will not do resyncs if the given
// defaultEventHandlerResyncPeriod is zero. Otherwise: for each
// NewSharedIndexInformer creates a new instance for the ListerWatcher and specified Indexers. See
// NewSharedIndexInformerWithOptions for full details.
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
return NewSharedIndexInformerWithOptions(
lw,
exampleObject,
SharedIndexInformerOptions{
ResyncPeriod: defaultEventHandlerResyncPeriod,
Indexers: indexers,
},
)
}
// NewSharedIndexInformerWithOptions creates a new instance for the ListerWatcher.
// The created informer will not do resyncs if options.ResyncPeriod is zero. Otherwise: for each
// handler that with a non-zero requested resync period, whether added
// before or after the informer starts, the nominal resync period is
// the requested resync period rounded up to a multiple of the
@@ -238,21 +261,36 @@ func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEv
// checking period is established when the informer starts running,
// and is the maximum of (a) the minimum of the resync periods
// requested before the informer starts and the
// defaultEventHandlerResyncPeriod given here and (b) the constant
// options.ResyncPeriod given here and (b) the constant
// `minimumResyncPeriod` defined in this file.
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
return &sharedIndexInformer{
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
objectDescription: options.ObjectDescription,
resyncCheckPeriod: options.ResyncPeriod,
defaultEventHandlerResyncPeriod: options.ResyncPeriod,
clock: realClock,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
}
return sharedIndexInformer
}
// SharedIndexInformerOptions configures a sharedIndexInformer.
type SharedIndexInformerOptions struct {
// ResyncPeriod is the default event handler resync period and resync check
// period. If unset/unspecified, these are defaulted to 0 (do not resync).
ResyncPeriod time.Duration
// Indexers is the sharedIndexInformer's indexers. If unset/unspecified, no indexers are configured.
Indexers Indexers
// ObjectDescription is the sharedIndexInformer's object description. This is passed through to the
// underlying Reflector's type description.
ObjectDescription string
}
// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
@@ -326,12 +364,13 @@ type sharedIndexInformer struct {
listerWatcher ListerWatcher
// objectType is an example object of the type this informer is
// expected to handle. Only the type needs to be right, except
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` and `"kind"` must also be right.
// objectType is an example object of the type this informer is expected to handle. If set, an event
// with an object with a mismatching type is dropped instead of being delivered to listeners.
objectType runtime.Object
// objectDescription is the description of this informer's objects. This typically defaults to
objectDescription string
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
// shouldResync to check if any of our listeners need a resync.
resyncCheckPeriod time.Duration
@@ -381,7 +420,8 @@ type updateNotification struct {
}
type addNotification struct {
newObj interface{}
newObj interface{}
isInInitialList bool
}
type deleteNotification struct {
@@ -419,27 +459,30 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
return
}
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
Transformer: s.transform,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
ObjectDescription: s.objectDescription,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
@@ -559,7 +602,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
}
}
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
if !s.started {
return s.processor.addListener(listener), nil
@@ -575,27 +618,35 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
handle := s.processor.addListener(listener)
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
// Note that we enqueue these notifications with the lock held
// and before returning the handle. That means there is never a
// chance for anyone to call the handle's HasSynced method in a
// state when it would falsely return true (i.e., when the
// shared informer is synced but it has not observed an Add
// with isInitialList being true, nor when the thread
// processing notifications somehow goes faster than this
// thread adding them and the counter is temporarily zero).
listener.add(addNotification{newObj: item, isInInitialList: true})
}
return handle, nil
}
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, s.transform, deltas)
return processDeltas(s, s.indexer, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
}
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnAdd(obj interface{}) {
func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.cacheMutationDetector.AddObject(obj)
s.processor.distribute(addNotification{newObj: obj}, false)
s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
}
// Conforms to ResourceEventHandler
@@ -817,6 +868,8 @@ type processorListener struct {
handler ResourceEventHandler
syncTracker *synctrack.SingleFileTracker
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
@@ -847,11 +900,18 @@ type processorListener struct {
resyncLock sync.Mutex
}
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
// HasSynced returns true if the source informer has synced, and all
// corresponding events have been delivered.
func (p *processorListener) HasSynced() bool {
return p.syncTracker.HasSynced()
}
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
@@ -863,6 +923,9 @@ func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, res
}
func (p *processorListener) add(notification interface{}) {
if a, ok := notification.(addNotification); ok && a.isInInitialList {
p.syncTracker.Start()
}
p.addCh <- notification
}
@@ -908,7 +971,10 @@ func (p *processorListener) run() {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
p.handler.OnAdd(notification.newObj, notification.isInInitialList)
if notification.isInInitialList {
p.syncTracker.Finished()
}
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:

View File

@@ -21,6 +21,7 @@ import (
"strings"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Store is a generic object storage and processing interface. A
@@ -99,20 +100,38 @@ type ExplicitKey string
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
// it's just <name>.
//
// TODO: replace key-as-string with a key-as-struct so that this
// packing/unpacking won't be necessary.
// Clients that want a structured alternative can use ObjectToName or MetaObjectToName.
// Note: this would not be a client that wants a key for a Store because those are
// necessarily strings.
//
// TODO maybe some day?: change Store to be keyed differently
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
objName, err := ObjectToName(obj)
if err != nil {
return "", err
}
return objName.String(), nil
}
// ObjectToName returns the structured name for the given object,
// if indeed it can be viewed as a metav1.Object.
func ObjectToName(obj interface{}) (ObjectName, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
return ObjectName{}, fmt.Errorf("object has no meta: %v", err)
}
if len(meta.GetNamespace()) > 0 {
return meta.GetNamespace() + "/" + meta.GetName(), nil
return MetaObjectToName(meta), nil
}
// MetaObjectToName returns the structured name for the given object
func MetaObjectToName(obj metav1.Object) ObjectName {
if len(obj.GetNamespace()) > 0 {
return ObjectName{Namespace: obj.GetNamespace(), Name: obj.GetName()}
}
return meta.GetName(), nil
return ObjectName{Namespace: "", Name: obj.GetName()}
}
// SplitMetaNamespaceKey returns the namespace and name that

View File

@@ -0,0 +1,83 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package synctrack
import (
"sync"
"sync/atomic"
)
// Lazy defers the computation of `Evaluate` to when it is necessary. It is
// possible that Evaluate will be called in parallel from multiple goroutines.
type Lazy[T any] struct {
Evaluate func() (T, error)
cache atomic.Pointer[cacheEntry[T]]
}
type cacheEntry[T any] struct {
eval func() (T, error)
lock sync.RWMutex
result *T
}
func (e *cacheEntry[T]) get() (T, error) {
if cur := func() *T {
e.lock.RLock()
defer e.lock.RUnlock()
return e.result
}(); cur != nil {
return *cur, nil
}
e.lock.Lock()
defer e.lock.Unlock()
if e.result != nil {
return *e.result, nil
}
r, err := e.eval()
if err == nil {
e.result = &r
}
return r, err
}
func (z *Lazy[T]) newCacheEntry() *cacheEntry[T] {
return &cacheEntry[T]{eval: z.Evaluate}
}
// Notify should be called when something has changed necessitating a new call
// to Evaluate.
func (z *Lazy[T]) Notify() { z.cache.Swap(z.newCacheEntry()) }
// Get should be called to get the current result of a call to Evaluate. If the
// current cached value is stale (due to a call to Notify), then Evaluate will
// be called synchronously. If subsequent calls to Get happen (without another
// Notify), they will all wait for the same return value.
//
// Error returns are not cached and will cause multiple calls to evaluate!
func (z *Lazy[T]) Get() (T, error) {
e := z.cache.Load()
if e == nil {
// Since we don't force a constructor, nil is a possible value.
// If multiple Gets race to set this, the swap makes sure only
// one wins.
z.cache.CompareAndSwap(nil, z.newCacheEntry())
e = z.cache.Load()
}
return e.get()
}

View File

@@ -0,0 +1,120 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package synctrack contains utilities for helping controllers track whether
// they are "synced" or not, that is, whether they have processed all items
// from the informer's initial list.
package synctrack
import (
"sync"
"sync/atomic"
"k8s.io/apimachinery/pkg/util/sets"
)
// AsyncTracker helps propagate HasSynced in the face of multiple worker threads.
type AsyncTracker[T comparable] struct {
UpstreamHasSynced func() bool
lock sync.Mutex
waiting sets.Set[T]
}
// Start should be called prior to processing each key which is part of the
// initial list.
func (t *AsyncTracker[T]) Start(key T) {
t.lock.Lock()
defer t.lock.Unlock()
if t.waiting == nil {
t.waiting = sets.New[T](key)
} else {
t.waiting.Insert(key)
}
}
// Finished should be called when finished processing a key which was part of
// the initial list. Since keys are tracked individually, nothing bad happens
// if you call Finished without a corresponding call to Start. This makes it
// easier to use this in combination with e.g. queues which don't make it easy
// to plumb through the isInInitialList boolean.
func (t *AsyncTracker[T]) Finished(key T) {
t.lock.Lock()
defer t.lock.Unlock()
if t.waiting != nil {
t.waiting.Delete(key)
}
}
// HasSynced returns true if the source is synced and every key present in the
// initial list has been processed. This relies on the source not considering
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *AsyncTracker[T]) HasSynced() bool {
// Call UpstreamHasSynced first: it might take a lock, which might take
// a significant amount of time, and we can't hold our lock while
// waiting on that or a user is likely to get a deadlock.
if !t.UpstreamHasSynced() {
return false
}
t.lock.Lock()
defer t.lock.Unlock()
return t.waiting.Len() == 0
}
// SingleFileTracker helps propagate HasSynced when events are processed in
// order (i.e. via a queue).
type SingleFileTracker struct {
// Important: count is used with atomic operations so it must be 64-bit
// aligned, otherwise atomic operations will panic. Having it at the top of
// the struct will guarantee that, even on 32-bit arches.
// See https://pkg.go.dev/sync/atomic#pkg-note-BUG for more information.
count int64
UpstreamHasSynced func() bool
}
// Start should be called prior to processing each key which is part of the
// initial list.
func (t *SingleFileTracker) Start() {
atomic.AddInt64(&t.count, 1)
}
// Finished should be called when finished processing a key which was part of
// the initial list. You must never call Finished() before (or without) its
// corresponding Start(), that is a logic error that could cause HasSynced to
// return a wrong value. To help you notice this should it happen, Finished()
// will panic if the internal counter goes negative.
func (t *SingleFileTracker) Finished() {
result := atomic.AddInt64(&t.count, -1)
if result < 0 {
panic("synctrack: negative counter; this logic error means HasSynced may return incorrect value")
}
}
// HasSynced returns true if the source is synced and every key present in the
// initial list has been processed. This relies on the source not considering
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *SingleFileTracker) HasSynced() bool {
// Call UpstreamHasSynced first: it might take a lock, which might take
// a significant amount of time, and we don't want to then act on a
// stale count value.
if !t.UpstreamHasSynced() {
return false
}
return atomic.LoadInt64(&t.count) <= 0
}

View File

@@ -67,7 +67,7 @@ type Preferences struct {
type Cluster struct {
// LocationOfOrigin indicates where this object came from. It is used for round tripping config post-merge, but never serialized.
// +k8s:conversion-gen=false
LocationOfOrigin string
LocationOfOrigin string `json:"-"`
// Server is the address of the kubernetes cluster (https://hostname:port).
Server string `json:"server"`
// TLSServerName is used to check server certificate. If TLSServerName is empty, the hostname used to contact the server is used.
@@ -107,7 +107,7 @@ type Cluster struct {
type AuthInfo struct {
// LocationOfOrigin indicates where this object came from. It is used for round tripping config post-merge, but never serialized.
// +k8s:conversion-gen=false
LocationOfOrigin string
LocationOfOrigin string `json:"-"`
// ClientCertificate is the path to a client cert file for TLS.
// +optional
ClientCertificate string `json:"client-certificate,omitempty"`
@@ -159,7 +159,7 @@ type AuthInfo struct {
type Context struct {
// LocationOfOrigin indicates where this object came from. It is used for round tripping config post-merge, but never serialized.
// +k8s:conversion-gen=false
LocationOfOrigin string
LocationOfOrigin string `json:"-"`
// Cluster is the name of the cluster for this context
Cluster string `json:"cluster"`
// AuthInfo is the name of the authInfo for this context
@@ -252,7 +252,7 @@ type ExecConfig struct {
// recommended as one of the prime benefits of exec plugins is that no secrets need
// to be stored directly in the kubeconfig.
// +k8s:conversion-gen=false
Config runtime.Object
Config runtime.Object `json:"-"`
// InteractiveMode determines this plugin's relationship with standard input. Valid
// values are "Never" (this exec plugin never uses standard input), "IfAvailable" (this
@@ -264,7 +264,7 @@ type ExecConfig struct {
// client.authentication.k8s.io/v1beta1, then this field is optional and defaults
// to "IfAvailable" when unset. Otherwise, this field is required.
// +optional
InteractiveMode ExecInteractiveMode
InteractiveMode ExecInteractiveMode `json:"interactiveMode,omitempty"`
// StdinUnavailable indicates whether the exec authenticator can pass standard
// input through to this exec plugin. For example, a higher level entity might be using
@@ -272,14 +272,14 @@ type ExecConfig struct {
// plugin to use standard input. This is kept here in order to keep all of the exec configuration
// together, but it is never serialized.
// +k8s:conversion-gen=false
StdinUnavailable bool
StdinUnavailable bool `json:"-"`
// StdinUnavailableMessage is an optional message to be displayed when the exec authenticator
// cannot successfully run this exec plugin because it needs to use standard input and
// StdinUnavailable is true. For example, a process that is already using standard input to
// read user instructions might set this to "used by my-program to read user instructions".
// +k8s:conversion-gen=false
StdinUnavailableMessage string
StdinUnavailableMessage string `json:"-"`
}
var _ fmt.Stringer = new(ExecConfig)

View File

@@ -42,6 +42,10 @@ type LatencyMetric interface {
Observe(ctx context.Context, verb string, u url.URL, latency time.Duration)
}
type ResolverLatencyMetric interface {
Observe(ctx context.Context, host string, latency time.Duration)
}
// SizeMetric observes client response size partitioned by verb and host.
type SizeMetric interface {
Observe(ctx context.Context, verb string, host string, size float64)
@@ -58,6 +62,23 @@ type CallsMetric interface {
Increment(exitCode int, callStatus string)
}
// RetryMetric counts the number of retries sent to the server
// partitioned by code, method, and host.
type RetryMetric interface {
IncrementRetry(ctx context.Context, code string, method string, host string)
}
// TransportCacheMetric shows the number of entries in the internal transport cache
type TransportCacheMetric interface {
Observe(value int)
}
// TransportCreateCallsMetric counts the number of times a transport is created
// partitioned by the result of the cache: hit, miss, uncacheable
type TransportCreateCallsMetric interface {
Increment(result string)
}
var (
// ClientCertExpiry is the expiry time of a client certificate
ClientCertExpiry ExpiryMetric = noopExpiry{}
@@ -65,6 +86,8 @@ var (
ClientCertRotationAge DurationMetric = noopDuration{}
// RequestLatency is the latency metric that rest clients will update.
RequestLatency LatencyMetric = noopLatency{}
// ResolverLatency is the latency metric that DNS resolver will update
ResolverLatency ResolverLatencyMetric = noopResolverLatency{}
// RequestSize is the request size metric that rest clients will update.
RequestSize SizeMetric = noopSize{}
// ResponseSize is the response size metric that rest clients will update.
@@ -76,6 +99,15 @@ var (
// ExecPluginCalls is the number of calls made to an exec plugin, partitioned by
// exit code and call status.
ExecPluginCalls CallsMetric = noopCalls{}
// RequestRetry is the retry metric that tracks the number of
// retries sent to the server.
RequestRetry RetryMetric = noopRetry{}
// TransportCacheEntries is the metric that tracks the number of entries in the
// internal transport cache.
TransportCacheEntries TransportCacheMetric = noopTransportCache{}
// TransportCreateCalls is the metric that counts the number of times a new transport
// is created
TransportCreateCalls TransportCreateCallsMetric = noopTransportCreateCalls{}
)
// RegisterOpts contains all the metrics to register. Metrics may be nil.
@@ -83,11 +115,15 @@ type RegisterOpts struct {
ClientCertExpiry ExpiryMetric
ClientCertRotationAge DurationMetric
RequestLatency LatencyMetric
ResolverLatency ResolverLatencyMetric
RequestSize SizeMetric
ResponseSize SizeMetric
RateLimiterLatency LatencyMetric
RequestResult ResultMetric
ExecPluginCalls CallsMetric
RequestRetry RetryMetric
TransportCacheEntries TransportCacheMetric
TransportCreateCalls TransportCreateCallsMetric
}
// Register registers metrics for the rest client to use. This can
@@ -103,6 +139,9 @@ func Register(opts RegisterOpts) {
if opts.RequestLatency != nil {
RequestLatency = opts.RequestLatency
}
if opts.ResolverLatency != nil {
ResolverLatency = opts.ResolverLatency
}
if opts.RequestSize != nil {
RequestSize = opts.RequestSize
}
@@ -118,6 +157,15 @@ func Register(opts RegisterOpts) {
if opts.ExecPluginCalls != nil {
ExecPluginCalls = opts.ExecPluginCalls
}
if opts.RequestRetry != nil {
RequestRetry = opts.RequestRetry
}
if opts.TransportCacheEntries != nil {
TransportCacheEntries = opts.TransportCacheEntries
}
if opts.TransportCreateCalls != nil {
TransportCreateCalls = opts.TransportCreateCalls
}
})
}
@@ -133,6 +181,11 @@ type noopLatency struct{}
func (noopLatency) Observe(context.Context, string, url.URL, time.Duration) {}
type noopResolverLatency struct{}
func (n noopResolverLatency) Observe(ctx context.Context, host string, latency time.Duration) {
}
type noopSize struct{}
func (noopSize) Observe(context.Context, string, string, float64) {}
@@ -144,3 +197,15 @@ func (noopResult) Increment(context.Context, string, string, string) {}
type noopCalls struct{}
func (noopCalls) Increment(int, string) {}
type noopRetry struct{}
func (noopRetry) IncrementRetry(context.Context, string, string, string) {}
type noopTransportCache struct{}
func (noopTransportCache) Observe(int) {}
type noopTransportCreateCalls struct{}
func (noopTransportCreateCalls) Increment(string) {}

View File

@@ -73,7 +73,23 @@ func New(fn ListPageFunc) *ListPager {
// List returns a single list object, but attempts to retrieve smaller chunks from the
// server to reduce the impact on the server. If the chunk attempt fails, it will load
// the full list instead. The Limit field on options, if unset, will default to the page size.
//
// If items in the returned list are retained for different durations, and you want to avoid
// retaining the whole slice returned by p.PageFn as long as any item is referenced,
// use ListWithAlloc instead.
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
return p.list(ctx, options, false)
}
// ListWithAlloc works like List, but avoids retaining references to the items slice returned by p.PageFn.
// It does this by making a shallow copy of non-pointer items in the slice returned by p.PageFn.
//
// If the items in the returned list are not retained, or are retained for the same duration, use List instead for memory efficiency.
func (p *ListPager) ListWithAlloc(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
return p.list(ctx, options, true)
}
func (p *ListPager) list(ctx context.Context, options metav1.ListOptions, allocNew bool) (runtime.Object, bool, error) {
if options.Limit == 0 {
options.Limit = p.PageSize
}
@@ -123,7 +139,11 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
list.ResourceVersion = m.GetResourceVersion()
list.SelfLink = m.GetSelfLink()
}
if err := meta.EachListItem(obj, func(obj runtime.Object) error {
eachListItemFunc := meta.EachListItem
if allocNew {
eachListItemFunc = meta.EachListItemWithAlloc
}
if err := eachListItemFunc(obj, func(obj runtime.Object) error {
list.Items = append(list.Items, obj)
return nil
}); err != nil {
@@ -156,12 +176,26 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
//
// Items are retrieved in chunks from the server to reduce the impact on the server with up to
// ListPager.PageBufferSize chunks buffered concurrently in the background.
//
// If items passed to fn are retained for different durations, and you want to avoid
// retaining the whole slice returned by p.PageFn as long as any item is referenced,
// use EachListItemWithAlloc instead.
func (p *ListPager) EachListItem(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
return meta.EachListItem(obj, fn)
})
}
// EachListItemWithAlloc works like EachListItem, but avoids retaining references to the items slice returned by p.PageFn.
// It does this by making a shallow copy of non-pointer items in the slice returned by p.PageFn.
//
// If the items passed to fn are not retained, or are retained for the same duration, use EachListItem instead for memory efficiency.
func (p *ListPager) EachListItemWithAlloc(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
return meta.EachListItemWithAlloc(obj, fn)
})
}
// eachListChunkBuffered fetches runtimeObject list chunks using this ListPager and invokes fn on
// each list chunk. If fn returns an error, processing stops and that error is returned. If fn does
// not return an error, any error encountered while retrieving the list from the server is

View File

@@ -27,6 +27,7 @@ import (
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/metrics"
)
// TlsTransportCache caches TLS http.RoundTrippers different configurations. The
@@ -80,11 +81,16 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
// Ensure we only create a single transport for the given TLS options
c.mu.Lock()
defer c.mu.Unlock()
defer metrics.TransportCacheEntries.Observe(len(c.transports))
// See if we already have a custom transport for this config
if t, ok := c.transports[key]; ok {
metrics.TransportCreateCalls.Increment("hit")
return t, nil
}
metrics.TransportCreateCalls.Increment("miss")
} else {
metrics.TransportCreateCalls.Increment("uncacheable")
}
// Get the TLS options for this client config
@@ -109,7 +115,7 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
// If we use are reloading files, we need to handle certificate rotation properly
// TODO(jackkleeman): We can also add rotation here when config.HasCertCallback() is true
if config.TLS.ReloadTLSFiles {
if config.TLS.ReloadTLSFiles && tlsConfig != nil && tlsConfig.GetClientCertificate != nil {
dynamicCertDialer := certRotatingDialer(tlsConfig.GetClientCertificate, dial)
tlsConfig.GetClientCertificate = dynamicCertDialer.GetClientCertificate
dial = dynamicCertDialer.connDialer.DialContext

View File

@@ -25,6 +25,7 @@ import (
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"math"
"math/big"
"net"
"os"
@@ -44,6 +45,7 @@ type Config struct {
Organization []string
AltNames AltNames
Usages []x509.ExtKeyUsage
NotBefore time.Time
}
// AltNames contains the domain names and IP addresses that will be added
@@ -57,14 +59,24 @@ type AltNames struct {
// NewSelfSignedCACert creates a CA certificate
func NewSelfSignedCACert(cfg Config, key crypto.Signer) (*x509.Certificate, error) {
now := time.Now()
// returns a uniform random value in [0, max-1), then add 1 to serial to make it a uniform random value in [1, max).
serial, err := cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
if err != nil {
return nil, err
}
serial = new(big.Int).Add(serial, big.NewInt(1))
notBefore := now.UTC()
if !cfg.NotBefore.IsZero() {
notBefore = cfg.NotBefore.UTC()
}
tmpl := x509.Certificate{
SerialNumber: new(big.Int).SetInt64(0),
SerialNumber: serial,
Subject: pkix.Name{
CommonName: cfg.CommonName,
Organization: cfg.Organization,
},
DNSNames: []string{cfg.CommonName},
NotBefore: now.UTC(),
NotBefore: notBefore,
NotAfter: now.Add(duration365d * 10).UTC(),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
@@ -116,9 +128,14 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
if err != nil {
return nil, nil, err
}
// returns a uniform random value in [0, max-1), then add 1 to serial to make it a uniform random value in [1, max).
serial, err := cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
if err != nil {
return nil, nil, err
}
serial = new(big.Int).Add(serial, big.NewInt(1))
caTemplate := x509.Certificate{
SerialNumber: big.NewInt(1),
SerialNumber: serial,
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s-ca@%d", host, time.Now().Unix()),
},
@@ -144,9 +161,14 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
if err != nil {
return nil, nil, err
}
// returns a uniform random value in [0, max-1), then add 1 to serial to make it a uniform random value in [1, max).
serial, err = cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
if err != nil {
return nil, nil, err
}
serial = new(big.Int).Add(serial, big.NewInt(1))
template := x509.Certificate{
SerialNumber: big.NewInt(2),
SerialNumber: serial,
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s@%d", host, time.Now().Unix()),
},
@@ -191,7 +213,7 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
if err := os.WriteFile(certFixturePath, certBuffer.Bytes(), 0644); err != nil {
return nil, nil, fmt.Errorf("failed to write cert fixture to %s: %v", certFixturePath, err)
}
if err := os.WriteFile(keyFixturePath, keyBuffer.Bytes(), 0644); err != nil {
if err := os.WriteFile(keyFixturePath, keyBuffer.Bytes(), 0600); err != nil {
return nil, nil, fmt.Errorf("failed to write key fixture to %s: %v", certFixturePath, err)
}
}

View File

@@ -33,38 +33,81 @@ type DelayingInterface interface {
AddAfter(item interface{}, duration time.Duration)
}
// DelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
type DelayingQueueConfig struct {
// Name for the queue. If unnamed, the metrics will not be registered.
Name string
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
// instead of the global provider.
MetricsProvider MetricsProvider
// Clock optionally allows injecting a real or fake clock for testing purposes.
Clock clock.WithTicker
// Queue optionally allows injecting custom queue Interface instead of the default one.
Queue Interface
}
// NewDelayingQueue constructs a new workqueue with delayed queuing ability.
// NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
// NewNamedDelayingQueue instead.
// NewDelayingQueueWithConfig instead and specify a name.
func NewDelayingQueue() DelayingInterface {
return NewDelayingQueueWithCustomClock(clock.RealClock{}, "")
return NewDelayingQueueWithConfig(DelayingQueueConfig{})
}
// NewDelayingQueueWithConfig constructs a new workqueue with options to
// customize different properties.
func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface {
if config.Clock == nil {
config.Clock = clock.RealClock{}
}
if config.Queue == nil {
config.Queue = NewWithConfig(QueueConfig{
Name: config.Name,
MetricsProvider: config.MetricsProvider,
Clock: config.Clock,
})
}
return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider)
}
// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
// inject custom queue Interface instead of the default one
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
return newDelayingQueue(clock.RealClock{}, q, name)
return NewDelayingQueueWithConfig(DelayingQueueConfig{
Name: name,
Queue: q,
})
}
// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability
// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability.
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewNamedDelayingQueue(name string) DelayingInterface {
return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name})
}
// NewDelayingQueueWithCustomClock constructs a new named workqueue
// with ability to inject real or fake clock for testing purposes
// with ability to inject real or fake clock for testing purposes.
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {
return newDelayingQueue(clock, NewNamed(name), name)
return NewDelayingQueueWithConfig(DelayingQueueConfig{
Name: name,
Clock: clock,
})
}
func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType {
func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType {
ret := &delayingType{
Interface: q,
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
metrics: newRetryMetrics(name, provider),
}
go ret.waitingLoop()

View File

@@ -244,13 +244,18 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu
}
}
func newRetryMetrics(name string) retryMetrics {
func newRetryMetrics(name string, provider MetricsProvider) retryMetrics {
var ret *defaultRetryMetrics
if len(name) == 0 {
return ret
}
if provider == nil {
provider = globalMetricsFactory.metricsProvider
}
return &defaultRetryMetrics{
retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name),
retries: provider.NewRetriesMetric(name),
}
}

View File

@@ -33,17 +33,60 @@ type Interface interface {
ShuttingDown() bool
}
// New constructs a new work queue (see the package comment).
func New() *Type {
return NewNamed("")
// QueueConfig specifies optional configurations to customize an Interface.
type QueueConfig struct {
// Name for the queue. If unnamed, the metrics will not be registered.
Name string
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
// instead of the global provider.
MetricsProvider MetricsProvider
// Clock ability to inject real or fake clock for testing purposes.
Clock clock.WithTicker
}
// New constructs a new work queue (see the package comment).
func New() *Type {
return NewWithConfig(QueueConfig{
Name: "",
})
}
// NewWithConfig constructs a new workqueue with ability to
// customize different properties.
func NewWithConfig(config QueueConfig) *Type {
return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod)
}
// NewNamed creates a new named queue.
// Deprecated: Use NewWithConfig instead.
func NewNamed(name string) *Type {
rc := clock.RealClock{}
return NewWithConfig(QueueConfig{
Name: name,
})
}
// newQueueWithConfig constructs a new named workqueue
// with the ability to customize different properties for testing purposes
func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type {
var metricsFactory *queueMetricsFactory
if config.MetricsProvider != nil {
metricsFactory = &queueMetricsFactory{
metricsProvider: config.MetricsProvider,
}
} else {
metricsFactory = &globalMetricsFactory
}
if config.Clock == nil {
config.Clock = clock.RealClock{}
}
return newQueue(
rc,
globalMetricsFactory.newQueueMetrics(name, rc),
defaultUnfinishedWorkUpdatePeriod,
config.Clock,
metricsFactory.newQueueMetrics(config.Name, config.Clock),
updatePeriod,
)
}

View File

@@ -16,6 +16,8 @@ limitations under the License.
package workqueue
import "k8s.io/utils/clock"
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
DelayingInterface
@@ -32,29 +34,68 @@ type RateLimitingInterface interface {
NumRequeues(item interface{}) int
}
// RateLimitingQueueConfig specifies optional configurations to customize a RateLimitingInterface.
type RateLimitingQueueConfig struct {
// Name for the queue. If unnamed, the metrics will not be registered.
Name string
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
// instead of the global provider.
MetricsProvider MetricsProvider
// Clock optionally allows injecting a real or fake clock for testing purposes.
Clock clock.WithTicker
// DelayingQueue optionally allows injecting custom delaying queue DelayingInterface instead of the default one.
DelayingQueue DelayingInterface
}
// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
// Remember to call Forget! If you don't, you may end up tracking failures forever.
// NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use
// NewNamedRateLimitingQueue instead.
// NewRateLimitingQueueWithConfig instead and specify a name.
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{})
}
// NewRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability
// with options to customize different properties.
// Remember to call Forget! If you don't, you may end up tracking failures forever.
func NewRateLimitingQueueWithConfig(rateLimiter RateLimiter, config RateLimitingQueueConfig) RateLimitingInterface {
if config.Clock == nil {
config.Clock = clock.RealClock{}
}
if config.DelayingQueue == nil {
config.DelayingQueue = NewDelayingQueueWithConfig(DelayingQueueConfig{
Name: config.Name,
MetricsProvider: config.MetricsProvider,
Clock: config.Clock,
})
}
return &rateLimitingType{
DelayingInterface: NewDelayingQueue(),
DelayingInterface: config.DelayingQueue,
rateLimiter: rateLimiter,
}
}
// NewNamedRateLimitingQueue constructs a new named workqueue with rateLimited queuing ability.
// Deprecated: Use NewRateLimitingQueueWithConfig instead.
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
return &rateLimitingType{
DelayingInterface: NewNamedDelayingQueue(name),
rateLimiter: rateLimiter,
}
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{
Name: name,
})
}
// NewRateLimitingQueueWithDelayingInterface constructs a new named workqueue with rateLimited queuing ability
// with the option to inject a custom delaying queue instead of the default one.
// Deprecated: Use NewRateLimitingQueueWithConfig instead.
func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface {
return &rateLimitingType{
DelayingInterface: di,
rateLimiter: rateLimiter,
}
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{
DelayingQueue: di,
})
}
// rateLimitingType wraps an Interface and provides rateLimited re-enquing