Update k8s.io dependencies to master to get selflink fix in client-go
vendor/k8s.io/client-go/tools/auth/clientauth.go (3 changes, generated, vendored)
@@ -105,7 +105,7 @@ func LoadFromFile(path string) (*Info, error) {
 // The fields of client.Config with a corresponding field in the Info are set
 // with the value from the Info.
 func (info Info) MergeWithConfig(c restclient.Config) (restclient.Config, error) {
-    var config restclient.Config = c
+    var config = c
     config.Username = info.User
     config.Password = info.Password
     config.CAFile = info.CAFile
@@ -118,6 +118,7 @@ func (info Info) MergeWithConfig(c restclient.Config) (restclient.Config, error)
     return config, nil
 }
 
+// Complete returns true if the Kubernetes API authorization info is complete.
 func (info Info) Complete() bool {
     return len(info.User) > 0 ||
         len(info.CertFile) > 0 ||
vendor/k8s.io/client-go/tools/cache/controller.go (3 changes, generated, vendored)
@@ -79,6 +79,7 @@ type controller struct {
     clock clock.Clock
 }
 
+// Controller is a generic controller framework.
 type Controller interface {
     Run(stopCh <-chan struct{})
     HasSynced() bool
@@ -149,7 +150,7 @@ func (c *controller) processLoop() {
     for {
         obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
         if err != nil {
-            if err == FIFOClosedError {
+            if err == ErrFIFOClosed {
                 return
             }
             if c.config.RetryOnError {
vendor/k8s.io/client-go/tools/cache/delta_fifo.go (7 changes, generated, vendored)
@@ -160,7 +160,7 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
     return f.keyFunc(obj)
 }
 
-// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
+// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
 // or an Update called first but the first batch of items inserted by Replace() has been popped
 func (f *DeltaFIFO) HasSynced() bool {
     f.lock.Lock()
@@ -389,7 +389,7 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err
     return d, exists, nil
 }
 
-// Checks if the queue is closed
+// IsClosed checks if the queue is closed
 func (f *DeltaFIFO) IsClosed() bool {
     f.closedLock.Lock()
     defer f.closedLock.Unlock()
@@ -417,7 +417,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
             // When Close() is called, the f.closed is set and the condition is broadcasted.
             // Which causes this loop to continue and return from the Pop().
             if f.IsClosed() {
-                return nil, FIFOClosedError
+                return nil, ErrFIFOClosed
             }
 
             f.cond.Wait()
@@ -593,6 +593,7 @@ type KeyGetter interface {
 // DeltaType is the type of a change (addition, deletion, etc)
 type DeltaType string
 
+// Change type definition
 const (
     Added   DeltaType = "Added"
     Updated DeltaType = "Updated"
vendor/k8s.io/client-go/tools/cache/expiration_cache.go (49 changes, generated, vendored)
@@ -48,14 +48,14 @@ type ExpirationCache struct {
 // ExpirationPolicy dictates when an object expires. Currently only abstracted out
 // so unittests don't rely on the system clock.
 type ExpirationPolicy interface {
-    IsExpired(obj *timestampedEntry) bool
+    IsExpired(obj *TimestampedEntry) bool
 }
 
 // TTLPolicy implements a ttl based ExpirationPolicy.
 type TTLPolicy struct {
     // >0: Expire entries with an age > ttl
     // <=0: Don't expire any entry
-    Ttl time.Duration
+    TTL time.Duration
 
     // Clock used to calculate ttl expiration
     Clock clock.Clock
@@ -63,26 +63,30 @@ type TTLPolicy struct {
 
 // IsExpired returns true if the given object is older than the ttl, or it can't
 // determine its age.
-func (p *TTLPolicy) IsExpired(obj *timestampedEntry) bool {
-    return p.Ttl > 0 && p.Clock.Since(obj.timestamp) > p.Ttl
+func (p *TTLPolicy) IsExpired(obj *TimestampedEntry) bool {
+    return p.TTL > 0 && p.Clock.Since(obj.Timestamp) > p.TTL
 }
 
-// timestampedEntry is the only type allowed in a ExpirationCache.
-type timestampedEntry struct {
-    obj       interface{}
-    timestamp time.Time
+// TimestampedEntry is the only type allowed in a ExpirationCache.
+// Keep in mind that it is not safe to share timestamps between computers.
+// Behavior may be inconsistent if you get a timestamp from the API Server and
+// use it on the client machine as part of your ExpirationCache.
+type TimestampedEntry struct {
+    Obj       interface{}
+    Timestamp time.Time
+    key       string
 }
 
-// getTimestampedEntry returns the timestampedEntry stored under the given key.
-func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) {
+// getTimestampedEntry returns the TimestampedEntry stored under the given key.
+func (c *ExpirationCache) getTimestampedEntry(key string) (*TimestampedEntry, bool) {
     item, _ := c.cacheStorage.Get(key)
-    if tsEntry, ok := item.(*timestampedEntry); ok {
+    if tsEntry, ok := item.(*TimestampedEntry); ok {
         return tsEntry, true
     }
     return nil, false
 }
 
-// getOrExpire retrieves the object from the timestampedEntry if and only if it hasn't
+// getOrExpire retrieves the object from the TimestampedEntry if and only if it hasn't
 // already expired. It holds a write lock across deletion.
 func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
     // Prevent all inserts from the time we deem an item as "expired" to when we
@@ -95,11 +99,11 @@ func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
         return nil, false
     }
     if c.expirationPolicy.IsExpired(timestampedItem) {
-        klog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.obj)
+        klog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.Obj)
         c.cacheStorage.Delete(key)
         return nil, false
     }
-    return timestampedItem.obj, true
+    return timestampedItem.Obj, true
 }
 
 // GetByKey returns the item stored under the key, or sets exists=false.
@@ -126,10 +130,8 @@ func (c *ExpirationCache) List() []interface{} {
 
     list := make([]interface{}, 0, len(items))
     for _, item := range items {
-        obj := item.(*timestampedEntry).obj
-        if key, err := c.keyFunc(obj); err != nil {
-            list = append(list, obj)
-        } else if obj, exists := c.getOrExpire(key); exists {
+        key := item.(*TimestampedEntry).key
+        if obj, exists := c.getOrExpire(key); exists {
             list = append(list, obj)
         }
     }
@@ -151,7 +153,7 @@ func (c *ExpirationCache) Add(obj interface{}) error {
     c.expirationLock.Lock()
     defer c.expirationLock.Unlock()
 
-    c.cacheStorage.Add(key, &timestampedEntry{obj, c.clock.Now()})
+    c.cacheStorage.Add(key, &TimestampedEntry{obj, c.clock.Now(), key})
     return nil
 }
 
@@ -184,7 +186,7 @@ func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) er
         if err != nil {
             return KeyError{item, err}
         }
-        items[key] = &timestampedEntry{item, ts}
+        items[key] = &TimestampedEntry{item, ts, key}
     }
     c.expirationLock.Lock()
     defer c.expirationLock.Unlock()
@@ -199,10 +201,15 @@ func (c *ExpirationCache) Resync() error {
 
 // NewTTLStore creates and returns a ExpirationCache with a TTLPolicy
 func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store {
+    return NewExpirationStore(keyFunc, &TTLPolicy{ttl, clock.RealClock{}})
+}
+
+// NewExpirationStore creates and returns a ExpirationCache for a given policy
+func NewExpirationStore(keyFunc KeyFunc, expirationPolicy ExpirationPolicy) Store {
     return &ExpirationCache{
         cacheStorage:     NewThreadSafeStore(Indexers{}, Indices{}),
         keyFunc:          keyFunc,
         clock:            clock.RealClock{},
-        expirationPolicy: &TTLPolicy{ttl, clock.RealClock{}},
+        expirationPolicy: expirationPolicy,
     }
 }
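Illustrative usage sketch, not part of the vendored diff: how the exported TTLPolicy (renamed TTL field) and the new NewExpirationStore constructor shown above can be used by a consumer. Field names and constructors are taken from the diff; the 30-second TTL is an arbitrary example value.

    package example

    import (
        "time"

        "k8s.io/apimachinery/pkg/util/clock"
        "k8s.io/client-go/tools/cache"
    )

    func newStores() (cache.Store, cache.Store) {
        // Entries older than 30s are treated as absent.
        ttlStore := cache.NewTTLStore(cache.MetaNamespaceKeyFunc, 30*time.Second)

        // The same store built via the newly exported NewExpirationStore,
        // passing an ExpirationPolicy explicitly.
        policy := &cache.TTLPolicy{TTL: 30 * time.Second, Clock: clock.RealClock{}}
        custom := cache.NewExpirationStore(cache.MetaNamespaceKeyFunc, policy)
        return ttlStore, custom
    }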
vendor/k8s.io/client-go/tools/cache/expiration_cache_fakes.go (5 changes, generated, vendored)
@@ -33,16 +33,19 @@ func (c *fakeThreadSafeMap) Delete(key string) {
     }
 }
 
+// FakeExpirationPolicy keeps the list for keys which never expires.
 type FakeExpirationPolicy struct {
     NeverExpire     sets.String
     RetrieveKeyFunc KeyFunc
 }
 
-func (p *FakeExpirationPolicy) IsExpired(obj *timestampedEntry) bool {
+// IsExpired used to check if object is expired.
+func (p *FakeExpirationPolicy) IsExpired(obj *TimestampedEntry) bool {
     key, _ := p.RetrieveKeyFunc(obj)
     return !p.NeverExpire.Has(key)
 }
 
+// NewFakeExpirationStore creates a new instance for the ExpirationCache.
 func NewFakeExpirationStore(keyFunc KeyFunc, deletedKeys chan<- string, expirationPolicy ExpirationPolicy, cacheClock clock.Clock) Store {
     cacheStorage := NewThreadSafeStore(Indexers{}, Indices{})
     return &ExpirationCache{
vendor/k8s.io/client-go/tools/cache/fake_custom_store.go (4 changes, generated, vendored)
@@ -16,7 +16,7 @@ limitations under the License.
 
 package cache
 
-// FakeStore lets you define custom functions for store operations
+// FakeCustomStore lets you define custom functions for store operations.
 type FakeCustomStore struct {
     AddFunc    func(obj interface{}) error
     UpdateFunc func(obj interface{}) error
@@ -25,7 +25,7 @@ type FakeCustomStore struct {
     ListKeysFunc func() []string
     GetFunc      func(obj interface{}) (item interface{}, exists bool, err error)
     GetByKeyFunc func(key string) (item interface{}, exists bool, err error)
-    ReplaceFunc  func(list []interface{}, resourceVerion string) error
+    ReplaceFunc  func(list []interface{}, resourceVersion string) error
     ResyncFunc   func() error
 }
 
vendor/k8s.io/client-go/tools/cache/fifo.go (11 changes, generated, vendored)
@@ -34,7 +34,8 @@ type ErrRequeue struct {
     Err error
 }
 
-var FIFOClosedError error = errors.New("DeltaFIFO: manipulating with closed queue")
+// ErrFIFOClosed used when FIFO is closed
+var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue")
 
 func (e ErrRequeue) Error() string {
     if e.Err == nil {
@@ -66,7 +67,7 @@ type Queue interface {
     Close()
 }
 
-// Helper function for popping from Queue.
+// Pop is helper function for popping from Queue.
 // WARNING: Do NOT use this function in non-test code to avoid races
 // unless you really really really really know what you are doing.
 func Pop(queue Queue) interface{} {
@@ -126,7 +127,7 @@ func (f *FIFO) Close() {
     f.cond.Broadcast()
 }
 
-// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
+// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
 // or an Update called first but the first batch of items inserted by Replace() has been popped
 func (f *FIFO) HasSynced() bool {
     f.lock.Lock()
@@ -242,7 +243,7 @@ func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
     return item, exists, nil
 }
 
-// Checks if the queue is closed
+// IsClosed checks if the queue is closed
 func (f *FIFO) IsClosed() bool {
     f.closedLock.Lock()
     defer f.closedLock.Unlock()
@@ -267,7 +268,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
             // When Close() is called, the f.closed is set and the condition is broadcasted.
             // Which causes this loop to continue and return from the Pop().
             if f.IsClosed() {
-                return nil, FIFOClosedError
+                return nil, ErrFIFOClosed
             }
 
             f.cond.Wait()
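Illustrative usage sketch, not part of the vendored diff: a minimal consumer loop that checks the renamed cache.ErrFIFOClosed sentinel (formerly FIFOClosedError) when popping from a cache.Queue, mirroring what controller.go does above. The helper name drain is hypothetical.

    package example

    import "k8s.io/client-go/tools/cache"

    // drain pops items until the queue is closed, treating closure as a
    // normal shutdown rather than an error.
    func drain(q cache.Queue, process cache.PopProcessFunc) error {
        for {
            if _, err := q.Pop(process); err != nil {
                if err == cache.ErrFIFOClosed {
                    return nil // queue closed by Close(): stop the loop
                }
                return err
            }
        }
    }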
vendor/k8s.io/client-go/tools/cache/heap.go (10 changes, generated, vendored)
@@ -28,7 +28,9 @@ const (
     closedMsg = "heap is closed"
 )
 
+// LessFunc is used to compare two objects in the heap.
 type LessFunc func(interface{}, interface{}) bool
+
 type heapItem struct {
     obj   interface{} // The object which is stored in the heap.
     index int         // The index of the object's key in the Heap.queue.
@@ -158,7 +160,7 @@ func (h *Heap) Add(obj interface{}) error {
     return nil
 }
 
-// Adds all the items in the list to the queue and then signals the condition
+// BulkAdd adds all the items in the list to the queue and then signals the condition
 // variable. It is useful when the caller would like to add all of the items
 // to the queue before consumer starts processing them.
 func (h *Heap) BulkAdd(list []interface{}) error {
@@ -249,11 +251,11 @@ func (h *Heap) Pop() (interface{}, error) {
         h.cond.Wait()
     }
     obj := heap.Pop(h.data)
-    if obj != nil {
-        return obj, nil
-    } else {
+    if obj == nil {
         return nil, fmt.Errorf("object was removed from heap data")
     }
+
+    return obj, nil
 }
 
 // List returns a list of all the items.
vendor/k8s.io/client-go/tools/cache/index.go (27 changes, generated, vendored)
@@ -23,17 +23,27 @@ import (
     "k8s.io/apimachinery/pkg/util/sets"
 )
 
-// Indexer is a storage interface that lets you list objects using multiple indexing functions
+// Indexer is a storage interface that lets you list objects using multiple indexing functions.
+// There are three kinds of strings here.
+// One is a storage key, as defined in the Store interface.
+// Another kind is a name of an index.
+// The third kind of string is an "indexed value", which is produced by an
+// IndexFunc and can be a field value or any other string computed from the object.
 type Indexer interface {
     Store
-    // Retrieve list of objects that match on the named indexing function
+    // Index returns the stored objects whose set of indexed values
+    // intersects the set of indexed values of the given object, for
+    // the named index
     Index(indexName string, obj interface{}) ([]interface{}, error)
-    // IndexKeys returns the set of keys that match on the named indexing function.
-    IndexKeys(indexName, indexKey string) ([]string, error)
-    // ListIndexFuncValues returns the list of generated values of an Index func
+    // IndexKeys returns the storage keys of the stored objects whose
+    // set of indexed values for the named index includes the given
+    // indexed value
+    IndexKeys(indexName, indexedValue string) ([]string, error)
+    // ListIndexFuncValues returns all the indexed values of the given index
    ListIndexFuncValues(indexName string) []string
-    // ByIndex lists object that match on the named indexing function with the exact key
-    ByIndex(indexName, indexKey string) ([]interface{}, error)
+    // ByIndex returns the stored objects whose set of indexed values
+    // for the named index includes the given indexed value
+    ByIndex(indexName, indexedValue string) ([]interface{}, error)
     // GetIndexer return the indexers
     GetIndexers() Indexers
 
@@ -42,7 +52,7 @@ type Indexer interface {
     AddIndexers(newIndexers Indexers) error
 }
 
-// IndexFunc knows how to provide an indexed value for an object.
+// IndexFunc knows how to compute the set of indexed values for an object.
 type IndexFunc func(obj interface{}) ([]string, error)
 
 // IndexFuncToKeyFuncAdapter adapts an indexFunc to a keyFunc. This is only useful if your index function returns
@@ -65,6 +75,7 @@ func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc {
 }
 
 const (
+    // NamespaceIndex is the lookup name for the most comment index function, which is to index by the namespace field.
     NamespaceIndex string = "namespace"
 )
 
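Illustrative usage sketch, not part of the vendored diff: how the index vocabulary in the rewritten comments above maps onto calls. ByIndex takes an index name and an indexed value (here a namespace) and returns the stored objects; the helper below is hypothetical and assumes the indexer is populated elsewhere (for example by an informer).

    package example

    import "k8s.io/client-go/tools/cache"

    func objectsInNamespace(namespace string) ([]interface{}, error) {
        indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
            // Index objects by the namespace field, using the exported
            // NamespaceIndex name instead of the raw "namespace" string.
            cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
        })
        return indexer.ByIndex(cache.NamespaceIndex, namespace)
    }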
vendor/k8s.io/client-go/tools/cache/listers.go (3 changes, generated, vendored)
@@ -30,6 +30,7 @@ import (
 // AppendFunc is used to add a matching item to whatever list the caller is using
 type AppendFunc func(interface{})
 
+// ListAll calls appendFn with each value retrieved from store which matches the selector.
 func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error {
     selectAll := selector.Empty()
     for _, m := range store.List() {
@@ -50,6 +51,7 @@ func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error {
     return nil
 }
 
+// ListAllByNamespace used to list items belongs to namespace from Indexer.
 func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selector, appendFn AppendFunc) error {
     selectAll := selector.Empty()
     if namespace == metav1.NamespaceAll {
@@ -124,6 +126,7 @@ type GenericNamespaceLister interface {
     Get(name string) (runtime.Object, error)
 }
 
+// NewGenericLister creates a new instance for the genericLister.
 func NewGenericLister(indexer Indexer, resource schema.GroupResource) GenericLister {
     return &genericLister{indexer: indexer, resource: resource}
 }
vendor/k8s.io/client-go/tools/cache/mutation_cache.go (1 change, generated, vendored)
@@ -42,6 +42,7 @@ type MutationCache interface {
     Mutation(interface{})
 }
 
+// ResourceVersionComparator is able to compare object versions.
 type ResourceVersionComparator interface {
     CompareResourceVersion(lhs, rhs runtime.Object) int
 }
vendor/k8s.io/client-go/tools/cache/mutation_detector.go (8 changes, generated, vendored)
@@ -36,12 +36,14 @@ func init() {
     mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR"))
 }
 
-type CacheMutationDetector interface {
+// MutationDetector is able to monitor if the object be modified outside.
+type MutationDetector interface {
     AddObject(obj interface{})
     Run(stopCh <-chan struct{})
 }
 
-func NewCacheMutationDetector(name string) CacheMutationDetector {
+// NewCacheMutationDetector creates a new instance for the defaultCacheMutationDetector.
+func NewCacheMutationDetector(name string) MutationDetector {
     if !mutationDetectionEnabled {
         return dummyMutationDetector{}
     }
@@ -114,7 +116,7 @@ func (d *defaultCacheMutationDetector) CompareObjects() {
     altered := false
     for i, obj := range d.cachedObjs {
         if !reflect.DeepEqual(obj.cached, obj.copied) {
-            fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectDiff(obj.cached, obj.copied))
+            fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectGoPrintSideBySide(obj.cached, obj.copied))
             altered = true
         }
     }
vendor/k8s.io/client-go/tools/cache/reflector.go (41 changes, generated, vendored)
@@ -17,6 +17,7 @@ limitations under the License.
 package cache
 
 import (
+    "context"
     "errors"
     "fmt"
     "io"
@@ -24,7 +25,6 @@ import (
     "net"
     "net/url"
     "reflect"
-    "strings"
     "sync"
     "syscall"
     "time"
@@ -38,6 +38,7 @@ import (
     utilruntime "k8s.io/apimachinery/pkg/util/runtime"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/apimachinery/pkg/watch"
+    "k8s.io/client-go/tools/pager"
     "k8s.io/klog"
     "k8s.io/utils/trace"
 )
@@ -68,6 +69,9 @@ type Reflector struct {
     lastSyncResourceVersion string
     // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
     lastSyncResourceVersionMutex sync.RWMutex
+    // WatchListPageSize is the requested chunk size of initial and resync watch lists.
+    // Defaults to pager.PageSize.
+    WatchListPageSize int64
 }
 
 var (
@@ -79,7 +83,7 @@ var (
 // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
 // The indexer is configured to key on namespace
 func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
-    indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc})
+    indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
    reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
     return indexer, reflector
 }
@@ -108,11 +112,6 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
     return r
 }
 
-func makeValidPrometheusMetricLabel(in string) string {
-    // this isn't perfect, but it removes our common characters
-    return strings.NewReplacer("/", "_", ".", "_", "-", "_", ":", "_").Replace(in)
-}
-
 // internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
 // call chains to NewReflector, so they'd be low entropy names for reflectors
 var internalPackages = []string{"client-go/tools/cache/"}
@@ -167,7 +166,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
     options := metav1.ListOptions{ResourceVersion: "0"}
 
     if err := func() error {
-        initTrace := trace.New("Reflector " + r.name + " ListAndWatch")
+        initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
         defer initTrace.LogIfLong(10 * time.Second)
         var list runtime.Object
         var err error
@@ -179,7 +178,16 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
                 panicCh <- r
             }
         }()
-        list, err = r.listerWatcher.List(options)
+        // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
+        // list request will return the full response.
+        pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
+            return r.listerWatcher.List(opts)
+        }))
+        if r.WatchListPageSize != 0 {
+            pager.PageSize = r.WatchListPageSize
+        }
+        // Pager falls back to full list if paginated list calls fail due to an "Expired" error.
+        list, err = pager.List(context.Background(), options)
         close(listCh)
     }()
     select {
@@ -257,6 +265,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
            // We want to avoid situations of hanging watchers. Stop any wachers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timeoutSeconds,
+            // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
+            // Reflector doesn't assume bookmarks are returned at all (if the server do not support
+            // watch bookmarks, it will ignore this field).
+            AllowWatchBookmarks: true,
         }
 
         w, err := r.listerWatcher.Watch(options)
@@ -286,7 +298,12 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 
         if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
             if err != errorStopRequested {
-                klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
+                switch {
+                case apierrs.IsResourceExpired(err):
+                    klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
+                default:
+                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
+                }
             }
             return nil
         }
@@ -354,6 +371,8 @@ loop:
             if err != nil {
                 utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
             }
+        case watch.Bookmark:
+            // A `Bookmark` means watch has synced here, just update the resourceVersion
         default:
             utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
         }
@@ -363,7 +382,7 @@ loop:
         }
     }
 
-    watchDuration := r.clock.Now().Sub(start)
+    watchDuration := r.clock.Since(start)
     if watchDuration < 1*time.Second && eventCount == 0 {
         return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
     }
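Illustrative usage sketch, not part of the vendored diff: the new exported WatchListPageSize field shown above can be set on a Reflector after construction to control the chunk size of its paginated initial and resync LISTs. The lister-watcher and store are assumed to be provided by the caller; 500 is an arbitrary example matching the pager default.

    package example

    import (
        "time"

        v1 "k8s.io/api/core/v1"
        "k8s.io/client-go/tools/cache"
    )

    func newPagedReflector(lw cache.ListerWatcher, store cache.Store) *cache.Reflector {
        r := cache.NewReflector(lw, &v1.Pod{}, store, 10*time.Minute)
        // Ask the apiserver for the initial and resync LISTs in chunks of 500 items.
        r.WatchListPageSize = 500
        return r
    }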
vendor/k8s.io/client-go/tools/cache/reflector_metrics.go (17 changes, generated, vendored)
@@ -94,23 +94,6 @@ var metricsFactory = struct {
     metricsProvider: noopMetricsProvider{},
 }
 
-func newReflectorMetrics(name string) *reflectorMetrics {
-    var ret *reflectorMetrics
-    if len(name) == 0 {
-        return ret
-    }
-    return &reflectorMetrics{
-        numberOfLists:        metricsFactory.metricsProvider.NewListsMetric(name),
-        listDuration:         metricsFactory.metricsProvider.NewListDurationMetric(name),
-        numberOfItemsInList:  metricsFactory.metricsProvider.NewItemsInListMetric(name),
-        numberOfWatches:      metricsFactory.metricsProvider.NewWatchesMetric(name),
-        numberOfShortWatches: metricsFactory.metricsProvider.NewShortWatchesMetric(name),
-        watchDuration:        metricsFactory.metricsProvider.NewWatchDurationMetric(name),
-        numberOfItemsInWatch: metricsFactory.metricsProvider.NewItemsInWatchMetric(name),
-        lastResourceVersion:  metricsFactory.metricsProvider.NewLastResourceVersionMetric(name),
-    }
-}
-
 // SetReflectorMetricsProvider sets the metrics provider
 func SetReflectorMetricsProvider(metricsProvider MetricsProvider) {
     metricsFactory.setProviders.Do(func() {
vendor/k8s.io/client-go/tools/cache/shared_informer.go (90 changes, generated, vendored)
@@ -31,31 +31,84 @@ import (
     "k8s.io/klog"
 )
 
-// SharedInformer has a shared data cache and is capable of distributing notifications for changes
-// to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
-// one behavior change compared to a standard Informer. When you receive a notification, the cache
-// will be AT LEAST as fresh as the notification, but it MAY be more fresh. You should NOT depend
-// on the contents of the cache exactly matching the notification you've received in handler
-// functions. If there was a create, followed by a delete, the cache may NOT have your item. This
-// has advantages over the broadcaster since it allows us to share a common cache across many
-// controllers. Extending the broadcaster would have required us keep duplicate caches for each
-// watch.
+// SharedInformer provides eventually consistent linkage of its
+// clients to the authoritative state of a given collection of
+// objects. An object is identified by its API group, kind/resource,
+// namespace, and name. One SharedInformer provides linkage to objects
+// of a particular API group and kind/resource. The linked object
+// collection of a SharedInformer may be further restricted to one
+// namespace and/or by label selector and/or field selector.
+//
+// The authoritative state of an object is what apiservers provide
+// access to, and an object goes through a strict sequence of states.
+// A state is either "absent" or present with a ResourceVersion and
+// other appropriate content.
+//
+// A SharedInformer maintains a local cache, exposed by GetStore(), of
+// the state of each relevant object. This cache is eventually
+// consistent with the authoritative state. This means that, unless
+// prevented by persistent communication problems, if ever a
+// particular object ID X is authoritatively associated with a state S
+// then for every SharedInformer I whose collection includes (X, S)
+// eventually either (1) I's cache associates X with S or a later
+// state of X, (2) I is stopped, or (3) the authoritative state
+// service for X terminates. To be formally complete, we say that the
+// absent state meets any restriction by label selector or field
+// selector.
+//
+// As a simple example, if a collection of objects is henceforeth
+// unchanging and a SharedInformer is created that links to that
+// collection then that SharedInformer's cache eventually holds an
+// exact copy of that collection (unless it is stopped too soon, the
+// authoritative state service ends, or communication problems between
+// the two persistently thwart achievement).
+//
+// As another simple example, if the local cache ever holds a
+// non-absent state for some object ID and the object is eventually
+// removed from the authoritative state then eventually the object is
+// removed from the local cache (unless the SharedInformer is stopped
+// too soon, the authoritative state service ends, or communication
+// problems persistently thwart the desired result).
+//
+// The keys in GetStore() are of the form namespace/name for namespaced
+// objects, and are simply the name for non-namespaced objects.
+//
+// A client is identified here by a ResourceEventHandler. For every
+// update to the SharedInformer's local cache and for every client,
+// eventually either the SharedInformer is stopped or the client is
+// notified of the update. These notifications happen after the
+// corresponding cache update and, in the case of a
+// SharedIndexInformer, after the corresponding index updates. It is
+// possible that additional cache and index updates happen before such
+// a prescribed notification. For a given SharedInformer and client,
+// all notifications are delivered sequentially. For a given
+// SharedInformer, client, and object ID, the notifications are
+// delivered in order.
+//
+// A delete notification exposes the last locally known non-absent
+// state, except that its ResourceVersion is replaced with a
+// ResourceVersion in which the object is actually absent.
 type SharedInformer interface {
     // AddEventHandler adds an event handler to the shared informer using the shared informer's resync
     // period. Events to a single handler are delivered sequentially, but there is no coordination
     // between different handlers.
     AddEventHandler(handler ResourceEventHandler)
-    // AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
-    // specified resync period. Events to a single handler are delivered sequentially, but there is
-    // no coordination between different handlers.
+    // AddEventHandlerWithResyncPeriod adds an event handler to the
+    // shared informer using the specified resync period. The resync
+    // operation consists of delivering to the handler a create
+    // notification for every object in the informer's local cache; it
+    // does not add any interactions with the authoritative storage.
     AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
-    // GetStore returns the Store.
+    // GetStore returns the informer's local cache as a Store.
     GetStore() Store
     // GetController gives back a synthetic interface that "votes" to start the informer
     GetController() Controller
-    // Run starts the shared informer, which will be stopped when stopCh is closed.
+    // Run starts and runs the shared informer, returning after it stops.
+    // The informer will be stopped when stopCh is closed.
     Run(stopCh <-chan struct{})
-    // HasSynced returns true if the shared informer's store has synced.
+    // HasSynced returns true if the shared informer's store has been
+    // informed by at least one full LIST of the authoritative state
+    // of the informer's object collection. This is unrelated to "resync".
     HasSynced() bool
     // LastSyncResourceVersion is the resource version observed when last synced with the underlying
     // store. The value returned is not synchronized with access to the underlying store and is not
@@ -63,6 +116,7 @@ type SharedInformer interface {
     LastSyncResourceVersion() string
 }
 
+// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
 type SharedIndexInformer interface {
     SharedInformer
     // AddIndexers add indexers to the informer before it starts.
@@ -129,7 +183,7 @@ type sharedIndexInformer struct {
     controller Controller
 
     processor             *sharedProcessor
-    cacheMutationDetector CacheMutationDetector
+    cacheMutationDetector MutationDetector
 
     // This block is tracked to handle late initialization of the controller
     listerWatcher ListerWatcher
@@ -169,7 +223,7 @@ func (v *dummyController) HasSynced() bool {
     return v.informer.HasSynced()
 }
 
-func (c *dummyController) LastSyncResourceVersion() string {
+func (v *dummyController) LastSyncResourceVersion() string {
     return ""
 }
 
@@ -555,7 +609,7 @@ func (p *processorListener) run() {
             case deleteNotification:
                 p.handler.OnDelete(notification.oldObj)
             default:
-                utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
+                utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
             }
         }
         // the only way to get here is if the p.nextCh is empty and closed
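Illustrative usage sketch, not part of the vendored diff: the eventual-consistency contract described in the rewritten SharedInformer comment above, as seen from a typical client. It registers a ResourceEventHandler, runs the informer, and gates work on HasSynced (one full LIST delivered); the handler bodies are placeholders.

    package example

    import (
        "fmt"

        "k8s.io/client-go/tools/cache"
    )

    func run(informer cache.SharedIndexInformer, stopCh <-chan struct{}) error {
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    func(obj interface{}) { fmt.Println("add") },
            UpdateFunc: func(oldObj, newObj interface{}) { fmt.Println("update") },
            DeleteFunc: func(obj interface{}) { fmt.Println("delete") },
        })
        go informer.Run(stopCh)
        // Wait until at least one full LIST of the authoritative state has been
        // delivered before relying on the local cache.
        if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
            return fmt.Errorf("cache never synced")
        }
        return nil
    }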
vendor/k8s.io/client-go/tools/cache/store.go (0 changes, generated, vendored)
Executable file → Normal file
vendor/k8s.io/client-go/tools/cache/thread_safe_store.go (3 changes, generated, vendored)
@@ -185,7 +185,7 @@ func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, erro
 
     set := index[indexKey]
     list := make([]interface{}, 0, set.Len())
-    for _, key := range set.List() {
+    for key := range set {
         list = append(list, c.items[key])
     }
 
@@ -302,6 +302,7 @@ func (c *threadSafeMap) Resync() error {
     return nil
 }
 
+// NewThreadSafeStore creates a new instance of ThreadSafeStore.
 func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
     return &threadSafeMap{
         items: map[string]interface{}{},
vendor/k8s.io/client-go/tools/cache/undelta_store.go (8 changes, generated, vendored)
@@ -31,6 +31,7 @@ type UndeltaStore struct {
 // Assert that it implements the Store interface.
 var _ Store = &UndeltaStore{}
 
+// Add inserts an object into the store and sends complete state by calling PushFunc.
 // Note about thread safety. The Store implementation (cache.cache) uses a lock for all methods.
 // In the functions below, the lock gets released and reacquired betweend the {Add,Delete,etc}
 // and the List. So, the following can happen, resulting in two identical calls to PushFunc.
@@ -41,7 +42,6 @@ var _ Store = &UndeltaStore{}
 // 3 Store.Add(b)
 // 4 Store.List() -> [a,b]
 // 5 Store.List() -> [a,b]
-
 func (u *UndeltaStore) Add(obj interface{}) error {
     if err := u.Store.Add(obj); err != nil {
         return err
@@ -50,6 +50,7 @@ func (u *UndeltaStore) Add(obj interface{}) error {
     return nil
 }
 
+// Update sets an item in the cache to its updated state and sends complete state by calling PushFunc.
 func (u *UndeltaStore) Update(obj interface{}) error {
     if err := u.Store.Update(obj); err != nil {
         return err
@@ -58,6 +59,7 @@ func (u *UndeltaStore) Update(obj interface{}) error {
     return nil
 }
 
+// Delete removes an item from the cache and sends complete state by calling PushFunc.
 func (u *UndeltaStore) Delete(obj interface{}) error {
     if err := u.Store.Delete(obj); err != nil {
         return err
@@ -66,6 +68,10 @@ func (u *UndeltaStore) Delete(obj interface{}) error {
     return nil
 }
 
+// Replace will delete the contents of current store, using instead the given list.
+// 'u' takes ownership of the list, you should not reference the list again
+// after calling this function.
+// The new contents complete state will be sent by calling PushFunc after replacement.
 func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error {
     if err := u.Store.Replace(list, resourceVersion); err != nil {
         return err
vendor/k8s.io/client-go/tools/clientcmd/client_config.go (16 changes, generated, vendored)
@@ -228,6 +228,7 @@ func (config *DirectClientConfig) getUserIdentificationPartialConfig(configAuthI
     // blindly overwrite existing values based on precedence
     if len(configAuthInfo.Token) > 0 {
         mergedConfig.BearerToken = configAuthInfo.Token
+        mergedConfig.BearerTokenFile = configAuthInfo.TokenFile
     } else if len(configAuthInfo.TokenFile) > 0 {
         tokenBytes, err := ioutil.ReadFile(configAuthInfo.TokenFile)
         if err != nil {
@@ -296,16 +297,6 @@ func makeUserIdentificationConfig(info clientauth.Info) *restclient.Config {
     return config
 }
 
-// makeUserIdentificationFieldsConfig returns a client.Config capable of being merged using mergo for only server identification information
-func makeServerIdentificationConfig(info clientauth.Info) restclient.Config {
-    config := restclient.Config{}
-    config.CAFile = info.CAFile
-    if info.Insecure != nil {
-        config.Insecure = *info.Insecure
-    }
-    return config
-}
-
 func canIdentifyUser(config restclient.Config) bool {
     return len(config.Username) > 0 ||
         (len(config.CertFile) > 0 || len(config.CertData) > 0) ||
@@ -499,8 +490,9 @@ func (config *inClusterClientConfig) ClientConfig() (*restclient.Config, error)
         if server := config.overrides.ClusterInfo.Server; len(server) > 0 {
             icc.Host = server
         }
-        if token := config.overrides.AuthInfo.Token; len(token) > 0 {
-            icc.BearerToken = token
+        if len(config.overrides.AuthInfo.Token) > 0 || len(config.overrides.AuthInfo.TokenFile) > 0 {
+            icc.BearerToken = config.overrides.AuthInfo.Token
+            icc.BearerTokenFile = config.overrides.AuthInfo.TokenFile
         }
         if certificateAuthorityFile := config.overrides.ClusterInfo.CertificateAuthority; len(certificateAuthorityFile) > 0 {
             icc.TLSClientConfig.CAFile = certificateAuthorityFile
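Illustrative usage sketch, not part of the vendored diff: the clientcmd changes above propagate a token file alongside the literal token, so a rest.Config can carry both BearerToken and BearerTokenFile and keep working when the token on disk is rotated. The host and token values below are hypothetical placeholders.

    package example

    import "k8s.io/client-go/rest"

    func tokenConfig() *rest.Config {
        return &rest.Config{
            Host:            "https://example.invalid:6443", // hypothetical apiserver
            BearerToken:     "initial-token",                 // hypothetical literal token
            BearerTokenFile: "/var/run/secrets/kubernetes.io/serviceaccount/token",
        }
    }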
vendor/k8s.io/client-go/tools/clientcmd/loader.go (28 changes, generated, vendored)
@@ -127,6 +127,10 @@ type ClientConfigLoadingRules struct {
     // DefaultClientConfig is an optional field indicating what rules to use to calculate a default configuration.
     // This should match the overrides passed in to ClientConfig loader.
     DefaultClientConfig ClientConfig
+
+    // WarnIfAllMissing indicates whether the configuration files pointed by KUBECONFIG environment variable are present or not.
+    // In case of missing files, it warns the user about the missing files.
+    WarnIfAllMissing bool
 }
 
 // ClientConfigLoadingRules implements the ClientConfigLoader interface.
@@ -136,20 +140,23 @@ var _ ClientConfigLoader = &ClientConfigLoadingRules{}
 // use this constructor
 func NewDefaultClientConfigLoadingRules() *ClientConfigLoadingRules {
     chain := []string{}
+    warnIfAllMissing := false
 
     envVarFiles := os.Getenv(RecommendedConfigPathEnvVar)
     if len(envVarFiles) != 0 {
         fileList := filepath.SplitList(envVarFiles)
         // prevent the same path load multiple times
         chain = append(chain, deduplicate(fileList)...)
+        warnIfAllMissing = true
 
     } else {
         chain = append(chain, RecommendedHomeFile)
     }
 
     return &ClientConfigLoadingRules{
-        Precedence:     chain,
-        MigrationRules: currentMigrationRules(),
+        Precedence:       chain,
+        MigrationRules:   currentMigrationRules(),
+        WarnIfAllMissing: warnIfAllMissing,
     }
 }
 
@@ -172,6 +179,7 @@ func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) {
     }
 
     errlist := []error{}
+    missingList := []string{}
 
     kubeConfigFiles := []string{}
 
@@ -195,18 +203,26 @@ func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) {
         }
 
         config, err := LoadFromFile(filename)
+
         if os.IsNotExist(err) {
             // skip missing files
+            // Add to the missing list to produce a warning
+            missingList = append(missingList, filename)
             continue
         }
+
         if err != nil {
-            errlist = append(errlist, fmt.Errorf("Error loading config file \"%s\": %v", filename, err))
+            errlist = append(errlist, fmt.Errorf("error loading config file \"%s\": %v", filename, err))
             continue
         }
 
         kubeconfigs = append(kubeconfigs, config)
     }
 
+    if rules.WarnIfAllMissing && len(missingList) > 0 && len(kubeconfigs) == 0 {
+        klog.Warningf("Config not found: %s", strings.Join(missingList, ", "))
+    }
+
     // first merge all of our maps
     mapConfig := clientcmdapi.NewConfig()
 
@@ -356,7 +372,7 @@ func LoadFromFile(filename string) (*clientcmdapi.Config, error) {
     if err != nil {
         return nil, err
     }
-    klog.V(6).Infoln("Config loaded from file", filename)
+    klog.V(6).Infoln("Config loaded from file: ", filename)
 
     // set LocationOfOrigin on every Cluster, User, and Context
     for key, obj := range config.AuthInfos {
@@ -467,7 +483,7 @@ func ResolveLocalPaths(config *clientcmdapi.Config) error {
         }
         base, err := filepath.Abs(filepath.Dir(cluster.LocationOfOrigin))
         if err != nil {
-            return fmt.Errorf("Could not determine the absolute path of config file %s: %v", cluster.LocationOfOrigin, err)
+            return fmt.Errorf("could not determine the absolute path of config file %s: %v", cluster.LocationOfOrigin, err)
         }
 
         if err := ResolvePaths(GetClusterFileReferences(cluster), base); err != nil {
@@ -480,7 +496,7 @@ func ResolveLocalPaths(config *clientcmdapi.Config) error {
         }
         base, err := filepath.Abs(filepath.Dir(authInfo.LocationOfOrigin))
         if err != nil {
-            return fmt.Errorf("Could not determine the absolute path of config file %s: %v", authInfo.LocationOfOrigin, err)
+            return fmt.Errorf("could not determine the absolute path of config file %s: %v", authInfo.LocationOfOrigin, err)
        }
 
         if err := ResolvePaths(GetAuthInfoFileReferences(authInfo), base); err != nil {
vendor/k8s.io/client-go/tools/clientcmd/validation.go (2 changes, generated, vendored)
@@ -250,8 +250,6 @@ func validateAuthInfo(authInfoName string, authInfo clientcmdapi.AuthInfo) []err
         for _, v := range authInfo.Exec.Env {
             if len(v.Name) == 0 {
                 validationErrors = append(validationErrors, fmt.Errorf("env variable name must be specified for %v to use exec authentication plugin", authInfoName))
-            } else if len(v.Value) == 0 {
-                validationErrors = append(validationErrors, fmt.Errorf("env variable %s value must be specified for %v to use exec authentication plugin", v.Name, authInfoName))
             }
         }
     }
vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go (34 changes, generated, vendored)
@@ -16,12 +16,16 @@ limitations under the License.
 
 // Package leaderelection implements leader election of a set of endpoints.
 // It uses an annotation in the endpoints object to store the record of the
-// election state.
+// election state. This implementation does not guarantee that only one
+// client is acting as a leader (a.k.a. fencing).
 //
-// This implementation does not guarantee that only one client is acting as a
-// leader (a.k.a. fencing). A client observes timestamps captured locally to
-// infer the state of the leader election. Thus the implementation is tolerant
-// to arbitrary clock skew, but is not tolerant to arbitrary clock skew rate.
+// A client only acts on timestamps captured locally to infer the state of the
+// leader election. The client does not consider timestamps in the leader
+// election record to be accurate because these timestamps may not have been
+// produced by a local clock. The implemention does not depend on their
+// accuracy and only uses their change to indicate that another client has
+// renewed the leader lease. Thus the implementation is tolerant to arbitrary
+// clock skew, but is not tolerant to arbitrary clock skew rate.
 //
 // However the level of tolerance to skew rate can be configured by setting
 // RenewDeadline and LeaseDuration appropriately. The tolerance expressed as a
@@ -85,6 +89,12 @@ func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
     if lec.RetryPeriod < 1 {
         return nil, fmt.Errorf("retryPeriod must be greater than zero")
     }
+    if lec.Callbacks.OnStartedLeading == nil {
+        return nil, fmt.Errorf("OnStartedLeading callback must not be nil")
+    }
+    if lec.Callbacks.OnStoppedLeading == nil {
+        return nil, fmt.Errorf("OnStoppedLeading callback must not be nil")
+    }
 
     if lec.Lock == nil {
         return nil, fmt.Errorf("Lock must not be nil.")
@@ -105,12 +115,26 @@ type LeaderElectionConfig struct {
     // LeaseDuration is the duration that non-leader candidates will
     // wait to force acquire leadership. This is measured against time of
     // last observed ack.
+    //
+    // A client needs to wait a full LeaseDuration without observing a change to
+    // the record before it can attempt to take over. When all clients are
+    // shutdown and a new set of clients are started with different names against
+    // the same leader record, they must wait the full LeaseDuration before
+    // attempting to acquire the lease. Thus LeaseDuration should be as short as
+    // possible (within your tolerance for clock skew rate) to avoid a possible
+    // long waits in the scenario.
+    //
+    // Core clients default this value to 15 seconds.
     LeaseDuration time.Duration
     // RenewDeadline is the duration that the acting master will retry
     // refreshing leadership before giving up.
+    //
+    // Core clients default this value to 10 seconds.
     RenewDeadline time.Duration
     // RetryPeriod is the duration the LeaderElector clients should wait
     // between tries of actions.
+    //
+    // Core clients default this value to 2 seconds.
     RetryPeriod time.Duration
 
     // Callbacks are callbacks that are triggered during certain lifecycle
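Illustrative usage sketch, not part of the vendored diff: a LeaderElectionConfig using the core-client defaults now documented above (15s lease, 10s renew deadline, 2s retry) and the callbacks that NewLeaderElector now requires to be non-nil. The resource lock is assumed to be constructed elsewhere; the callback bodies are placeholders.

    package example

    import (
        "context"
        "time"

        "k8s.io/client-go/tools/leaderelection"
        "k8s.io/client-go/tools/leaderelection/resourcelock"
    )

    func newElector(lock resourcelock.Interface) (*leaderelection.LeaderElector, error) {
        return leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
            Lock:          lock,
            LeaseDuration: 15 * time.Second,
            RenewDeadline: 10 * time.Second,
            RetryPeriod:   2 * time.Second,
            Callbacks: leaderelection.LeaderCallbacks{
                OnStartedLeading: func(ctx context.Context) { /* start doing work */ },
                OnStoppedLeading: func() { /* step down cleanly */ },
            },
        })
    }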
vendor/k8s.io/client-go/tools/pager/pager.go (114 changes, generated, vendored)
@@ -25,9 +25,11 @@ import (
     metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
+    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 )
 
 const defaultPageSize = 500
+const defaultPageBufferSize = 10
 
 // ListPageFunc returns a list object for the given list options.
 type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error)
@@ -48,6 +50,9 @@ type ListPager struct {
     PageFn ListPageFunc
 
     FullListIfExpired bool
+
+    // Number of pages to buffer
+    PageBufferSize int32
 }
 
 // New creates a new pager from the provided pager function using the default
@@ -58,6 +63,7 @@ func New(fn ListPageFunc) *ListPager {
         PageSize:          defaultPageSize,
         PageFn:            fn,
         FullListIfExpired: true,
+        PageBufferSize:    defaultPageBufferSize,
     }
 }
 
@@ -73,6 +79,12 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
     }
     var list *metainternalversion.List
     for {
+        select {
+        case <-ctx.Done():
+            return nil, ctx.Err()
+        default:
+        }
+
         obj, err := p.PageFn(ctx, options)
         if err != nil {
             if !errors.IsResourceExpired(err) || !p.FullListIfExpired {
@@ -115,3 +127,105 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
         options.Continue = m.GetContinue()
     }
 }
+
+// EachListItem fetches runtime.Object items using this ListPager and invokes fn on each item. If
+// fn returns an error, processing stops and that error is returned. If fn does not return an error,
+// any error encountered while retrieving the list from the server is returned. If the context
+// cancels or times out, the context error is returned. Since the list is retrieved in paginated
+// chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the pagination list
+// requests exceed the expiration limit of the apiserver being called.
+//
+// Items are retrieved in chunks from the server to reduce the impact on the server with up to
+// ListPager.PageBufferSize chunks buffered concurrently in the background.
+func (p *ListPager) EachListItem(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
+    return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
+        return meta.EachListItem(obj, fn)
+    })
+}
+
+// eachListChunkBuffered fetches runtimeObject list chunks using this ListPager and invokes fn on
+// each list chunk. If fn returns an error, processing stops and that error is returned. If fn does
+// not return an error, any error encountered while retrieving the list from the server is
+// returned. If the context cancels or times out, the context error is returned. Since the list is
+// retrieved in paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if
+// the pagination list requests exceed the expiration limit of the apiserver being called.
+//
+// Up to ListPager.PageBufferSize chunks are buffered concurrently in the background.
+func (p *ListPager) eachListChunkBuffered(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
+    if p.PageBufferSize < 0 {
+        return fmt.Errorf("ListPager.PageBufferSize must be >= 0, got %d", p.PageBufferSize)
+    }
+
+    // Ensure background goroutine is stopped if this call exits before all list items are
+    // processed. Cancelation error from this deferred cancel call is never returned to caller;
+    // either the list result has already been sent to bgResultC or the fn error is returned and
+    // the cancelation error is discarded.
+    ctx, cancel := context.WithCancel(ctx)
+    defer cancel()
+
+    chunkC := make(chan runtime.Object, p.PageBufferSize)
+    bgResultC := make(chan error, 1)
+    go func() {
+        defer utilruntime.HandleCrash()
+
+        var err error
+        defer func() {
+            close(chunkC)
+            bgResultC <- err
+        }()
+        err = p.eachListChunk(ctx, options, func(chunk runtime.Object) error {
+            select {
+            case chunkC <- chunk: // buffer the chunk, this can block
+            case <-ctx.Done():
+                return ctx.Err()
+            }
+            return nil
+        })
+    }()
+
+    for o := range chunkC {
+        err := fn(o)
+        if err != nil {
+            return err // any fn error should be returned immediately
+        }
+    }
+    // promote the results of our background goroutine to the foreground
+    return <-bgResultC
+}
+
+// eachListChunk fetches runtimeObject list chunks using this ListPager and invokes fn on each list
+// chunk. If fn returns an error, processing stops and that error is returned. If fn does not return
+// an error, any error encountered while retrieving the list from the server is returned. If the
+// context cancels or times out, the context error is returned. Since the list is retrieved in
+// paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the
+// pagination list requests exceed the expiration limit of the apiserver being called.
+func (p *ListPager) eachListChunk(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
+    if options.Limit == 0 {
+        options.Limit = p.PageSize
+    }
+    for {
+        select {
+        case <-ctx.Done():
+            return ctx.Err()
+        default:
+        }
+
+        obj, err := p.PageFn(ctx, options)
+        if err != nil {
+            return err
+        }
+        m, err := meta.ListAccessor(obj)
+        if err != nil {
+            return fmt.Errorf("returned object must be a list: %v", err)
+        }
+        if err := fn(obj); err != nil {
+            return err
+        }
+        // if we have no more items, return.
+        if len(m.GetContinue()) == 0 {
+            return nil
+        }
+        // set the next loop up
+        options.Continue = m.GetContinue()
+    }
+}
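Illustrative usage sketch, not part of the vendored diff: calling the new EachListItem helper added above, which streams list chunks (with up to PageBufferSize chunks buffered in the background) and invokes the callback once per item. The page size of 100 and the print statement are arbitrary; the typed client is assumed to be provided by the caller.

    package example

    import (
        "context"
        "fmt"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/pager"
    )

    func listAllPods(ctx context.Context, client kubernetes.Interface) error {
        p := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
            return client.CoreV1().Pods(metav1.NamespaceAll).List(opts)
        }))
        p.PageSize = 100 // items requested per LIST chunk
        return p.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
            fmt.Printf("got %T\n", obj)
            return nil
        })
    }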
64
vendor/k8s.io/client-go/tools/record/event.go
generated
vendored
64
vendor/k8s.io/client-go/tools/record/event.go
generated
vendored
@@ -50,6 +50,40 @@ type EventSink interface {
Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
}

// CorrelatorOptions allows you to change the default of the EventSourceObjectSpamFilter
// and EventAggregator in EventCorrelator
type CorrelatorOptions struct {
// The lru cache size used for both EventSourceObjectSpamFilter and the EventAggregator
// If not specified (zero value), the default specified in events_cache.go will be picked
// This means that the LRUCacheSize has to be greater than 0.
LRUCacheSize int
// The burst size used by the token bucket rate filtering in EventSourceObjectSpamFilter
// If not specified (zero value), the default specified in events_cache.go will be picked
// This means that the BurstSize has to be greater than 0.
BurstSize int
// The fill rate of the token bucket in queries per second in EventSourceObjectSpamFilter
// If not specified (zero value), the default specified in events_cache.go will be picked
// This means that the QPS has to be greater than 0.
QPS float32
// The func used by the EventAggregator to group event keys for aggregation
// If not specified (zero value), EventAggregatorByReasonFunc will be used
KeyFunc EventAggregatorKeyFunc
// The func used by the EventAggregator to produced aggregated message
// If not specified (zero value), EventAggregatorByReasonMessageFunc will be used
MessageFunc EventAggregatorMessageFunc
// The number of events in an interval before aggregation happens by the EventAggregator
// If not specified (zero value), the default specified in events_cache.go will be picked
// This means that the MaxEvents has to be greater than 0
MaxEvents int
// The amount of time in seconds that must transpire since the last occurrence of a similar event before it is considered new by the EventAggregator
// If not specified (zero value), the default specified in events_cache.go will be picked
// This means that the MaxIntervalInSeconds has to be greater than 0
MaxIntervalInSeconds int
// The clock used by the EventAggregator to allow for testing
// If not specified (zero value), clock.RealClock{} will be used
Clock clock.Clock
}

// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
// Event constructs an event from the given information and puts it in the queue for sending.
@@ -97,33 +131,45 @@ type EventBroadcaster interface {

// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: defaultSleepDuration,
}
}

func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration}
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: sleepDuration,
}
}

func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: defaultSleepDuration,
options: options,
}
}

type eventBroadcasterImpl struct {
*watch.Broadcaster
sleepDuration time.Duration
options CorrelatorOptions
}

// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
// The return value can be ignored or used to stop recording, if desired.
// TODO: make me an object with parameterizable queue length and retry interval
func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
// The default math/rand package functions aren't thread safe, so create a
// new Rand object for each StartRecording call.
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
eventCorrelator := NewEventCorrelator(clock.RealClock{})
eventCorrelator := NewEventCorrelatorWithOptions(eventBroadcaster.options)
return eventBroadcaster.StartEventWatcher(
func(event *v1.Event) {
recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
recordToSink(sink, event, eventCorrelator, eventBroadcaster.sleepDuration)
})
}

func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
eventCopy := *event
@@ -148,7 +194,7 @@ func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrela
// Randomize the first sleep so that various clients won't all be
// synced up if the master goes down.
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
} else {
time.Sleep(sleepDuration)
}
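A hedged sketch (not part of the diff) of how the new CorrelatorOptions plumbing can be used when wiring an event recorder; the clientset cs, the component name, and the burst/QPS values are assumptions chosen for illustration:

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
)

// newRecorder builds an EventRecorder whose spam filter is tuned via
// CorrelatorOptions instead of relying on the package defaults.
func newRecorder(cs kubernetes.Interface) record.EventRecorder {
	broadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
		BurstSize: 50,  // allow larger bursts of similar events than the default
		QPS:       1.0, // token-bucket refill rate for the spam filter
	})
	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: cs.CoreV1().Events("")})
	return broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "example-controller"})
}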
46
vendor/k8s.io/client-go/tools/record/events_cache.go
generated
vendored
46
vendor/k8s.io/client-go/tools/record/events_cache.go
generated
vendored
@@ -443,6 +443,52 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
}
}

func NewEventCorrelatorWithOptions(options CorrelatorOptions) *EventCorrelator {
optionsWithDefaults := populateDefaults(options)
spamFilter := NewEventSourceObjectSpamFilter(optionsWithDefaults.LRUCacheSize,
optionsWithDefaults.BurstSize, optionsWithDefaults.QPS, optionsWithDefaults.Clock)
return &EventCorrelator{
filterFunc: spamFilter.Filter,
aggregator: NewEventAggregator(
optionsWithDefaults.LRUCacheSize,
optionsWithDefaults.KeyFunc,
optionsWithDefaults.MessageFunc,
optionsWithDefaults.MaxEvents,
optionsWithDefaults.MaxIntervalInSeconds,
optionsWithDefaults.Clock),
logger: newEventLogger(optionsWithDefaults.LRUCacheSize, optionsWithDefaults.Clock),
}
}

// populateDefaults populates the zero value options with defaults
func populateDefaults(options CorrelatorOptions) CorrelatorOptions {
if options.LRUCacheSize == 0 {
options.LRUCacheSize = maxLruCacheEntries
}
if options.BurstSize == 0 {
options.BurstSize = defaultSpamBurst
}
if options.QPS == 0 {
options.QPS = defaultSpamQPS
}
if options.KeyFunc == nil {
options.KeyFunc = EventAggregatorByReasonFunc
}
if options.MessageFunc == nil {
options.MessageFunc = EventAggregatorByReasonMessageFunc
}
if options.MaxEvents == 0 {
options.MaxEvents = defaultAggregateMaxEvents
}
if options.MaxIntervalInSeconds == 0 {
options.MaxIntervalInSeconds = defaultAggregateIntervalInSeconds
}
if options.Clock == nil {
options.Clock = clock.RealClock{}
}
return options
}

// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
if newEvent == nil {
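To illustrate populateDefaults (again a sketch, not part of the diff): only explicitly set fields are honored, and every zero-value field falls back to the corresponding default from events_cache.go before the correlator is built. The threshold values below are arbitrary examples:

package example

import "k8s.io/client-go/tools/record"

// newCorrelator overrides only the aggregation thresholds; cache size, QPS,
// key/message funcs, and the clock all come from the package defaults.
func newCorrelator() *record.EventCorrelator {
	return record.NewEventCorrelatorWithOptions(record.CorrelatorOptions{
		MaxEvents:            5,   // aggregate after 5 similar events in an interval
		MaxIntervalInSeconds: 300, // a similar event counts as new after 5 minutes of quiet
	})
}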
53
vendor/k8s.io/client-go/tools/reference/ref.go
generated
vendored
53
vendor/k8s.io/client-go/tools/reference/ref.go
generated
vendored
@@ -19,8 +19,6 @@ package reference
import (
"errors"
"fmt"
"net/url"
"strings"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
@@ -30,8 +28,7 @@ import (

var (
// Errors that could be returned by GetReference.
ErrNilObject = errors.New("can't reference a nil object")
ErrNoSelfLink = errors.New("selfLink was empty, can't make reference")
ErrNilObject = errors.New("can't reference a nil object")
)

// GetReference returns an ObjectReference which refers to the given
@@ -47,20 +44,6 @@ func GetReference(scheme *runtime.Scheme, obj runtime.Object) (*v1.ObjectReferen
return ref, nil
}

gvk := obj.GetObjectKind().GroupVersionKind()

// if the object referenced is actually persisted, we can just get kind from meta
// if we are building an object reference to something not yet persisted, we should fallback to scheme
kind := gvk.Kind
if len(kind) == 0 {
// TODO: this is wrong
gvks, _, err := scheme.ObjectKinds(obj)
if err != nil {
return nil, err
}
kind = gvks[0].Kind
}

// An object that implements only List has enough metadata to build a reference
var listMeta metav1.Common
objectMeta, err := meta.Accessor(obj)
@@ -73,29 +56,29 @@ func GetReference(scheme *runtime.Scheme, obj runtime.Object) (*v1.ObjectReferen
listMeta = objectMeta
}

// if the object referenced is actually persisted, we can also get version from meta
version := gvk.GroupVersion().String()
if len(version) == 0 {
selfLink := listMeta.GetSelfLink()
if len(selfLink) == 0 {
return nil, ErrNoSelfLink
}
selfLinkUrl, err := url.Parse(selfLink)
gvk := obj.GetObjectKind().GroupVersionKind()

// If object meta doesn't contain data about kind and/or version,
// we are falling back to scheme.
//
// TODO: This doesn't work for CRDs, which are not registered in scheme.
if gvk.Empty() {
gvks, _, err := scheme.ObjectKinds(obj)
if err != nil {
return nil, err
}
// example paths: /<prefix>/<version>/*
parts := strings.Split(selfLinkUrl.Path, "/")
if len(parts) < 4 {
return nil, fmt.Errorf("unexpected self link format: '%v'; got version '%v'", selfLink, version)
}
if parts[1] == "api" {
version = parts[2]
} else {
version = parts[2] + "/" + parts[3]
if len(gvks) == 0 || gvks[0].Empty() {
return nil, fmt.Errorf("unexpected gvks registered for object %T: %v", obj, gvks)
}
// TODO: The same object can be registered for multiple group versions
// (although in practise this doesn't seem to be used).
// In such case, the version set may not be correct.
gvk = gvks[0]
}

kind := gvk.Kind
version := gvk.GroupVersion().String()

// only has list metadata
if objectMeta == nil {
return &v1.ObjectReference{
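A short caller-side sketch (not part of the diff) of the selfLink fix this update pulls in: references are now derived from the object's GroupVersionKind or the scheme rather than by parsing metadata.selfLink, so the call itself is unchanged and works even when selfLink is empty. The pod value is an assumption:

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/reference"
)

// podReference builds an ObjectReference for a pod using the object's TypeMeta
// (or the scheme as a fallback) instead of selfLink parsing.
func podReference(pod *v1.Pod) (*v1.ObjectReference, error) {
	return reference.GetReference(scheme.Scheme, pod)
}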