Update vendor files to point to kubernetes-1.12.0-beta.1
83 changes: vendor/k8s.io/client-go/tools/cache/listwatch.go (generated, vendored)
@@ -18,13 +18,10 @@ package cache

import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/pager"
@@ -93,13 +90,6 @@ func NewFilteredListWatchFromClient(c Getter, resource string, namespace string,
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

func timeoutFromListOptions(options metav1.ListOptions) time.Duration {
if options.TimeoutSeconds != nil {
return time.Duration(*options.TimeoutSeconds) * time.Second
}
return 0
}

// List a set of apiserver resources
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
if !lw.DisableChunking {
@@ -112,76 +102,3 @@ func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.WatchFunc(options)
}

// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
if len(conditions) == 0 {
return nil, nil
}

list, err := lw.List(metav1.ListOptions{})
if err != nil {
return nil, err
}
initialItems, err := meta.ExtractList(list)
if err != nil {
return nil, err
}

// use the initial items as simulated "adds"
var lastEvent *watch.Event
currIndex := 0
passedConditions := 0
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
continue
}
}

ConditionSucceeded:
for currIndex < len(initialItems) {
lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]}
currIndex++

done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
break ConditionSucceeded
}
}
}
if passedConditions == len(conditions) {
return lastEvent, nil
}
remainingConditions := conditions[passedConditions:]

metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
currResourceVersion := metaObj.GetResourceVersion()

watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion})
if err != nil {
return nil, err
}

evt, err := watch.Until(timeout, watchInterface, remainingConditions...)
if err == watch.ErrWatchClosed {
// present a consistent error interface to callers
err = wait.ErrWaitTimeout
}
return evt, err
}
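Note: ListWatchUntil (and the condition-driven wait it provided) is removed from the cache package in this update; the tools/watch package added below carries the replacement. A minimal sketch of caller code, assuming any cache.ListerWatcher value lw (the helper function name is illustrative only):

package example

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

func waitForFirstAdd(lw cache.ListerWatcher) error {
	// The condition is checked against simulated "add" events for the initial
	// list items and then against real watch events, as before.
	ev, err := watchtools.ListWatchUntil(2*time.Minute, lw, func(e watch.Event) (bool, error) {
		return e.Type == watch.Added, nil
	})
	if err != nil {
		return err
	}
	fmt.Printf("condition met on %s event\n", ev.Type)
	return nil
}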
77 changes: vendor/k8s.io/client-go/tools/cache/reflector.go (generated, vendored)
@@ -24,9 +24,6 @@ import (
|
||||
"net"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"regexp"
|
||||
goruntime "runtime"
|
||||
"runtime/debug"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -40,6 +37,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/naming"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
@@ -76,8 +74,6 @@ type Reflector struct {
|
||||
var (
|
||||
// We try to spread the load on apiserver by setting timeouts for
|
||||
// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
|
||||
// However, it can be modified to avoid periodic resync to break the
|
||||
// TCP connection.
|
||||
minWatchTimeout = 5 * time.Minute
|
||||
)
|
||||
|
||||
@@ -96,7 +92,7 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa
|
||||
// resyncPeriod, so that you can use reflectors to periodically process everything as
|
||||
// well as incrementally processing the things that change.
|
||||
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
|
||||
return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
|
||||
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
|
||||
}
|
||||
|
||||
// reflectorDisambiguator is used to disambiguate started reflectors.
|
||||
@@ -127,74 +123,7 @@ func makeValidPrometheusMetricLabel(in string) string {
|
||||
|
||||
// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
|
||||
// call chains to NewReflector, so they'd be low entropy names for reflectors
|
||||
var internalPackages = []string{"client-go/tools/cache/", "/runtime/asm_"}
|
||||
|
||||
// getDefaultReflectorName walks back through the call stack until we find a caller from outside of the ignoredPackages
|
||||
// it returns back a shortpath/filename:line to aid in identification of this reflector when it starts logging
|
||||
func getDefaultReflectorName(ignoredPackages ...string) string {
|
||||
name := "????"
|
||||
const maxStack = 10
|
||||
for i := 1; i < maxStack; i++ {
|
||||
_, file, line, ok := goruntime.Caller(i)
|
||||
if !ok {
|
||||
file, line, ok = extractStackCreator()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
i += maxStack
|
||||
}
|
||||
if hasPackage(file, ignoredPackages) {
|
||||
continue
|
||||
}
|
||||
|
||||
file = trimPackagePrefix(file)
|
||||
name = fmt.Sprintf("%s:%d", file, line)
|
||||
break
|
||||
}
|
||||
return name
|
||||
}
|
||||
|
||||
// hasPackage returns true if the file is in one of the ignored packages.
|
||||
func hasPackage(file string, ignoredPackages []string) bool {
|
||||
for _, ignoredPackage := range ignoredPackages {
|
||||
if strings.Contains(file, ignoredPackage) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// trimPackagePrefix reduces duplicate values off the front of a package name.
|
||||
func trimPackagePrefix(file string) string {
|
||||
if l := strings.LastIndex(file, "k8s.io/client-go/pkg/"); l >= 0 {
|
||||
return file[l+len("k8s.io/client-go/"):]
|
||||
}
|
||||
if l := strings.LastIndex(file, "/src/"); l >= 0 {
|
||||
return file[l+5:]
|
||||
}
|
||||
if l := strings.LastIndex(file, "/pkg/"); l >= 0 {
|
||||
return file[l+1:]
|
||||
}
|
||||
return file
|
||||
}
|
||||
|
||||
var stackCreator = regexp.MustCompile(`(?m)^created by (.*)\n\s+(.*):(\d+) \+0x[[:xdigit:]]+$`)
|
||||
|
||||
// extractStackCreator retrieves the goroutine file and line that launched this stack. Returns false
|
||||
// if the creator cannot be located.
|
||||
// TODO: Go does not expose this via runtime https://github.com/golang/go/issues/11440
|
||||
func extractStackCreator() (string, int, bool) {
|
||||
stack := debug.Stack()
|
||||
matches := stackCreator.FindStringSubmatch(string(stack))
|
||||
if matches == nil || len(matches) != 4 {
|
||||
return "", 0, false
|
||||
}
|
||||
line, err := strconv.Atoi(matches[3])
|
||||
if err != nil {
|
||||
return "", 0, false
|
||||
}
|
||||
return matches[2], line, true
|
||||
}
|
||||
var internalPackages = []string{"client-go/tools/cache/"}
|
||||
|
||||
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
|
||||
// Run will exit when stopCh is closed.
|
||||
|
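The reflector's call-stack based default name now comes from the shared naming helper instead of the local getDefaultReflectorName walk that this hunk deletes. A minimal sketch (assumed caller code, not part of this diff) of how the helper is used, mirroring the internalPackages list kept in reflector.go:

package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/naming"
)

func reflectorStyleName() string {
	// Frames from the listed packages are skipped when walking the call stack,
	// so the name points at the code that actually created the reflector.
	name := naming.GetNameFromCallsite("client-go/tools/cache/")
	return fmt.Sprintf("Reflector %s", name)
}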
9 changes: vendor/k8s.io/client-go/tools/clientcmd/api/helpers.go (generated, vendored)
@@ -29,6 +29,8 @@ import (
func init() {
sDec, _ := base64.StdEncoding.DecodeString("REDACTED+")
redactedBytes = []byte(string(sDec))
sDec, _ = base64.StdEncoding.DecodeString("DATA+OMITTED")
dataOmittedBytes = []byte(string(sDec))
}

// IsConfigEmpty returns true if the config is empty.
@@ -79,7 +81,10 @@ func MinifyConfig(config *Config) error {
return nil
}

var redactedBytes []byte
var (
redactedBytes []byte
dataOmittedBytes []byte
)

// Flatten redacts raw data entries from the config object for a human-readable view.
func ShortenConfig(config *Config) {
@@ -97,7 +102,7 @@ func ShortenConfig(config *Config) {
}
for key, cluster := range config.Clusters {
if len(cluster.CertificateAuthorityData) > 0 {
cluster.CertificateAuthorityData = redactedBytes
cluster.CertificateAuthorityData = dataOmittedBytes
}
config.Clusters[key] = cluster
}
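With this change, ShortenConfig masks cluster certificate-authority-data with "DATA+OMITTED" rather than "REDACTED". A minimal sketch of caller code (assumed example, the cluster name and CA bytes are made up):

package example

import (
	"fmt"

	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

func shortenExample() {
	cfg := clientcmdapi.NewConfig()
	cfg.Clusters["example"] = &clientcmdapi.Cluster{
		Server:                   "https://example.invalid",
		CertificateAuthorityData: []byte("very long CA bundle"),
	}
	clientcmdapi.ShortenConfig(cfg)
	// With this version of the helpers the CA bytes print as "DATA+OMITTED".
	fmt.Println(string(cfg.Clusters["example"].CertificateAuthorityData))
}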
7 changes: vendor/k8s.io/client-go/tools/clientcmd/api/helpers_test.go (generated, vendored)
@@ -229,7 +229,7 @@ func Example_minifyAndShorten() {
// clusters:
// cow-cluster:
// LocationOfOrigin: ""
// certificate-authority-data: REDACTED
// certificate-authority-data: DATA+OMITTED
// server: http://cow.org:8080
// contexts:
// federal-context:
@@ -276,14 +276,15 @@ func TestShortenSuccess(t *testing.T) {
}

redacted := string(redactedBytes)
dataOmitted := string(dataOmittedBytes)
if len(mutatingConfig.Clusters) != 2 {
t.Errorf("unexpected clusters: %v", mutatingConfig.Clusters)
}
if !reflect.DeepEqual(startingConfig.Clusters[unchangingCluster], mutatingConfig.Clusters[unchangingCluster]) {
t.Errorf("expected %v, got %v", startingConfig.Clusters[unchangingCluster], mutatingConfig.Clusters[unchangingCluster])
}
if string(mutatingConfig.Clusters[changingCluster].CertificateAuthorityData) != redacted {
t.Errorf("expected %v, got %v", redacted, string(mutatingConfig.Clusters[changingCluster].CertificateAuthorityData))
if string(mutatingConfig.Clusters[changingCluster].CertificateAuthorityData) != dataOmitted {
t.Errorf("expected %v, got %v", dataOmitted, string(mutatingConfig.Clusters[changingCluster].CertificateAuthorityData))
}

if len(mutatingConfig.AuthInfos) != 2 {
11 changes: vendor/k8s.io/client-go/tools/clientcmd/api/latest/latest.go (generated, vendored)
@@ -21,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/tools/clientcmd/api/v1"
)
@@ -47,14 +48,8 @@ var (

func init() {
Scheme = runtime.NewScheme()
if err := api.AddToScheme(Scheme); err != nil {
// Programmer error, detect immediately
panic(err)
}
if err := v1.AddToScheme(Scheme); err != nil {
// Programmer error, detect immediately
panic(err)
}
utilruntime.Must(api.AddToScheme(Scheme))
utilruntime.Must(v1.AddToScheme(Scheme))
yamlSerializer := json.NewYAMLSerializer(json.DefaultMetaFactory, Scheme, Scheme)
Codec = versioning.NewDefaultingCodecForScheme(
Scheme,
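The explicit "programmer error" checks are collapsed into utilruntime.Must, which simply panics when handed a non-nil error. A minimal sketch of the same pattern outside this file (the function name is illustrative only):

package example

import (
	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

func newSchemeOrPanic() *runtime.Scheme {
	scheme := runtime.NewScheme()
	// Registration failures are programmer errors; fail fast at init time.
	utilruntime.Must(clientcmdapi.AddToScheme(scheme))
	return scheme
}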
23 changes: vendor/k8s.io/client-go/tools/clientcmd/api/v1/zz_generated.deepcopy.go (generated, vendored)
@@ -46,31 +46,26 @@ func (in *AuthInfo) DeepCopyInto(out *AuthInfo) {
|
||||
in, out := &in.ImpersonateUserExtra, &out.ImpersonateUserExtra
|
||||
*out = make(map[string][]string, len(*in))
|
||||
for key, val := range *in {
|
||||
var outVal []string
|
||||
if val == nil {
|
||||
(*out)[key] = nil
|
||||
} else {
|
||||
(*out)[key] = make([]string, len(val))
|
||||
copy((*out)[key], val)
|
||||
in, out := &val, &outVal
|
||||
*out = make([]string, len(*in))
|
||||
copy(*out, *in)
|
||||
}
|
||||
(*out)[key] = outVal
|
||||
}
|
||||
}
|
||||
if in.AuthProvider != nil {
|
||||
in, out := &in.AuthProvider, &out.AuthProvider
|
||||
if *in == nil {
|
||||
*out = nil
|
||||
} else {
|
||||
*out = new(AuthProviderConfig)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
*out = new(AuthProviderConfig)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
if in.Exec != nil {
|
||||
in, out := &in.Exec, &out.Exec
|
||||
if *in == nil {
|
||||
*out = nil
|
||||
} else {
|
||||
*out = new(ExecConfig)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
*out = new(ExecConfig)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
if in.Extensions != nil {
|
||||
in, out := &in.Extensions, &out.Extensions
|
||||
|
44 changes: vendor/k8s.io/client-go/tools/clientcmd/api/zz_generated.deepcopy.go (generated, vendored)
@@ -46,31 +46,26 @@ func (in *AuthInfo) DeepCopyInto(out *AuthInfo) {
|
||||
in, out := &in.ImpersonateUserExtra, &out.ImpersonateUserExtra
|
||||
*out = make(map[string][]string, len(*in))
|
||||
for key, val := range *in {
|
||||
var outVal []string
|
||||
if val == nil {
|
||||
(*out)[key] = nil
|
||||
} else {
|
||||
(*out)[key] = make([]string, len(val))
|
||||
copy((*out)[key], val)
|
||||
in, out := &val, &outVal
|
||||
*out = make([]string, len(*in))
|
||||
copy(*out, *in)
|
||||
}
|
||||
(*out)[key] = outVal
|
||||
}
|
||||
}
|
||||
if in.AuthProvider != nil {
|
||||
in, out := &in.AuthProvider, &out.AuthProvider
|
||||
if *in == nil {
|
||||
*out = nil
|
||||
} else {
|
||||
*out = new(AuthProviderConfig)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
*out = new(AuthProviderConfig)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
if in.Exec != nil {
|
||||
in, out := &in.Exec, &out.Exec
|
||||
if *in == nil {
|
||||
*out = nil
|
||||
} else {
|
||||
*out = new(ExecConfig)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
*out = new(ExecConfig)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
if in.Extensions != nil {
|
||||
in, out := &in.Extensions, &out.Extensions
|
||||
@@ -159,36 +154,45 @@ func (in *Config) DeepCopyInto(out *Config) {
|
||||
in, out := &in.Clusters, &out.Clusters
|
||||
*out = make(map[string]*Cluster, len(*in))
|
||||
for key, val := range *in {
|
||||
var outVal *Cluster
|
||||
if val == nil {
|
||||
(*out)[key] = nil
|
||||
} else {
|
||||
(*out)[key] = new(Cluster)
|
||||
val.DeepCopyInto((*out)[key])
|
||||
in, out := &val, &outVal
|
||||
*out = new(Cluster)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
(*out)[key] = outVal
|
||||
}
|
||||
}
|
||||
if in.AuthInfos != nil {
|
||||
in, out := &in.AuthInfos, &out.AuthInfos
|
||||
*out = make(map[string]*AuthInfo, len(*in))
|
||||
for key, val := range *in {
|
||||
var outVal *AuthInfo
|
||||
if val == nil {
|
||||
(*out)[key] = nil
|
||||
} else {
|
||||
(*out)[key] = new(AuthInfo)
|
||||
val.DeepCopyInto((*out)[key])
|
||||
in, out := &val, &outVal
|
||||
*out = new(AuthInfo)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
(*out)[key] = outVal
|
||||
}
|
||||
}
|
||||
if in.Contexts != nil {
|
||||
in, out := &in.Contexts, &out.Contexts
|
||||
*out = make(map[string]*Context, len(*in))
|
||||
for key, val := range *in {
|
||||
var outVal *Context
|
||||
if val == nil {
|
||||
(*out)[key] = nil
|
||||
} else {
|
||||
(*out)[key] = new(Context)
|
||||
val.DeepCopyInto((*out)[key])
|
||||
in, out := &val, &outVal
|
||||
*out = new(Context)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
(*out)[key] = outVal
|
||||
}
|
||||
}
|
||||
if in.Extensions != nil {
|
||||
|
28 changes: vendor/k8s.io/client-go/tools/clientcmd/client_config.go (generated, vendored)
@@ -175,10 +175,6 @@ func (config *DirectClientConfig) ClientConfig() (*restclient.Config, error) {
// only try to read the auth information if we are secure
if restclient.IsConfigTransportTLS(*clientConfig) {
var err error

// mergo is a first write wins for map value and a last writing wins for interface values
// NOTE: This behavior changed with https://github.com/imdario/mergo/commit/d304790b2ed594794496464fadd89d2bb266600a.
// Our mergo.Merge version is older than this change.
var persister restclient.AuthProviderConfigPersister
if config.configAccess != nil {
authInfoName, _ := config.getAuthInfoName()
@@ -188,13 +184,13 @@ func (config *DirectClientConfig) ClientConfig() (*restclient.Config, error) {
if err != nil {
return nil, err
}
mergo.Merge(clientConfig, userAuthPartialConfig)
mergo.MergeWithOverwrite(clientConfig, userAuthPartialConfig)

serverAuthPartialConfig, err := getServerIdentificationPartialConfig(configAuthInfo, configClusterInfo)
if err != nil {
return nil, err
}
mergo.Merge(clientConfig, serverAuthPartialConfig)
mergo.MergeWithOverwrite(clientConfig, serverAuthPartialConfig)
}

return clientConfig, nil
@@ -214,7 +210,7 @@ func getServerIdentificationPartialConfig(configAuthInfo clientcmdapi.AuthInfo,
configClientConfig.CAFile = configClusterInfo.CertificateAuthority
configClientConfig.CAData = configClusterInfo.CertificateAuthorityData
configClientConfig.Insecure = configClusterInfo.InsecureSkipTLSVerify
mergo.Merge(mergedConfig, configClientConfig)
mergo.MergeWithOverwrite(mergedConfig, configClientConfig)

return mergedConfig, nil
}
@@ -279,8 +275,8 @@ func (config *DirectClientConfig) getUserIdentificationPartialConfig(configAuthI
promptedConfig := makeUserIdentificationConfig(*promptedAuthInfo)
previouslyMergedConfig := mergedConfig
mergedConfig = &restclient.Config{}
mergo.Merge(mergedConfig, promptedConfig)
mergo.Merge(mergedConfig, previouslyMergedConfig)
mergo.MergeWithOverwrite(mergedConfig, promptedConfig)
mergo.MergeWithOverwrite(mergedConfig, previouslyMergedConfig)
config.promptedCredentials.username = mergedConfig.Username
config.promptedCredentials.password = mergedConfig.Password
}
@@ -423,11 +419,11 @@ func (config *DirectClientConfig) getContext() (clientcmdapi.Context, error) {

mergedContext := clientcmdapi.NewContext()
if configContext, exists := contexts[contextName]; exists {
mergo.Merge(mergedContext, configContext)
mergo.MergeWithOverwrite(mergedContext, configContext)
} else if required {
return clientcmdapi.Context{}, fmt.Errorf("context %q does not exist", contextName)
}
mergo.Merge(mergedContext, config.overrides.Context)
mergo.MergeWithOverwrite(mergedContext, config.overrides.Context)

return *mergedContext, nil
}
@@ -439,11 +435,11 @@ func (config *DirectClientConfig) getAuthInfo() (clientcmdapi.AuthInfo, error) {

mergedAuthInfo := clientcmdapi.NewAuthInfo()
if configAuthInfo, exists := authInfos[authInfoName]; exists {
mergo.Merge(mergedAuthInfo, configAuthInfo)
mergo.MergeWithOverwrite(mergedAuthInfo, configAuthInfo)
} else if required {
return clientcmdapi.AuthInfo{}, fmt.Errorf("auth info %q does not exist", authInfoName)
}
mergo.Merge(mergedAuthInfo, config.overrides.AuthInfo)
mergo.MergeWithOverwrite(mergedAuthInfo, config.overrides.AuthInfo)

return *mergedAuthInfo, nil
}
@@ -454,13 +450,13 @@ func (config *DirectClientConfig) getCluster() (clientcmdapi.Cluster, error) {
clusterInfoName, required := config.getClusterName()

mergedClusterInfo := clientcmdapi.NewCluster()
mergo.Merge(mergedClusterInfo, config.overrides.ClusterDefaults)
mergo.MergeWithOverwrite(mergedClusterInfo, config.overrides.ClusterDefaults)
if configClusterInfo, exists := clusterInfos[clusterInfoName]; exists {
mergo.Merge(mergedClusterInfo, configClusterInfo)
mergo.MergeWithOverwrite(mergedClusterInfo, configClusterInfo)
} else if required {
return clientcmdapi.Cluster{}, fmt.Errorf("cluster %q does not exist", clusterInfoName)
}
mergo.Merge(mergedClusterInfo, config.overrides.ClusterInfo)
mergo.MergeWithOverwrite(mergedClusterInfo, config.overrides.ClusterInfo)
// An override of --insecure-skip-tls-verify=true and no accompanying CA/CA data should clear already-set CA/CA data
// otherwise, a kubeconfig containing a CA reference would return an error that "CA and insecure-skip-tls-verify couldn't both be set"
caLen := len(config.overrides.ClusterInfo.CertificateAuthority)
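The switch from mergo.Merge to mergo.MergeWithOverwrite keeps the old "later value wins" behavior after the vendored mergo was updated. A minimal sketch of the difference (assumed example using the vendored github.com/imdario/mergo from this update; the TestMergoSemantics change in the next file exercises the same semantics):

package example

import (
	"fmt"

	"github.com/imdario/mergo"
)

type settings struct {
	Server string
}

func mergeExample() {
	dst := settings{Server: "https://old.invalid"}
	src := settings{Server: "https://new.invalid"}

	// With the updated mergo, Merge only fills fields that are empty in dst,
	// so dst.Server would stay "https://old.invalid" here.
	_ = mergo.Merge(&dst, src)

	// MergeWithOverwrite lets the non-empty value from src replace dst's value,
	// which is what the kubeconfig merging above relies on.
	_ = mergo.MergeWithOverwrite(&dst, src)
	fmt.Println(dst.Server) // https://new.invalid
}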
181 changes: vendor/k8s.io/client-go/tools/clientcmd/client_config_test.go (generated, vendored)
@@ -28,21 +28,83 @@ import (
|
||||
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
|
||||
)
|
||||
|
||||
func TestOldMergoLib(t *testing.T) {
|
||||
type T struct {
|
||||
X string
|
||||
func TestMergoSemantics(t *testing.T) {
|
||||
type U struct {
|
||||
A string
|
||||
B int64
|
||||
}
|
||||
dst := T{X: "one"}
|
||||
src := T{X: "two"}
|
||||
mergo.Merge(&dst, &src)
|
||||
if dst.X != "two" {
|
||||
// mergo.Merge changed in an incompatible way with
|
||||
//
|
||||
// https://github.com/imdario/mergo/commit/d304790b2ed594794496464fadd89d2bb266600a
|
||||
//
|
||||
// We have to stay with the old version which still does eager
|
||||
// copying from src to dst in structs.
|
||||
t.Errorf("mergo.Merge library found with incompatible, new behavior")
|
||||
type T struct {
|
||||
S []string
|
||||
X string
|
||||
Y int64
|
||||
U U
|
||||
}
|
||||
var testDataStruct = []struct {
|
||||
dst T
|
||||
src T
|
||||
expected T
|
||||
}{
|
||||
{
|
||||
dst: T{X: "one"},
|
||||
src: T{X: "two"},
|
||||
expected: T{X: "two"},
|
||||
},
|
||||
{
|
||||
dst: T{X: "one", Y: 5, U: U{A: "four", B: 6}},
|
||||
src: T{X: "two", U: U{A: "three", B: 4}},
|
||||
expected: T{X: "two", Y: 5, U: U{A: "three", B: 4}},
|
||||
},
|
||||
{
|
||||
dst: T{S: []string{"test3", "test4", "test5"}},
|
||||
src: T{S: []string{"test1", "test2", "test3"}},
|
||||
expected: T{S: []string{"test1", "test2", "test3"}},
|
||||
},
|
||||
}
|
||||
for _, data := range testDataStruct {
|
||||
err := mergo.MergeWithOverwrite(&data.dst, &data.src)
|
||||
if err != nil {
|
||||
t.Errorf("error while merging: %s", err)
|
||||
}
|
||||
if !reflect.DeepEqual(data.dst, data.expected) {
|
||||
// The mergo library has previously changed in a an incompatible way.
|
||||
// example:
|
||||
//
|
||||
// https://github.com/imdario/mergo/commit/d304790b2ed594794496464fadd89d2bb266600a
|
||||
//
|
||||
// This test verifies that the semantics of the merge are what we expect.
|
||||
// If they are not, the mergo library may have been updated and broken
|
||||
// unexpectedly.
|
||||
t.Errorf("mergo.MergeWithOverwrite did not provide expected output: %+v doesn't match %+v", data.dst, data.expected)
|
||||
}
|
||||
}
|
||||
|
||||
var testDataMap = []struct {
|
||||
dst map[string]int
|
||||
src map[string]int
|
||||
expected map[string]int
|
||||
}{
|
||||
{
|
||||
dst: map[string]int{"rsc": 6543, "r": 2138, "gri": 1908, "adg": 912, "prt": 22},
|
||||
src: map[string]int{"rsc": 3711, "r": 2138, "gri": 1908, "adg": 912},
|
||||
expected: map[string]int{"rsc": 3711, "r": 2138, "gri": 1908, "adg": 912, "prt": 22},
|
||||
},
|
||||
}
|
||||
for _, data := range testDataMap {
|
||||
err := mergo.MergeWithOverwrite(&data.dst, &data.src)
|
||||
if err != nil {
|
||||
t.Errorf("error while merging: %s", err)
|
||||
}
|
||||
if !reflect.DeepEqual(data.dst, data.expected) {
|
||||
// The mergo library has previously changed in a an incompatible way.
|
||||
// example:
|
||||
//
|
||||
// https://github.com/imdario/mergo/commit/d304790b2ed594794496464fadd89d2bb266600a
|
||||
//
|
||||
// This test verifies that the semantics of the merge are what we expect.
|
||||
// If they are not, the mergo library may have been updated and broken
|
||||
// unexpectedly.
|
||||
t.Errorf("mergo.MergeWithOverwrite did not provide expected output: %+v doesn't match %+v", data.dst, data.expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -126,6 +188,54 @@ func TestMergeContext(t *testing.T) {
|
||||
matchStringArg(namespace, actual, t)
|
||||
}
|
||||
|
||||
func TestModifyContext(t *testing.T) {
|
||||
expectedCtx := map[string]bool{
|
||||
"updated": true,
|
||||
"clean": true,
|
||||
}
|
||||
|
||||
tempPath, err := ioutil.TempFile("", "testclientcmd-")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
defer os.Remove(tempPath.Name())
|
||||
|
||||
pathOptions := NewDefaultPathOptions()
|
||||
config := createValidTestConfig()
|
||||
|
||||
pathOptions.GlobalFile = tempPath.Name()
|
||||
|
||||
// define new context and assign it - our path options config
|
||||
config.Contexts["updated"] = &clientcmdapi.Context{
|
||||
Cluster: "updated",
|
||||
AuthInfo: "updated",
|
||||
}
|
||||
config.CurrentContext = "updated"
|
||||
|
||||
if err := ModifyConfig(pathOptions, *config, true); err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
startingConfig, err := pathOptions.GetStartingConfig()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// make sure the current context was updated
|
||||
matchStringArg("updated", startingConfig.CurrentContext, t)
|
||||
|
||||
// there should now be two contexts
|
||||
if len(startingConfig.Contexts) != len(expectedCtx) {
|
||||
t.Fatalf("unexpected nuber of contexts, expecting %v, but found %v", len(expectedCtx), len(startingConfig.Contexts))
|
||||
}
|
||||
|
||||
for key := range startingConfig.Contexts {
|
||||
if !expectedCtx[key] {
|
||||
t.Fatalf("expected context %q to exist", key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCertificateData(t *testing.T) {
|
||||
caData := []byte("ca-data")
|
||||
certData := []byte("cert-data")
|
||||
@@ -526,3 +636,46 @@ func TestNamespaceOverride(t *testing.T) {
|
||||
|
||||
matchStringArg("foo", ns, t)
|
||||
}
|
||||
|
||||
func TestAuthConfigMerge(t *testing.T) {
|
||||
content := `
|
||||
apiVersion: v1
|
||||
clusters:
|
||||
- cluster:
|
||||
server: https://localhost:8080
|
||||
name: foo-cluster
|
||||
contexts:
|
||||
- context:
|
||||
cluster: foo-cluster
|
||||
user: foo-user
|
||||
namespace: bar
|
||||
name: foo-context
|
||||
current-context: foo-context
|
||||
kind: Config
|
||||
users:
|
||||
- name: foo-user
|
||||
user:
|
||||
exec:
|
||||
apiVersion: client.authentication.k8s.io/v1alpha1
|
||||
args:
|
||||
- arg-1
|
||||
- arg-2
|
||||
command: foo-command
|
||||
`
|
||||
tmpfile, err := ioutil.TempFile("", "kubeconfig")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer os.Remove(tmpfile.Name())
|
||||
if err := ioutil.WriteFile(tmpfile.Name(), []byte(content), 0666); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
config, err := BuildConfigFromFlags("", tmpfile.Name())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if !reflect.DeepEqual(config.ExecProvider.Args, []string{"arg-1", "arg-2"}) {
|
||||
t.Errorf("Got args %v when they should be %v\n", config.ExecProvider.Args, []string{"arg-1", "arg-2"})
|
||||
}
|
||||
|
||||
}
|
||||
|
30 changes: vendor/k8s.io/client-go/tools/clientcmd/config.go (generated, vendored)
@@ -220,6 +220,9 @@ func ModifyConfig(configAccess ConfigAccess, newConfig clientcmdapi.Config, rela
}
}

// seenConfigs stores a map of config source filenames to computed config objects
seenConfigs := map[string]*clientcmdapi.Config{}

for key, context := range newConfig.Contexts {
startingContext, exists := startingConfig.Contexts[key]
if !reflect.DeepEqual(context, startingContext) || !exists {
@@ -228,15 +231,28 @@ func ModifyConfig(configAccess ConfigAccess, newConfig clientcmdapi.Config, rela
destinationFile = configAccess.GetDefaultFilename()
}

configToWrite, err := getConfigFromFile(destinationFile)
if err != nil {
return err
// we only obtain a fresh config object from its source file
// if we have not seen it already - this prevents us from
// reading and writing to the same number of files repeatedly
// when multiple / all contexts share the same destination file.
configToWrite, seen := seenConfigs[destinationFile]
if !seen {
var err error
configToWrite, err = getConfigFromFile(destinationFile)
if err != nil {
return err
}
seenConfigs[destinationFile] = configToWrite
}
configToWrite.Contexts[key] = context

if err := WriteToFile(*configToWrite, destinationFile); err != nil {
return err
}
configToWrite.Contexts[key] = context
}
}

// actually persist config object changes
for destinationFile, configToWrite := range seenConfigs {
if err := WriteToFile(*configToWrite, destinationFile); err != nil {
return err
}
}
}
}

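The seenConfigs map batches the read-modify-write: each destination file is loaded once, all context updates are applied in memory, and each touched file is written once at the end. A minimal, purely illustrative sketch of that pattern with hypothetical loadFile/saveFile helpers (not the real clientcmd types):

package example

type kubeconfig struct {
	Contexts map[string]string
}

func writeContexts(updates map[string]string, destinationFor func(key string) string,
	loadFile func(path string) (*kubeconfig, error), saveFile func(path string, c *kubeconfig) error) error {

	seen := map[string]*kubeconfig{}
	for key, ctx := range updates {
		path := destinationFor(key)
		cfg, ok := seen[path]
		if !ok {
			var err error
			if cfg, err = loadFile(path); err != nil {
				return err
			}
			seen[path] = cfg
		}
		// Apply every update in memory first.
		cfg.Contexts[key] = ctx
	}
	// Persist each touched file exactly once.
	for path, cfg := range seen {
		if err := saveFile(path, cfg); err != nil {
			return err
		}
	}
	return nil
}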
8 changes: vendor/k8s.io/client-go/tools/clientcmd/loader.go (generated, vendored)
@@ -211,7 +211,7 @@ func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) {
mapConfig := clientcmdapi.NewConfig()

for _, kubeconfig := range kubeconfigs {
mergo.Merge(mapConfig, kubeconfig)
mergo.MergeWithOverwrite(mapConfig, kubeconfig)
}

// merge all of the struct values in the reverse order so that priority is given correctly
@@ -219,14 +219,14 @@ func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) {
nonMapConfig := clientcmdapi.NewConfig()
for i := len(kubeconfigs) - 1; i >= 0; i-- {
kubeconfig := kubeconfigs[i]
mergo.Merge(nonMapConfig, kubeconfig)
mergo.MergeWithOverwrite(nonMapConfig, kubeconfig)
}

// since values are overwritten, but maps values are not, we can merge the non-map config on top of the map config and
// get the values we expect.
config := clientcmdapi.NewConfig()
mergo.Merge(config, mapConfig)
mergo.Merge(config, nonMapConfig)
mergo.MergeWithOverwrite(config, mapConfig)
mergo.MergeWithOverwrite(config, nonMapConfig)

if rules.ResolvePaths() {
if err := ResolveLocalPaths(config); err != nil {
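This merge runs whenever a client config is built from the standard loading rules (the KUBECONFIG list plus the default kubeconfig path). A minimal sketch of assumed consumer code:

package example

import (
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func loadMergedConfig() (*restclient.Config, error) {
	// Merge precedence across files is handled by Load() above.
	rules := clientcmd.NewDefaultClientConfigLoadingRules()
	overrides := &clientcmd.ConfigOverrides{}
	return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig()
}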
86 changes: vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go (generated, vendored)
@@ -49,6 +49,7 @@ limitations under the License.
|
||||
package leaderelection
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
@@ -119,7 +120,7 @@ type LeaderElectionConfig struct {
|
||||
// * OnChallenge()
|
||||
type LeaderCallbacks struct {
|
||||
// OnStartedLeading is called when a LeaderElector client starts leading
|
||||
OnStartedLeading func(stop <-chan struct{})
|
||||
OnStartedLeading func(context.Context)
|
||||
// OnStoppedLeading is called when a LeaderElector client stops leading
|
||||
OnStoppedLeading func()
|
||||
// OnNewLeader is called when the client observes a leader that is
|
||||
@@ -129,10 +130,6 @@ type LeaderCallbacks struct {
|
||||
}
|
||||
|
||||
// LeaderElector is a leader election client.
|
||||
//
|
||||
// possible future methods:
|
||||
// * (le *LeaderElector) IsLeader()
|
||||
// * (le *LeaderElector) GetLeader()
|
||||
type LeaderElector struct {
|
||||
config LeaderElectionConfig
|
||||
// internal bookkeeping
|
||||
@@ -145,26 +142,28 @@ type LeaderElector struct {
|
||||
}
|
||||
|
||||
// Run starts the leader election loop
|
||||
func (le *LeaderElector) Run() {
|
||||
func (le *LeaderElector) Run(ctx context.Context) {
|
||||
defer func() {
|
||||
runtime.HandleCrash()
|
||||
le.config.Callbacks.OnStoppedLeading()
|
||||
}()
|
||||
le.acquire()
|
||||
stop := make(chan struct{})
|
||||
go le.config.Callbacks.OnStartedLeading(stop)
|
||||
le.renew()
|
||||
close(stop)
|
||||
if !le.acquire(ctx) {
|
||||
return // ctx signalled done
|
||||
}
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
go le.config.Callbacks.OnStartedLeading(ctx)
|
||||
le.renew(ctx)
|
||||
}
|
||||
|
||||
// RunOrDie starts a client with the provided config or panics if the config
|
||||
// fails to validate.
|
||||
func RunOrDie(lec LeaderElectionConfig) {
|
||||
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
|
||||
le, err := NewLeaderElector(lec)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
le.Run()
|
||||
le.Run(ctx)
|
||||
}
|
||||
|
||||
// GetLeader returns the identity of the last observed leader or returns the empty string if
|
||||
@@ -178,13 +177,16 @@ func (le *LeaderElector) IsLeader() bool {
|
||||
return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
|
||||
}
|
||||
|
||||
// acquire loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew succeeds.
|
||||
func (le *LeaderElector) acquire() {
|
||||
stop := make(chan struct{})
|
||||
// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
|
||||
// Returns false if ctx signals done.
|
||||
func (le *LeaderElector) acquire(ctx context.Context) bool {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
succeeded := false
|
||||
desc := le.config.Lock.Describe()
|
||||
glog.Infof("attempting to acquire leader lease %v...", desc)
|
||||
wait.JitterUntil(func() {
|
||||
succeeded := le.tryAcquireOrRenew()
|
||||
succeeded = le.tryAcquireOrRenew()
|
||||
le.maybeReportTransition()
|
||||
if !succeeded {
|
||||
glog.V(4).Infof("failed to acquire lease %v", desc)
|
||||
@@ -192,17 +194,33 @@ func (le *LeaderElector) acquire() {
|
||||
}
|
||||
le.config.Lock.RecordEvent("became leader")
|
||||
glog.Infof("successfully acquired lease %v", desc)
|
||||
close(stop)
|
||||
}, le.config.RetryPeriod, JitterFactor, true, stop)
|
||||
cancel()
|
||||
}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
|
||||
return succeeded
|
||||
}
|
||||
|
||||
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails.
|
||||
func (le *LeaderElector) renew() {
|
||||
stop := make(chan struct{})
|
||||
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
|
||||
func (le *LeaderElector) renew(ctx context.Context) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
wait.Until(func() {
|
||||
err := wait.Poll(le.config.RetryPeriod, le.config.RenewDeadline, func() (bool, error) {
|
||||
return le.tryAcquireOrRenew(), nil
|
||||
})
|
||||
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
|
||||
defer timeoutCancel()
|
||||
err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
|
||||
done := make(chan bool, 1)
|
||||
go func() {
|
||||
defer close(done)
|
||||
done <- le.tryAcquireOrRenew()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-timeoutCtx.Done():
|
||||
return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
|
||||
case result := <-done:
|
||||
return result, nil
|
||||
}
|
||||
}, timeoutCtx.Done())
|
||||
|
||||
le.maybeReportTransition()
|
||||
desc := le.config.Lock.Describe()
|
||||
if err == nil {
|
||||
@@ -211,8 +229,8 @@ func (le *LeaderElector) renew() {
|
||||
}
|
||||
le.config.Lock.RecordEvent("stopped leading")
|
||||
glog.Infof("failed to renew lease %v: %v", desc, err)
|
||||
close(stop)
|
||||
}, 0, stop)
|
||||
cancel()
|
||||
}, le.config.RetryPeriod, ctx.Done())
|
||||
}
|
||||
|
||||
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
|
||||
@@ -249,14 +267,14 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
|
||||
le.observedTime = time.Now()
|
||||
}
|
||||
if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
|
||||
oldLeaderElectionRecord.HolderIdentity != le.config.Lock.Identity() {
|
||||
!le.IsLeader() {
|
||||
glog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
|
||||
return false
|
||||
}
|
||||
|
||||
// 3. We're going to try to update. The leaderElectionRecord is set to it's default
|
||||
// here. Let's correct it before updating.
|
||||
if oldLeaderElectionRecord.HolderIdentity == le.config.Lock.Identity() {
|
||||
if le.IsLeader() {
|
||||
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
|
||||
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
|
||||
} else {
|
||||
@@ -273,12 +291,12 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (l *LeaderElector) maybeReportTransition() {
|
||||
if l.observedRecord.HolderIdentity == l.reportedLeader {
|
||||
func (le *LeaderElector) maybeReportTransition() {
|
||||
if le.observedRecord.HolderIdentity == le.reportedLeader {
|
||||
return
|
||||
}
|
||||
l.reportedLeader = l.observedRecord.HolderIdentity
|
||||
if l.config.Callbacks.OnNewLeader != nil {
|
||||
go l.config.Callbacks.OnNewLeader(l.reportedLeader)
|
||||
le.reportedLeader = le.observedRecord.HolderIdentity
|
||||
if le.config.Callbacks.OnNewLeader != nil {
|
||||
go le.config.Callbacks.OnNewLeader(le.reportedLeader)
|
||||
}
|
||||
}
|
||||
|
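Run, RunOrDie and OnStartedLeading are now context-based: cancelling the context stops acquisition and renewal, and the context handed to OnStartedLeading is cancelled when leadership ends. A minimal sketch of caller code after this change (assumed example; the resourcelock value is left abstract and the durations are arbitrary):

package example

import (
	"context"
	"time"

	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func runElection(ctx context.Context, lock resourcelock.Interface) {
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// Do leader-only work until ctx is cancelled (leadership lost).
				<-ctx.Done()
			},
			OnStoppedLeading: func() {
				// Leadership lost or ctx cancelled.
			},
		},
	})
}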
5 changes: vendor/k8s.io/client-go/tools/record/events_cache.go (generated, vendored)
@@ -84,11 +84,6 @@ func getSpamKey(event *v1.Event) string {
// EventFilterFunc is a function that returns true if the event should be skipped
type EventFilterFunc func(event *v1.Event) bool

// DefaultEventFilterFunc returns false for all incoming events
func DefaultEventFilterFunc(event *v1.Event) bool {
return false
}

// EventSourceObjectSpamFilter is responsible for throttling
// the amount of events a source and object can produce.
type EventSourceObjectSpamFilter struct {
8 changes: vendor/k8s.io/client-go/tools/record/events_cache_test.go (generated, vendored)
@@ -127,14 +127,6 @@ func validateEvent(messagePrefix string, actualEvent *v1.Event, expectedEvent *v
return actualEvent, nil
}

// TestDefaultEventFilterFunc ensures that no events are filtered
func TestDefaultEventFilterFunc(t *testing.T) {
event := makeEvent("end-of-world", "it was fun", makeObjectReference("Pod", "pod1", "other"))
if DefaultEventFilterFunc(&event) {
t.Fatalf("DefaultEventFilterFunc should always return false")
}
}

// TestEventAggregatorByReasonFunc ensures that two events are aggregated if they vary only by event.message
func TestEventAggregatorByReasonFunc(t *testing.T) {
event1 := makeEvent("end-of-world", "it was fun", makeObjectReference("Pod", "pod1", "other"))
4 changes: vendor/k8s.io/client-go/tools/remotecommand/remotecommand.go (generated, vendored)
@@ -30,8 +30,8 @@ import (
spdy "k8s.io/client-go/transport/spdy"
)

// StreamOptions holds information pertaining to the current streaming session: supported stream
// protocols, input/output streams, if the client is requesting a TTY, and a terminal size queue to
// StreamOptions holds information pertaining to the current streaming session:
// input/output streams, if the client is requesting a TTY, and a terminal size queue to
// support terminal resizing.
type StreamOptions struct {
Stdin io.Reader
114 changes: vendor/k8s.io/client-go/tools/watch/informerwatcher.go (generated, vendored, new file)
@@ -0,0 +1,114 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package watch
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
func newTicketer() *ticketer {
|
||||
return &ticketer{
|
||||
cond: sync.NewCond(&sync.Mutex{}),
|
||||
}
|
||||
}
|
||||
|
||||
type ticketer struct {
|
||||
counter uint64
|
||||
|
||||
cond *sync.Cond
|
||||
current uint64
|
||||
}
|
||||
|
||||
func (t *ticketer) GetTicket() uint64 {
|
||||
// -1 to start from 0
|
||||
return atomic.AddUint64(&t.counter, 1) - 1
|
||||
}
|
||||
|
||||
func (t *ticketer) WaitForTicket(ticket uint64, f func()) {
|
||||
t.cond.L.Lock()
|
||||
defer t.cond.L.Unlock()
|
||||
for ticket != t.current {
|
||||
t.cond.Wait()
|
||||
}
|
||||
|
||||
f()
|
||||
|
||||
t.current++
|
||||
t.cond.Broadcast()
|
||||
}
|
||||
|
||||
// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
|
||||
// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
|
||||
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface) {
|
||||
ch := make(chan watch.Event)
|
||||
w := watch.NewProxyWatcher(ch)
|
||||
t := newTicketer()
|
||||
|
||||
indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
go t.WaitForTicket(t.GetTicket(), func() {
|
||||
select {
|
||||
case ch <- watch.Event{
|
||||
Type: watch.Added,
|
||||
Object: obj.(runtime.Object),
|
||||
}:
|
||||
case <-w.StopChan():
|
||||
}
|
||||
})
|
||||
},
|
||||
UpdateFunc: func(old, new interface{}) {
|
||||
go t.WaitForTicket(t.GetTicket(), func() {
|
||||
select {
|
||||
case ch <- watch.Event{
|
||||
Type: watch.Modified,
|
||||
Object: new.(runtime.Object),
|
||||
}:
|
||||
case <-w.StopChan():
|
||||
}
|
||||
})
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
go t.WaitForTicket(t.GetTicket(), func() {
|
||||
staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
|
||||
if stale {
|
||||
// We have no means of passing the additional information down using watch API based on watch.Event
|
||||
// but the caller can filter such objects by checking if metadata.deletionTimestamp is set
|
||||
obj = staleObj
|
||||
}
|
||||
|
||||
select {
|
||||
case ch <- watch.Event{
|
||||
Type: watch.Deleted,
|
||||
Object: obj.(runtime.Object),
|
||||
}:
|
||||
case <-w.StopChan():
|
||||
}
|
||||
})
|
||||
},
|
||||
}, cache.Indexers{})
|
||||
|
||||
go func() {
|
||||
informer.Run(w.StopChan())
|
||||
}()
|
||||
|
||||
return indexer, informer, w
|
||||
}
|
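A minimal sketch of assumed consumer code for the new helper: NewIndexerInformerWatcher wraps an informer as a watch.Interface, so events can be read from a plain ResultChan while the informer handles listing and relisting (clientset construction is omitted):

package example

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

func watchSecrets(client kubernetes.Interface, namespace string) {
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return client.CoreV1().Secrets(namespace).List(options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return client.CoreV1().Secrets(namespace).Watch(options)
		},
	}
	_, _, w := watchtools.NewIndexerInformerWatcher(lw, &corev1.Secret{})
	defer w.Stop()

	// Events arrive in ticket order, as enforced by the ticketer above.
	for ev := range w.ResultChan() {
		fmt.Printf("%s %s\n", ev.Type, ev.Object.(*corev1.Secret).Name)
	}
}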
236 changes: vendor/k8s.io/client-go/tools/watch/informerwatcher_test.go (generated, vendored, new file)
@@ -0,0 +1,236 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package watch
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
fakeclientset "k8s.io/client-go/kubernetes/fake"
|
||||
testcore "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
type byEventTypeAndName []watch.Event
|
||||
|
||||
func (a byEventTypeAndName) Len() int { return len(a) }
|
||||
func (a byEventTypeAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a byEventTypeAndName) Less(i, j int) bool {
|
||||
if a[i].Type < a[j].Type {
|
||||
return true
|
||||
}
|
||||
|
||||
if a[i].Type > a[j].Type {
|
||||
return false
|
||||
}
|
||||
|
||||
return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name
|
||||
}
|
||||
|
||||
func TestTicketer(t *testing.T) {
|
||||
tg := newTicketer()
|
||||
|
||||
const numTickets = 100 // current golang limit for race detector is 8192 simultaneously alive goroutines
|
||||
var tickets []uint64
|
||||
for i := 0; i < numTickets; i++ {
|
||||
ticket := tg.GetTicket()
|
||||
tickets = append(tickets, ticket)
|
||||
|
||||
exp, got := uint64(i), ticket
|
||||
if got != exp {
|
||||
t.Fatalf("expected ticket %d, got %d", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
// shuffle tickets
|
||||
rand.Shuffle(len(tickets), func(i, j int) {
|
||||
tickets[i], tickets[j] = tickets[j], tickets[i]
|
||||
})
|
||||
|
||||
res := make(chan uint64, len(tickets))
|
||||
for _, ticket := range tickets {
|
||||
go func(ticket uint64) {
|
||||
time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
|
||||
tg.WaitForTicket(ticket, func() {
|
||||
res <- ticket
|
||||
})
|
||||
}(ticket)
|
||||
}
|
||||
|
||||
for i := 0; i < numTickets; i++ {
|
||||
exp, got := uint64(i), <-res
|
||||
if got != exp {
|
||||
t.Fatalf("expected ticket %d, got %d", exp, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewInformerWatcher(t *testing.T) {
|
||||
// Make sure there are no 2 same types of events on a secret with the same name or that might be flaky.
|
||||
tt := []struct {
|
||||
name string
|
||||
objects []runtime.Object
|
||||
events []watch.Event
|
||||
}{
|
||||
{
|
||||
name: "basic test",
|
||||
objects: []runtime.Object{
|
||||
&corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-1",
|
||||
},
|
||||
StringData: map[string]string{
|
||||
"foo-1": "initial",
|
||||
},
|
||||
},
|
||||
&corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-2",
|
||||
},
|
||||
StringData: map[string]string{
|
||||
"foo-2": "initial",
|
||||
},
|
||||
},
|
||||
&corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-3",
|
||||
},
|
||||
StringData: map[string]string{
|
||||
"foo-3": "initial",
|
||||
},
|
||||
},
|
||||
},
|
||||
events: []watch.Event{
|
||||
{
|
||||
Type: watch.Added,
|
||||
Object: &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-4",
|
||||
},
|
||||
StringData: map[string]string{
|
||||
"foo-4": "initial",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: watch.Modified,
|
||||
Object: &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-2",
|
||||
},
|
||||
StringData: map[string]string{
|
||||
"foo-2": "new",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: watch.Deleted,
|
||||
Object: &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-3",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tt {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
var expected []watch.Event
|
||||
for _, o := range tc.objects {
|
||||
expected = append(expected, watch.Event{
|
||||
Type: watch.Added,
|
||||
Object: o.DeepCopyObject(),
|
||||
})
|
||||
}
|
||||
for _, e := range tc.events {
|
||||
expected = append(expected, *e.DeepCopy())
|
||||
}
|
||||
|
||||
fake := fakeclientset.NewSimpleClientset(tc.objects...)
|
||||
fakeWatch := watch.NewFakeWithChanSize(len(tc.events), false)
|
||||
fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(fakeWatch, nil))
|
||||
|
||||
for _, e := range tc.events {
|
||||
fakeWatch.Action(e.Type, e.Object)
|
||||
}
|
||||
|
||||
lw := &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
return fake.Core().Secrets("").List(options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
return fake.Core().Secrets("").Watch(options)
|
||||
},
|
||||
}
|
||||
_, _, w := NewIndexerInformerWatcher(lw, &corev1.Secret{})
|
||||
|
||||
var result []watch.Event
|
||||
loop:
|
||||
for {
|
||||
var event watch.Event
|
||||
var ok bool
|
||||
select {
|
||||
case event, ok = <-w.ResultChan():
|
||||
if !ok {
|
||||
t.Errorf("Failed to read event: channel is already closed!")
|
||||
return
|
||||
}
|
||||
|
||||
result = append(result, *event.DeepCopy())
|
||||
case <-time.After(time.Second * 1):
|
||||
// All the events are buffered -> this means we are done
|
||||
// Also the one sec will make sure that we would detect RetryWatcher's incorrect behaviour after last event
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
// Informers don't guarantee event order so we need to sort these arrays to compare them
|
||||
sort.Sort(byEventTypeAndName(expected))
|
||||
sort.Sort(byEventTypeAndName(result))
|
||||
|
||||
if !reflect.DeepEqual(expected, result) {
|
||||
t.Error(spew.Errorf("\nexpected: %#v,\ngot: %#v,\ndiff: %s", expected, result, diff.ObjectReflectDiff(expected, result)))
|
||||
return
|
||||
}
|
||||
|
||||
// Fill in some data to test watch closing while there are some events to be read
|
||||
for _, e := range tc.events {
|
||||
fakeWatch.Action(e.Type, e.Object)
|
||||
}
|
||||
|
||||
// Stop before reading all the data to make sure the informer can deal with closed channel
|
||||
w.Stop()
|
||||
|
||||
// Wait a bit to see if the informer won't panic
|
||||
// TODO: Try to figure out a more reliable mechanism than time.Sleep (https://github.com/kubernetes/kubernetes/pull/50102/files#r184716591)
|
||||
time.Sleep(1 * time.Second)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
225 changes: vendor/k8s.io/client-go/tools/watch/until.go (generated, vendored, new file)
@@ -0,0 +1,225 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package watch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// PreconditionFunc returns true if the condition has been reached, false if it has not been reached yet,
|
||||
// or an error if the condition failed or detected an error state.
|
||||
type PreconditionFunc func(store cache.Store) (bool, error)
|
||||
|
||||
// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
|
||||
// or an error if the condition cannot be checked and should terminate. In general, it is better to define
|
||||
// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
|
||||
// from false to true).
|
||||
type ConditionFunc func(event watch.Event) (bool, error)
|
||||
|
||||
// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry.
|
||||
var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout")
|
||||
|
||||
// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch
|
||||
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
|
||||
// If no event has been received, the returned event will be nil.
|
||||
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
|
||||
// Waits until context deadline or until context is canceled.
|
||||
//
|
||||
// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!!
|
||||
// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error.
|
||||
// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below,
|
||||
// Warning: solving such issues.
|
||||
// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone.
|
||||
func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) {
|
||||
ch := watcher.ResultChan()
|
||||
defer watcher.Stop()
|
||||
var lastEvent *watch.Event
|
||||
for _, condition := range conditions {
|
||||
// check the next condition against the previous event and short circuit waiting for the next watch
|
||||
if lastEvent != nil {
|
||||
done, err := condition(*lastEvent)
|
||||
if err != nil {
|
||||
return lastEvent, err
|
||||
}
|
||||
if done {
|
||||
continue
|
||||
}
|
||||
}
|
||||
ConditionSucceeded:
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-ch:
|
||||
if !ok {
|
||||
return lastEvent, ErrWatchClosed
|
||||
}
|
||||
lastEvent = &event
|
||||
|
||||
done, err := condition(event)
|
||||
if err != nil {
|
||||
return lastEvent, err
|
||||
}
|
||||
if done {
|
||||
break ConditionSucceeded
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
return lastEvent, wait.ErrWaitTimeout
|
||||
}
|
||||
}
|
||||
}
|
||||
return lastEvent, nil
|
||||
}

// UntilWithSync creates an informer from lw, optionally checks the precondition once the store is synced,
// and watches the output until each provided condition succeeds, in a way that is identical
// to function UntilWithoutRetry. (See above.)
// UntilWithSync can deal with errors such as API timeouts, lost connections and 'Resource version too old'.
// It is the only function that can recover from 'Resource version too old'; Until and UntilWithoutRetry will
// just fail in that case. On the other hand it can't provide you with guarantees as strong as using the simple
// Watch method with Until. It may skip some intermediate events if the watch function fails, but it will
// re-list to recover, and after recovery you always get an event if there has been a change.
// Also, with the current implementation based on DeltaFIFO, the order of events is guaranteed only for a
// particular object, not across different objects, even if they belong to the same resource.
// The most frequent usage is a command that needs to watch the "state of the world" and shouldn't fail, like:
// waiting for an object to reach a state, "small" controllers, ...
func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) {
	indexer, informer, watcher := NewIndexerInformerWatcher(lw, objType)
	// Proxy watcher can be stopped multiple times, so it's fine to use defer here to cover alternative branches
	// and to let UntilWithoutRetry stop it as well
	defer watcher.Stop()

	if precondition != nil {
		if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
			return nil, fmt.Errorf("UntilWithSync: unable to sync caches: %v", ctx.Err())
		}

		done, err := precondition(indexer)
		if err != nil {
			return nil, err
		}

		if done {
			return nil, nil
		}
	}

	return UntilWithoutRetry(ctx, watcher, conditions...)
}
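
// Editor's note: a usage sketch, not part of the upstream file, for the "state of the
// world" case described above. clientset, ctx, namespace and secretName are assumed to
// exist in the caller's scope; corev1 is k8s.io/api/core/v1 and fields is
// k8s.io/apimachinery/pkg/fields:
//
//	lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "secrets", namespace,
//		fields.OneTermEqualSelector("metadata.name", secretName))
//	ev, err := UntilWithSync(ctx, lw, &corev1.Secret{}, nil, func(e watch.Event) (bool, error) {
//		return e.Type == watch.Added || e.Type == watch.Modified, nil
//	})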

// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration.
func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
	if timeout < 0 {
		// This should be handled in validation
		glog.Errorf("Timeout for context shall not be negative!")
		timeout = 0
	}

	if timeout == 0 {
		return context.WithCancel(parent)
	}

	return context.WithTimeout(parent, timeout)
}
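
// Editor's note: a small illustration, not part of the upstream file: a zero timeout
// yields a cancel-only context that never expires, while a positive timeout behaves
// like plain context.WithTimeout.
//
//	ctx, cancel := ContextWithOptionalTimeout(context.Background(), 0) // no deadline
//	defer cancel()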

// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
// TODO: remove when no longer used
//
// Deprecated: Use UntilWithSync instead.
func ListWatchUntil(timeout time.Duration, lw cache.ListerWatcher, conditions ...ConditionFunc) (*watch.Event, error) {
	if len(conditions) == 0 {
		return nil, nil
	}

	list, err := lw.List(metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	initialItems, err := meta.ExtractList(list)
	if err != nil {
		return nil, err
	}

	// use the initial items as simulated "adds"
	var lastEvent *watch.Event
	currIndex := 0
	passedConditions := 0
	for _, condition := range conditions {
		// check the next condition against the previous event and short circuit waiting for the next watch
		if lastEvent != nil {
			done, err := condition(*lastEvent)
			if err != nil {
				return lastEvent, err
			}
			if done {
				passedConditions = passedConditions + 1
				continue
			}
		}

	ConditionSucceeded:
		for currIndex < len(initialItems) {
			lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]}
			currIndex++

			done, err := condition(*lastEvent)
			if err != nil {
				return lastEvent, err
			}
			if done {
				passedConditions = passedConditions + 1
				break ConditionSucceeded
			}
		}
	}
	if passedConditions == len(conditions) {
		return lastEvent, nil
	}
	remainingConditions := conditions[passedConditions:]

	metaObj, err := meta.ListAccessor(list)
	if err != nil {
		return nil, err
	}
	currResourceVersion := metaObj.GetResourceVersion()

	watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion})
	if err != nil {
		return nil, err
	}

	ctx, cancel := ContextWithOptionalTimeout(context.Background(), timeout)
	defer cancel()
	evt, err := UntilWithoutRetry(ctx, watchInterface, remainingConditions...)
	if err == ErrWatchClosed {
		// present a consistent error interface to callers
		err = wait.ErrWaitTimeout
	}
	return evt, err
}
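
// Editor's note: a migration sketch, not part of the upstream file, showing how a
// deprecated ListWatchUntil call might be expressed with UntilWithSync instead. lw,
// timeout, conditions and the &corev1.Secret{} object type are illustrative assumptions,
// and the semantics are only roughly equivalent (UntilWithSync re-lists on failure):
//
//	ctx, cancel := ContextWithOptionalTimeout(context.Background(), timeout)
//	defer cancel()
//	ev, err := UntilWithSync(ctx, lw, &corev1.Secret{}, nil, conditions...)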

303
vendor/k8s.io/client-go/tools/watch/until_test.go
generated
vendored
Normal file
@@ -0,0 +1,303 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package watch

import (
	"context"
	"errors"
	"reflect"
	"strings"
	"testing"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	fakeclient "k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

type fakePod struct {
	name string
}

func (obj *fakePod) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind }
func (obj *fakePod) DeepCopyObject() runtime.Object   { panic("DeepCopyObject not supported by fakePod") }

func TestUntil(t *testing.T) {
	fw := watch.NewFake()
	go func() {
		var obj *fakePod
		fw.Add(obj)
		fw.Modify(obj)
	}()
	conditions := []ConditionFunc{
		func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
		func(event watch.Event) (bool, error) { return event.Type == watch.Modified, nil },
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
	if err != nil {
		t.Fatalf("expected nil error, got %#v", err)
	}
	if lastEvent == nil {
		t.Fatal("expected an event")
	}
	if lastEvent.Type != watch.Modified {
		t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
	}
	if got, isPod := lastEvent.Object.(*fakePod); !isPod {
		t.Fatalf("expected a pod event, got %#v", got)
	}
}

func TestUntilMultipleConditions(t *testing.T) {
	fw := watch.NewFake()
	go func() {
		var obj *fakePod
		fw.Add(obj)
	}()
	conditions := []ConditionFunc{
		func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
		func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
	if err != nil {
		t.Fatalf("expected nil error, got %#v", err)
	}
	if lastEvent == nil {
		t.Fatal("expected an event")
	}
	if lastEvent.Type != watch.Added {
		t.Fatalf("expected ADDED event type, got %v", lastEvent.Type)
	}
	if got, isPod := lastEvent.Object.(*fakePod); !isPod {
		t.Fatalf("expected a pod event, got %#v", got)
	}
}

func TestUntilMultipleConditionsFail(t *testing.T) {
	fw := watch.NewFake()
	go func() {
		var obj *fakePod
		fw.Add(obj)
	}()
	conditions := []ConditionFunc{
		func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
		func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
		func(event watch.Event) (bool, error) { return event.Type == watch.Deleted, nil },
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
	if err != wait.ErrWaitTimeout {
		t.Fatalf("expected ErrWaitTimeout error, got %#v", err)
	}
	if lastEvent == nil {
		t.Fatal("expected an event")
	}
	if lastEvent.Type != watch.Added {
		t.Fatalf("expected ADDED event type, got %v", lastEvent.Type)
	}
	if got, isPod := lastEvent.Object.(*fakePod); !isPod {
		t.Fatalf("expected a pod event, got %#v", got)
	}
}

func TestUntilTimeout(t *testing.T) {
	fw := watch.NewFake()
	go func() {
		var obj *fakePod
		fw.Add(obj)
		fw.Modify(obj)
	}()
	conditions := []ConditionFunc{
		func(event watch.Event) (bool, error) {
			return event.Type == watch.Added, nil
		},
		func(event watch.Event) (bool, error) {
			return event.Type == watch.Modified, nil
		},
	}

	lastEvent, err := UntilWithoutRetry(context.Background(), fw, conditions...)
	if err != nil {
		t.Fatalf("expected nil error, got %#v", err)
	}
	if lastEvent == nil {
		t.Fatal("expected an event")
	}
	if lastEvent.Type != watch.Modified {
		t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
	}
	if got, isPod := lastEvent.Object.(*fakePod); !isPod {
		t.Fatalf("expected a pod event, got %#v", got)
	}
}

func TestUntilErrorCondition(t *testing.T) {
	fw := watch.NewFake()
	go func() {
		var obj *fakePod
		fw.Add(obj)
	}()
	expected := "something bad"
	conditions := []ConditionFunc{
		func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
		func(event watch.Event) (bool, error) { return false, errors.New(expected) },
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	_, err := UntilWithoutRetry(ctx, fw, conditions...)
	if err == nil {
		t.Fatal("expected an error")
	}
	if !strings.Contains(err.Error(), expected) {
		t.Fatalf("expected %q in error string, got %q", expected, err.Error())
	}
}

func TestUntilWithSync(t *testing.T) {
	// FIXME: test preconditions
	tt := []struct {
		name             string
		lw               *cache.ListWatch
		preconditionFunc PreconditionFunc
		conditionFunc    ConditionFunc
		expectedErr      error
		expectedEvent    *watch.Event
	}{
		{
			name: "doesn't wait for sync with no precondition",
			lw: &cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					select {}
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					select {}
				},
			},
			preconditionFunc: nil,
			conditionFunc: func(e watch.Event) (bool, error) {
				return true, nil
			},
			expectedErr:   errors.New("timed out waiting for the condition"),
			expectedEvent: nil,
		},
		{
			name: "waits indefinitely with precondition if it can't sync",
			lw: &cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					select {}
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					select {}
				},
			},
			preconditionFunc: func(store cache.Store) (bool, error) {
				return true, nil
			},
			conditionFunc: func(e watch.Event) (bool, error) {
				return true, nil
			},
			expectedErr:   errors.New("UntilWithSync: unable to sync caches: context deadline exceeded"),
			expectedEvent: nil,
		},
		{
			name: "precondition can stop the loop",
			lw: func() *cache.ListWatch {
				fakeclient := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}})

				return &cache.ListWatch{
					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
						return fakeclient.CoreV1().Secrets("").List(options)
					},
					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
						return fakeclient.CoreV1().Secrets("").Watch(options)
					},
				}
			}(),
			preconditionFunc: func(store cache.Store) (bool, error) {
				_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: "", Name: "first"})
				if err != nil {
					return true, err
				}
				if exists {
					return true, nil
				}
				return false, nil
			},
			conditionFunc: func(e watch.Event) (bool, error) {
				return true, errors.New("should never reach this")
			},
			expectedErr:   nil,
			expectedEvent: nil,
		},
		{
			name: "precondition lets it proceed to regular condition",
			lw: func() *cache.ListWatch {
				fakeclient := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}})

				return &cache.ListWatch{
					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
						return fakeclient.CoreV1().Secrets("").List(options)
					},
					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
						return fakeclient.CoreV1().Secrets("").Watch(options)
					},
				}
			}(),
			preconditionFunc: func(store cache.Store) (bool, error) {
				return false, nil
			},
			conditionFunc: func(e watch.Event) (bool, error) {
				if e.Type == watch.Added {
					return true, nil
				}
				panic("no other events are expected")
			},
			expectedErr:   nil,
			expectedEvent: &watch.Event{Type: watch.Added, Object: &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
		},
	}

	for _, tc := range tt {
		t.Run(tc.name, func(t *testing.T) {
			// Informer waits for caches to sync by polling in 100ms intervals,
			// timeout needs to be reasonably higher
			ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
			defer cancel()

			event, err := UntilWithSync(ctx, tc.lw, &corev1.Secret{}, tc.preconditionFunc, tc.conditionFunc)

			if !reflect.DeepEqual(err, tc.expectedErr) {
				t.Errorf("expected error %#v, got %#v", tc.expectedErr, err)
			}

			if !reflect.DeepEqual(event, tc.expectedEvent) {
				t.Errorf("expected event %#v, got %#v", tc.expectedEvent, event)
			}
		})
	}
}