update kube and vendor dependencies

With kubernetes 1.18 release of client-go, signatures on methods in
generated clientsets, dynamic, metadata, and scale clients have been
modified to accept context.Context as a first argument.
Signatures of Create, Update, and Patch methods have been updated
to accept CreateOptions, UpdateOptions and PatchOptions respectively.
Signatures of Delete and DeleteCollection methods now accept
DeleteOptions by value instead of by reference.
These changes are now accommodated with this PR and client-go
and dependencies are updated to v1.18.0

Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
This commit is contained in:
Humble Chirammal
2020-05-03 21:51:04 +05:30
parent d6be7e120d
commit b72230379f
1008 changed files with 20764 additions and 82152 deletions

View File

@@ -26,7 +26,16 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
)
// Config contains all the settings for a Controller.
// This file implements a low-level controller that is used in
// sharedIndexInformer, which is an implementation of
// SharedIndexInformer. Such informers, in turn, are key components
// in the high level controllers that form the backbone of the
// Kubernetes control plane. Look at those for examples, or the
// example in
// https://github.com/kubernetes/client-go/tree/master/examples/workqueue
// .
// Config contains all the settings for one of these low-level controllers.
type Config struct {
// The queue for your objects - has to be a DeltaFIFO due to
// assumptions in the implementation. Your Process() function
@@ -36,30 +45,29 @@ type Config struct {
// Something that can list and watch your objects.
ListerWatcher
// Something that can process your objects.
// Something that can process a popped Deltas.
Process ProcessFunc
// The type of your objects.
// ObjectType is an example object of the type this controller is
// expected to handle. Only the type needs to be right, except
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` and `"kind"` must also be right.
ObjectType runtime.Object
// Reprocess everything at least this often.
// Note that if it takes longer for you to clear the queue than this
// period, you will end up processing items in the order determined
// by FIFO.Replace(). Currently, this is random. If this is a
// problem, we can change that replacement policy to append new
// things to the end of the queue instead of replacing the entire
// queue.
// FullResyncPeriod is the period at which ShouldResync is considered.
FullResyncPeriod time.Duration
// ShouldResync, if specified, is invoked when the controller's reflector determines the next
// periodic sync should occur. If this returns true, it means the reflector should proceed with
// the resync.
// ShouldResync is periodically used by the reflector to determine
// whether to Resync the Queue. If ShouldResync is `nil` or
// returns true, it means the reflector should proceed with the
// resync.
ShouldResync ShouldResyncFunc
// If true, when Process() returns an error, re-enqueue the object.
// TODO: add interface to let you inject a delay/backoff or drop
// the object completely if desired. Pass the object in
// question to this interface as a parameter.
// question to this interface as a parameter. This is probably moot
// now that this functionality appears at a higher level.
RetryOnError bool
}
@@ -71,7 +79,7 @@ type ShouldResyncFunc func() bool
// ProcessFunc processes a single object.
type ProcessFunc func(obj interface{}) error
// Controller is a generic controller framework.
// `*controller` implements Controller
type controller struct {
config Config
reflector *Reflector
@@ -79,10 +87,22 @@ type controller struct {
clock clock.Clock
}
// Controller is a generic controller framework.
// Controller is a low-level controller that is parameterized by a
// Config and used in sharedIndexInformer.
type Controller interface {
// Run does two things. One is to construct and run a Reflector
// to pump objects/notifications from the Config's ListerWatcher
// to the Config's Queue and possibly invoke the occasional Resync
// on that Queue. The other is to repeatedly Pop from the Queue
// and process with the Config's ProcessFunc. Both of these
// continue until `stopCh` is closed.
Run(stopCh <-chan struct{})
// HasSynced delegates to the Config's Queue
HasSynced() bool
// LastSyncResourceVersion delegates to the Reflector when there
// is one, otherwise returns the empty string
LastSyncResourceVersion() string
}
@@ -95,7 +115,7 @@ func New(c *Config) Controller {
return ctlr
}
// Run begins processing items, and will continue until a value is sent down stopCh.
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
@@ -344,7 +364,10 @@ func newInformer(
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
@@ -357,7 +380,7 @@ func newInformer(
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err

View File

@@ -26,15 +26,16 @@ import (
"k8s.io/klog"
)
// NewDeltaFIFO returns a Store which can be used process changes to items.
// NewDeltaFIFO returns a Queue which can be used to process changes to items.
//
// keyFunc is used to figure out what key an object should have. (It's
// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
// keyFunc is used to figure out what key an object should have. (It is
// exposed in the returned DeltaFIFO's KeyOf() method, with additional handling
// around deleted objects and queue state).
//
// 'knownObjects' may be supplied to modify the behavior of Delete,
// Replace, and Resync. It may be nil if you do not need those
// modifications.
//
// 'keyLister' is expected to return a list of keys that the consumer of
// this queue "knows about". It is used to decide which items are missing
// when Replace() is called; 'Deleted' deltas are produced for these items.
// It may be nil if you don't need to detect all deletions.
// TODO: consider merging keyLister with this object, tracking a list of
// "known" keys when Pop() is called. Have to think about how that
// affects error retrying.
@@ -56,18 +57,79 @@ import (
// and internal tests.
//
// Also see the comment on DeltaFIFO.
//
// Warning: This constructs a DeltaFIFO that does not differentiate between
// events caused by a call to Replace (e.g., from a relist, which may
// contain object updates), and synthetic events caused by a periodic resync
// (which just emit the existing object). See https://issue.k8s.io/86015 for details.
//
// Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})`
// instead to receive a `Replaced` event depending on the type.
//
// Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: keyFunc,
KnownObjects: knownObjects,
})
}
// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
// optional.
type DeltaFIFOOptions struct {
// KeyFunction is used to figure out what key an object should have. (It's
// exposed in the returned DeltaFIFO's KeyOf() method, with additional
// handling around deleted objects and queue state).
// Optional, the default is MetaNamespaceKeyFunc.
KeyFunction KeyFunc
// KnownObjects is expected to return a list of keys that the consumer of
// this queue "knows about". It is used to decide which items are missing
// when Replace() is called; 'Deleted' deltas are produced for the missing items.
// KnownObjects may be nil if you can tolerate missing deletions on Replace().
KnownObjects KeyListerGetter
// EmitDeltaTypeReplaced indicates that the queue consumer
// understands the Replaced DeltaType. Before the `Replaced` event type was
// added, calls to Replace() were handled the same as Sync(). For
// backwards-compatibility purposes, this is false by default.
// When true, `Replaced` events will be sent for items passed to a Replace() call.
// When false, `Sync` events will be sent instead.
EmitDeltaTypeReplaced bool
}
// NewDeltaFIFOWithOptions returns a Store which can be used process changes to
// items. See also the comment on DeltaFIFO.
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
opts.KeyFunction = MetaNamespaceKeyFunc
}
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: keyFunc,
knownObjects: knownObjects,
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
}
f.cond.L = &f.lock
return f
}
// DeltaFIFO is like FIFO, but allows you to process deletes.
// DeltaFIFO is like FIFO, but differs in two ways. One is that the
// accumulator associated with a given object's key is not that object
// but rather a Deltas, which is a slice of Delta values for that
// object. Applying an object to a Deltas means to append a Delta
// except when the potentially appended Delta is a Deleted and the
// Deltas already ends with a Deleted. In that case the Deltas does
// not grow, although the terminal Deleted will be replaced by the new
// Deleted if the older Deleted's object is a
// DeletedFinalStateUnknown.
//
// The other difference is that DeltaFIFO has an additional way that
// an object can be applied to an accumulator, called Sync.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
@@ -77,22 +139,22 @@ func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
// * You want to process every object change (delta) at most once.
// * When you process an object, you want to see everything
// that's happened to it since you last processed it.
// * You want to process the deletion of objects.
// * You want to process the deletion of some of the objects.
// * You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but it
// interface{} to satisfy the Store/Queue interfaces, but they
// will always return an object of type Deltas.
//
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
// to list Store keys and to get objects by Store key. The objects in
// question are called "known objects" and this set of objects
// modifies the behavior of the Delete, Replace, and Resync methods
// (each in a different way).
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
//
// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
// to list keys that are "known", for the purpose of figuring out which
// items have been deleted when Replace() or Delete() are called. The deleted
// object will be included in the DeleteFinalStateUnknown markers. These objects
// could be stale.
type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
@@ -114,9 +176,8 @@ type DeltaFIFO struct {
// insertion and retrieval, and should be deterministic.
keyFunc KeyFunc
// knownObjects list keys that are "known", for the
// purpose of figuring out which items have been deleted
// when Replace() or Delete() is called.
// knownObjects list keys that are "known" --- affecting Delete(),
// Replace(), and Resync()
knownObjects KeyListerGetter
// Indication the queue is closed.
@@ -124,6 +185,10 @@ type DeltaFIFO struct {
// Currently, not used to gate any of CRED operations.
closed bool
closedLock sync.Mutex
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat).
emitDeltaTypeReplaced bool
}
var (
@@ -185,9 +250,11 @@ func (f *DeltaFIFO) Update(obj interface{}) error {
return f.queueActionLocked(Updated, obj)
}
// Delete is just like Add, but makes an Deleted Delta. If the item does not
// already exist, it will be ignored. (It may have already been deleted by a
// Replace (re-list), for example.
// Delete is just like Add, but makes a Deleted Delta. If the given
// object does not already exist, it will be ignored. (It may have
// already been deleted by a Replace (re-list), for example.) In this
// method `f.knownObjects`, if not nil, provides (via GetByKey)
// _additional_ objects that are considered to already exist.
func (f *DeltaFIFO) Delete(obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
@@ -313,6 +380,9 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
// This never happens, because dedupDeltas never returns an empty list
// when given a non-empty list (as it is here).
// But if somehow it ever does return an empty list, then
// We need to remove this from our map (extra items in the queue are
// ignored if they are not in the map).
delete(f.items, id)
@@ -430,22 +500,34 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
}
}
// Replace will delete the contents of 'f', using instead the given map.
// 'f' takes ownership of the map, you should not reference the map again
// after calling this function. f's queue is reset, too; upon return, it
// will contain the items in the map, in no particular order.
// Replace atomically does two things: (1) it adds the given objects
// using the Sync or Replace DeltaType and then (2) it does some deletions.
// In particular: for every pre-existing key K that is not the key of
// an object in `list` there is the effect of
// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
// of K. If `f.knownObjects == nil` then the pre-existing keys are
// those in `f.items` and the current object of K is the `.Newest()`
// of the Deltas associated with K. Otherwise the pre-existing keys
// are those listed by `f.knownObjects` and the current object of K is
// what `f.knownObjects.GetByKey(K)` returns.
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// keep backwards compat for old clients
action := Sync
if f.emitDeltaTypeReplaced {
action = Replaced
}
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
keys.Insert(key)
if err := f.queueActionLocked(Sync, item); err != nil {
if err := f.queueActionLocked(action, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
@@ -507,7 +589,9 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
return nil
}
// Resync will send a sync event for each item
// Resync adds, with a Sync type of Delta, every object listed by
// `f.knownObjects` whose key is not already queued for processing.
// If `f.knownObjects` is `nil` then Resync does nothing.
func (f *DeltaFIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()
@@ -577,10 +661,14 @@ const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
// The other types are obvious. You'll get Sync deltas when:
// * A watch expires/errors out and a new list/watch cycle is started.
// * You've turned on periodic syncs.
// (Anything that trigger's DeltaFIFO's Replace() method.)
// Replaced is emitted when we encountered watch errors and had to do a
// relist. We don't know if the replaced object has changed.
//
// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
// as well. Hence, Replaced is only emitted when the option
// EmitDeltaTypeReplaced is true.
Replaced DeltaType = "Replaced"
// Sync is for synthetic events during a periodic resync.
Sync DeltaType = "Sync"
)

View File

@@ -194,9 +194,9 @@ func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) er
return nil
}
// Resync will touch all objects to put them into the processing queue
// Resync is a no-op for one of these
func (c *ExpirationCache) Resync() error {
return c.cacheStorage.Resync()
return nil
}
// NewTTLStore creates and returns a ExpirationCache with a TTLPolicy

View File

@@ -24,7 +24,7 @@ import (
)
// PopProcessFunc is passed to Pop() method of Queue interface.
// It is supposed to process the element popped from the queue.
// It is supposed to process the accumulator popped from the queue.
type PopProcessFunc func(interface{}) error
// ErrRequeue may be returned by a PopProcessFunc to safely requeue
@@ -44,26 +44,38 @@ func (e ErrRequeue) Error() string {
return e.Err.Error()
}
// Queue is exactly like a Store, but has a Pop() method too.
// Queue extends Store with a collection of Store keys to "process".
// Every Add, Update, or Delete may put the object's key in that collection.
// A Queue has a way to derive the corresponding key given an accumulator.
// A Queue can be accessed concurrently from multiple goroutines.
// A Queue can be "closed", after which Pop operations return an error.
type Queue interface {
Store
// Pop blocks until it has something to process.
// It returns the object that was process and the result of processing.
// The PopProcessFunc may return an ErrRequeue{...} to indicate the item
// should be requeued before releasing the lock on the queue.
// Pop blocks until there is at least one key to process or the
// Queue is closed. In the latter case Pop returns with an error.
// In the former case Pop atomically picks one key to process,
// removes that (key, accumulator) association from the Store, and
// processes the accumulator. Pop returns the accumulator that
// was processed and the result of processing. The PopProcessFunc
// may return an ErrRequeue{inner} and in this case Pop will (a)
// return that (key, accumulator) association to the Queue as part
// of the atomic processing and (b) return the inner error from
// Pop.
Pop(PopProcessFunc) (interface{}, error)
// AddIfNotPresent adds a value previously
// returned by Pop back into the queue as long
// as nothing else (presumably more recent)
// has since been added.
// AddIfNotPresent puts the given accumulator into the Queue (in
// association with the accumulator's key) if and only if that key
// is not already associated with a non-empty accumulator.
AddIfNotPresent(interface{}) error
// HasSynced returns true if the first batch of items has been popped
// HasSynced returns true if the first batch of keys have all been
// popped. The first batch of keys are those of the first Replace
// operation if that happened before any Add, Update, or Delete;
// otherwise the first batch is empty.
HasSynced() bool
// Close queue
// Close the queue
Close()
}
@@ -79,11 +91,16 @@ func Pop(queue Queue) interface{} {
return result
}
// FIFO receives adds and updates from a Reflector, and puts them in a queue for
// FIFO order processing. If multiple adds/updates of a single item happen while
// an item is in the queue before it has been processed, it will only be
// processed once, and when it is processed, the most recent version will be
// processed. This can't be done with a channel.
// FIFO is a Queue in which (a) each accumulator is simply the most
// recently provided object and (b) the collection of keys to process
// is a FIFO. The accumulators all start out empty, and deleting an
// object from its accumulator empties the accumulator. The Resync
// operation is a no-op.
//
// Thus: if multiple adds/updates of a single object happen while that
// object's key is in the queue before it has been processed then it
// will only be processed once, and when it is processed the most
// recent version will be processed. This can't be done with a channel
//
// FIFO solves this use case:
// * You want to process every object (exactly) once.
@@ -94,7 +111,7 @@ func Pop(queue Queue) interface{} {
type FIFO struct {
lock sync.RWMutex
cond sync.Cond
// We depend on the property that items in the set are in the queue and vice versa.
// We depend on the property that every key in `items` is also in `queue`
items map[string]interface{}
queue []string
@@ -326,7 +343,8 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
return nil
}
// Resync will touch all objects to put them into the processing queue
// Resync will ensure that every object in the Store has its key in the queue.
// This should be a no-op, because that property is maintained by all operations.
func (f *FIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()

View File

@@ -23,12 +23,15 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
)
// Indexer is a storage interface that lets you list objects using multiple indexing functions.
// There are three kinds of strings here.
// One is a storage key, as defined in the Store interface.
// Another kind is a name of an index.
// The third kind of string is an "indexed value", which is produced by an
// IndexFunc and can be a field value or any other string computed from the object.
// Indexer extends Store with multiple indices and restricts each
// accumulator to simply hold the current object (and be empty after
// Delete).
//
// There are three kinds of strings here:
// 1. a storage key, as defined in the Store interface,
// 2. a name of an index, and
// 3. an "indexed value", which is produced by an IndexFunc and
// can be a field value or any other string computed from the object.
type Indexer interface {
Store
// Index returns the stored objects whose set of indexed values

View File

@@ -24,7 +24,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/pager"
)
// Lister is any object that knows how to perform an initial list.
@@ -85,7 +84,7 @@ func NewFilteredListWatchFromClient(c Getter, resource string, namespace string,
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do().
Do(context.TODO()).
Get()
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
@@ -95,16 +94,15 @@ func NewFilteredListWatchFromClient(c Getter, resource string, namespace string,
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch()
Watch(context.TODO())
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
// List a set of apiserver resources
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
if !lw.DisableChunking {
return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)
}
// ListWatch is used in Reflector, which already supports pagination.
// Don't paginate here to avoid duplication.
return lw.ListFunc(options)
}

View File

@@ -36,9 +36,12 @@ func init() {
mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR"))
}
// MutationDetector is able to monitor if the object be modified outside.
// MutationDetector is able to monitor objects for mutation within a limited window of time
type MutationDetector interface {
// AddObject adds the given object to the set being monitored for a while from now
AddObject(obj interface{})
// Run starts the monitoring and does not return until the monitoring is stopped.
Run(stopCh <-chan struct{})
}
@@ -65,7 +68,13 @@ type defaultCacheMutationDetector struct {
name string
period time.Duration
lock sync.Mutex
// compareLock ensures only a single call to CompareObjects runs at a time
compareObjectsLock sync.Mutex
// addLock guards addedObjs between AddObject and CompareObjects
addedObjsLock sync.Mutex
addedObjs []cacheObj
cachedObjs []cacheObj
retainDuration time.Duration
@@ -115,15 +124,22 @@ func (d *defaultCacheMutationDetector) AddObject(obj interface{}) {
if obj, ok := obj.(runtime.Object); ok {
copiedObj := obj.DeepCopyObject()
d.lock.Lock()
defer d.lock.Unlock()
d.cachedObjs = append(d.cachedObjs, cacheObj{cached: obj, copied: copiedObj})
d.addedObjsLock.Lock()
defer d.addedObjsLock.Unlock()
d.addedObjs = append(d.addedObjs, cacheObj{cached: obj, copied: copiedObj})
}
}
func (d *defaultCacheMutationDetector) CompareObjects() {
d.lock.Lock()
defer d.lock.Unlock()
d.compareObjectsLock.Lock()
defer d.compareObjectsLock.Unlock()
// move addedObjs into cachedObjs under lock
// this keeps the critical section small to avoid blocking AddObject while we compare cachedObjs
d.addedObjsLock.Lock()
d.cachedObjs = append(d.cachedObjs, d.addedObjs...)
d.addedObjs = nil
d.addedObjsLock.Unlock()
altered := false
for i, obj := range d.cachedObjs {

View File

@@ -26,7 +26,7 @@ import (
"sync"
"time"
apierrs "k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -55,7 +55,10 @@ type Reflector struct {
// stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison.
expectedTypeName string
// The type of object we expect to place in the store.
// An example object of the type we expect to place in the store.
// Only the type needs to be right, except that when that is
// `unstructured.Unstructured` the object's `"apiVersion"` and
// `"kind"` must also be right.
expectedType reflect.Type
// The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind
@@ -63,13 +66,18 @@ type Reflector struct {
store Store
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher
// period controls timing between one watch ending and
// the beginning of the next one.
period time.Duration
// backoff manages backoff of ListWatch
backoffManager wait.BackoffManager
resyncPeriod time.Duration
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
ShouldResync func() bool
// clock allows tests to manipulate time
clock clock.Clock
// paginatedResult defines whether pagination should be forced for list calls.
// It is set based on the result of the initial list call.
paginatedResult bool
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
@@ -80,7 +88,12 @@ type Reflector struct {
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// Defaults to pager.PageSize.
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
// it will turn off pagination to allow serving them from watch cache.
// NOTE: It should be used carefully as paginated lists are always served directly from
// etcd, which is significantly less efficient and may lead to serious performance and
// scalability problems.
WatchListPageSize int64
}
@@ -98,25 +111,33 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa
return indexer, reflector
}
// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType, unless expectedType
// is nil. If resyncPeriod is non-zero, then lists will be executed after every
// resyncPeriod, so that you can use reflectors to periodically process everything as
// well as incrementally processing the things that change.
// NewReflector creates a new Reflector object which will keep the
// given store up to date with the server's contents for the given
// resource. Reflector promises to only put things in the store that
// have the type of expectedType, unless expectedType is nil. If
// resyncPeriod is non-zero, then the reflector will periodically
// consult its ShouldResync function to determine whether to invoke
// the Store's Resync operation; `ShouldResync==nil` means always
// "yes". This enables you to use reflectors to periodically process
// everything as well as incrementally processing the things that
// change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
realClock := &clock.RealClock{}
r := &Reflector{
name: name,
listerWatcher: lw,
store: store,
period: time.Second,
resyncPeriod: resyncPeriod,
clock: &clock.RealClock{},
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
resyncPeriod: resyncPeriod,
clock: realClock,
}
r.setExpectedType(expectedType)
return r
@@ -147,15 +168,17 @@ func (r *Reflector) setExpectedType(expectedType interface{}) {
// call chains to NewReflector, so they'd be low entropy names for reflectors
var internalPackages = []string{"client-go/tools/cache/"}
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.Until(func() {
klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.period, stopCh)
}, r.backoffManager, true, stopCh)
klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
var (
@@ -194,6 +217,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var paginatedResult bool
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
@@ -208,11 +232,30 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
if r.WatchListPageSize != 0 {
switch {
case r.WatchListPageSize != 0:
pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
// We got a paginated result initially. Assume this resource and server honor
// paging requests (i.e. watch cache is probably disabled) and leave the default
// pager size set.
case options.ResourceVersion != "" && options.ResourceVersion != "0":
// User didn't explicitly request pagination.
//
// With ResourceVersion != "", we have a possibility to list from watch cache,
// but we do that (for ResourceVersion != "0") only if Limit is unset.
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
// switch off pagination to force listing from watch cache (if enabled).
// With the existing semantic of RV (result is at least as fresh as provided RV),
// this is correct and doesn't lead to going back in time.
//
// We also don't turn off pagination for ResourceVersion="0", since watch cache
// is ignoring Limit in that case anyway, and if watch cache is not enabled
// we don't introduce regression.
pager.PageSize = 0
}
list, err = pager.List(context.Background(), options)
list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) {
r.setIsLastSyncResourceVersionExpired(true)
// Retry immediately if the resource version used to list is expired.
@@ -220,7 +263,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// continuation pages, but the pager might not be enabled, or the full list might fail because the
// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
// to recover and ensure the reflector makes forward progress.
list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
}()
@@ -234,6 +277,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
}
// We check if the list was paginated and if so set the paginatedResult based on that.
// However, we want to do that only for the initial list (which is the only case
// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
// situations we may force listing directly from etcd (by setting ResourceVersion="")
// which will return paginated result, even if watch cache is enabled. However, in
// that case, we still want to prefer sending requests to watch cache if possible.
//
// Paginated result returned for request with ResourceVersion="0" mean that watch
// cache is disabled and there are a lot of objects of a given type. In such case,
// there is no need to prefer listing from watch cache.
if options.ResourceVersion == "0" && paginatedResult {
r.paginatedResult = true
}
r.setIsLastSyncResourceVersionExpired(false) // list was successful
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
@@ -310,7 +368,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err != nil {
switch {
case isExpiredError(err):
r.setIsLastSyncResourceVersionExpired(true)
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
case err == io.EOF:
// watch closed normally
@@ -334,8 +394,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err != errorStopRequested {
switch {
case isExpiredError(err):
r.setIsLastSyncResourceVersionExpired(true)
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
@@ -375,7 +437,7 @@ loop:
break loop
}
if event.Type == watch.Error {
return apierrs.FromObject(event.Object)
return apierrors.FromObject(event.Object)
}
if r.expectedType != nil {
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
@@ -479,9 +541,9 @@ func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) {
}
func isExpiredError(err error) bool {
// In Kubernetes 1.17 and earlier, the api server returns both apierrs.StatusReasonExpired and
// apierrs.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
// and always returns apierrs.StatusReasonExpired. For backward compatibility we can only remove the apierrs.IsGone
// In Kubernetes 1.17 and earlier, the api server returns both apierrors.StatusReasonExpired and
// apierrors.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
// and always returns apierrors.StatusReasonExpired. For backward compatibility we can only remove the apierrors.IsGone
// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
return apierrs.IsResourceExpired(err) || apierrs.IsGone(err)
return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
}

View File

@@ -21,11 +21,11 @@ import (
"sync"
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/utils/buffer"
"k8s.io/klog"
@@ -46,15 +46,6 @@ import (
// An object state is either "absent" or present with a
// ResourceVersion and other appropriate content.
//
// A SharedInformer gets object states from apiservers using a
// sequence of LIST and WATCH operations. Through this sequence the
// apiservers provide a sequence of "collection states" to the
// informer, where each collection state defines the state of every
// object of the collection. No promise --- beyond what is implied by
// other remarks here --- is made about how one informer's sequence of
// collection states relates to a different informer's sequence of
// collection states.
//
// A SharedInformer maintains a local cache, exposed by GetStore() and
// by GetIndexer() in the case of an indexed informer, of the state of
// each relevant object. This cache is eventually consistent with the
@@ -67,10 +58,17 @@ import (
// To be formally complete, we say that the absent state meets any
// restriction by label selector or field selector.
//
// For a given informer and relevant object ID X, the sequence of
// states that appears in the informer's cache is a subsequence of the
// states authoritatively associated with X. That is, some states
// might never appear in the cache but ordering among the appearing
// states is correct. Note, however, that there is no promise about
// ordering between states seen for different objects.
//
// The local cache starts out empty, and gets populated and updated
// during `Run()`.
//
// As a simple example, if a collection of objects is henceforeth
// As a simple example, if a collection of objects is henceforth
// unchanging, a SharedInformer is created that links to that
// collection, and that SharedInformer is `Run()` then that
// SharedInformer's cache eventually holds an exact copy of that
@@ -91,6 +89,10 @@ import (
// a given object, and `SplitMetaNamespaceKey(key)` to split a key
// into its constituent parts.
//
// Every query against the local cache is answered entirely from one
// snapshot of the cache's state. Thus, the result of a `List` call
// will not contain two entries with the same namespace and name.
//
// A client is identified here by a ResourceEventHandler. For every
// update to the SharedInformer's local cache and for every client
// added before `Run()`, eventually either the SharedInformer is
@@ -106,7 +108,16 @@ import (
// and index updates happen before such a prescribed notification.
// For a given SharedInformer and client, the notifications are
// delivered sequentially. For a given SharedInformer, client, and
// object ID, the notifications are delivered in order.
// object ID, the notifications are delivered in order. Because
// `ObjectMeta.UID` has no role in identifying objects, it is possible
// that when (1) object O1 with ID (e.g. namespace and name) X and
// `ObjectMeta.UID` U1 in the SharedInformer's local cache is deleted
// and later (2) another object O2 with ID X and ObjectMeta.UID U2 is
// created the informer's clients are not notified of (1) and (2) but
// rather are notified only of an update from O1 to O2. Clients that
// need to detect such cases might do so by comparing the `ObjectMeta.UID`
// field of the old and the new object in the code that handles update
// notifications (i.e. `OnUpdate` method of ResourceEventHandler).
//
// A client must process each notification promptly; a SharedInformer
// is not engineered to deal well with a large backlog of
@@ -114,11 +125,6 @@ import (
// to something else, for example through a
// `client-go/util/workqueue`.
//
// Each query to an informer's local cache --- whether a single-object
// lookup, a list operation, or a use of one of its indices --- is
// answered entirely from one of the collection states received by
// that informer.
//
// A delete notification exposes the last locally known non-absent
// state, except that its ResourceVersion is replaced with a
// ResourceVersion in which the object is actually absent.
@@ -128,14 +134,23 @@ type SharedInformer interface {
// between different handlers.
AddEventHandler(handler ResourceEventHandler)
// AddEventHandlerWithResyncPeriod adds an event handler to the
// shared informer using the specified resync period. The resync
// operation consists of delivering to the handler a create
// notification for every object in the informer's local cache; it
// does not add any interactions with the authoritative storage.
// shared informer with the requested resync period; zero means
// this handler does not care about resyncs. The resync operation
// consists of delivering to the handler an update notification
// for every object in the informer's local cache; it does not add
// any interactions with the authoritative storage. Some
// informers do no resyncs at all, not even for handlers added
// with a non-zero resyncPeriod. For an informer that does
// resyncs, and for each handler that requests resyncs, that
// informer develops a nominal resync period that is no shorter
// than the requested period but may be longer. The actual time
// between any two resyncs may be longer than the nominal period
// because the implementation takes time to do work and there may
// be competing load and scheduling noise.
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// GetStore returns the informer's local cache as a Store.
GetStore() Store
// GetController gives back a synthetic interface that "votes" to start the informer
// GetController is deprecated, it does nothing useful
GetController() Controller
// Run starts and runs the shared informer, returning after it stops.
// The informer will be stopped when stopCh is closed.
@@ -159,21 +174,32 @@ type SharedIndexInformer interface {
}
// NewSharedInformer creates a new instance for the listwatcher.
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
}
// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
// The created informer will not do resyncs if the given
// defaultEventHandlerResyncPeriod is zero. Otherwise: for each
// handler that with a non-zero requested resync period, whether added
// before or after the informer starts, the nominal resync period is
// the requested resync period rounded up to a multiple of the
// informer's resync checking period. Such an informer's resync
// checking period is established when the informer starts running,
// and is the maximum of (a) the minimum of the resync periods
// requested before the informer starts and the
// defaultEventHandlerResyncPeriod given here and (b) the constant
// `minimumResyncPeriod` defined in this file.
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: objType,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
@@ -228,6 +254,19 @@ func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
return true
}
// `*sharedIndexInformer` implements SharedIndexInformer and has three
// main components. One is an indexed local cache, `indexer Indexer`.
// The second main component is a Controller that pulls
// objects/notifications using the ListerWatcher and pushes them into
// a DeltaFIFO --- whose knownObjects is the informer's local cache
// --- while concurrently Popping Deltas values from that fifo and
// processing them with `sharedIndexInformer::HandleDeltas`. Each
// invocation of HandleDeltas, which is done with the fifo's lock
// held, processes each Delta in turn. For each Delta this both
// updates the local cache and stuffs the relevant notification into
// the sharedProcessor. The third main component is that
// sharedProcessor, which is responsible for relaying those
// notifications to each of the informer's clients.
type sharedIndexInformer struct {
indexer Indexer
controller Controller
@@ -235,9 +274,13 @@ type sharedIndexInformer struct {
processor *sharedProcessor
cacheMutationDetector MutationDetector
// This block is tracked to handle late initialization of the controller
listerWatcher ListerWatcher
objectType runtime.Object
// objectType is an example object of the type this informer is
// expected to handle. Only the type needs to be right, except
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` and `"kind"` must also be right.
objectType runtime.Object
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
// shouldResync to check if any of our listeners need a resync.
@@ -293,7 +336,10 @@ type deleteNotification struct {
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
@@ -452,19 +498,33 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
@@ -476,6 +536,12 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
return nil
}
// sharedProcessor has a collection of processorListener and can
// distribute a notification object to its listeners. There are two
// kinds of distribute operations. The sync distributions go to a
// subset of the listeners that (a) is recomputed in the occasional
// calls to shouldResync and (b) every listener is initially put in.
// The non-sync distributions go to every listener.
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
@@ -567,6 +633,17 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
}
}
// processorListener relays notifications from a sharedProcessor to
// one ResourceEventHandler --- using two goroutines, two unbuffered
// channels, and an unbounded ring buffer. The `add(notification)`
// function sends the given notification to `addCh`. One goroutine
// runs `pop()`, which pumps notifications from `addCh` to `nextCh`
// using storage in the ring buffer while `nextCh` is not keeping up.
// Another goroutine runs `run()`, which receives notifications from
// `nextCh` and synchronously invokes the appropriate handler method.
//
// processorListener also keeps track of the adjusted requested resync
// period of the listener.
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
@@ -580,11 +657,22 @@ type processorListener struct {
// we should try to do something better.
pendingNotifications buffer.RingGrowing
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
// requestedResyncPeriod is how frequently the listener wants a
// full resync from the shared informer, but modified by two
// adjustments. One is imposing a lower bound,
// `minimumResyncPeriod`. The other is another lower bound, the
// sharedProcessor's `resyncCheckPeriod`, that is imposed (a) only
// in AddEventHandlerWithResyncPeriod invocations made after the
// sharedProcessor starts and (b) only if the informer does
// resyncs at all.
requestedResyncPeriod time.Duration
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
// informer's overall resync check period.
// resyncPeriod is the threshold that will be used in the logic
// for this listener. This value differs from
// requestedResyncPeriod only when the sharedIndexInformer does
// not do resyncs, in which case the value here is zero. The
// actual time between resyncs depends on when the
// sharedProcessor's `shouldResync` function is invoked and when
// the sharedIndexInformer processes `Sync` type Delta objects.
resyncPeriod time.Duration
// nextResync is the earliest time the listener should get a full resync
nextResync time.Time
@@ -648,29 +736,21 @@ func (p *processorListener) run() {
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}
// shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,

View File

@@ -23,27 +23,50 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
)
// Store is a generic object storage interface. Reflector knows how to watch a server
// and update a store. A generic store is provided, which allows Reflector to be used
// as a local caching system, and an LRU store, which allows Reflector to work like a
// queue of items yet to be processed.
// Store is a generic object storage and processing interface. A
// Store holds a map from string keys to accumulators, and has
// operations to add, update, and delete a given object to/from the
// accumulator currently associated with a given key. A Store also
// knows how to extract the key from a given object, so many operations
// are given only the object.
//
// Store makes no assumptions about stored object identity; it is the responsibility
// of a Store implementation to provide a mechanism to correctly key objects and to
// define the contract for obtaining objects by some arbitrary key type.
// In the simplest Store implementations each accumulator is simply
// the last given object, or empty after Delete, and thus the Store's
// behavior is simple storage.
//
// Reflector knows how to watch a server and update a Store. This
// package provides a variety of implementations of Store.
type Store interface {
// Add adds the given object to the accumulator associated with the given object's key
Add(obj interface{}) error
// Update updates the given object in the accumulator associated with the given object's key
Update(obj interface{}) error
// Delete deletes the given object from the accumulator associated with the given object's key
Delete(obj interface{}) error
// List returns a list of all the currently non-empty accumulators
List() []interface{}
// ListKeys returns a list of all the keys currently associated with non-empty accumulators
ListKeys() []string
// Get returns the accumulator associated with the given object's key
Get(obj interface{}) (item interface{}, exists bool, err error)
// GetByKey returns the accumulator associated with the given key
GetByKey(key string) (item interface{}, exists bool, err error)
// Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
Replace([]interface{}, string) error
// Resync is meaningless in the terms appearing here but has
// meaning in some implementations that have non-trivial
// additional behavior (e.g., DeltaFIFO).
Resync() error
}
@@ -106,9 +129,8 @@ func SplitMetaNamespaceKey(key string) (namespace, name string, err error) {
return "", "", fmt.Errorf("unexpected key format: %q", key)
}
// cache responsibilities are limited to:
// 1. Computing keys for objects via keyFunc
// 2. Invoking methods of a ThreadSafeStorage interface
// `*cache` implements Indexer in terms of a ThreadSafeStore and an
// associated KeyFunc.
type cache struct {
// cacheStorage bears the burden of thread safety for the cache
cacheStorage ThreadSafeStore
@@ -222,9 +244,9 @@ func (c *cache) Replace(list []interface{}, resourceVersion string) error {
return nil
}
// Resync touches all items in the store to force processing
// Resync is meaningless for one of these
func (c *cache) Resync() error {
return c.cacheStorage.Resync()
return nil
}
// NewStore returns a Store implemented simply with a map and a lock.

View File

@@ -23,7 +23,11 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
)
// ThreadSafeStore is an interface that allows concurrent access to a storage backend.
// ThreadSafeStore is an interface that allows concurrent indexed
// access to a storage backend. It is like Indexer but does not
// (necessarily) know how to extract the Store key from a given
// object.
//
// TL;DR caveats: you must not modify anything returned by Get or List as it will break
// the indexing feature in addition to not being thread safe.
//
@@ -51,6 +55,7 @@ type ThreadSafeStore interface {
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
// Resync is a no-op and is deprecated
Resync() error
}
@@ -131,8 +136,8 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st
}
}
// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
// Index returns a list of items that match the given object on the index function.
// Index is thread-safe so long as you treat all items as immutable.
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
@@ -142,37 +147,37 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{},
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
indexKeys, err := indexFunc(obj)
indexedValues, err := indexFunc(obj)
if err != nil {
return nil, err
}
index := c.indices[indexName]
var returnKeySet sets.String
if len(indexKeys) == 1 {
var storeKeySet sets.String
if len(indexedValues) == 1 {
// In majority of cases, there is exactly one value matching.
// Optimize the most common path - deduping is not needed here.
returnKeySet = index[indexKeys[0]]
storeKeySet = index[indexedValues[0]]
} else {
// Need to de-dupe the return list.
// Since multiple keys are allowed, this can happen.
returnKeySet = sets.String{}
for _, indexKey := range indexKeys {
for key := range index[indexKey] {
returnKeySet.Insert(key)
storeKeySet = sets.String{}
for _, indexedValue := range indexedValues {
for key := range index[indexedValue] {
storeKeySet.Insert(key)
}
}
}
list := make([]interface{}, 0, returnKeySet.Len())
for absoluteKey := range returnKeySet {
list = append(list, c.items[absoluteKey])
list := make([]interface{}, 0, storeKeySet.Len())
for storeKey := range storeKeySet {
list = append(list, c.items[storeKey])
}
return list, nil
}
// ByIndex returns a list of items that match an exact value on the index function
func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
@@ -183,7 +188,7 @@ func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, erro
index := c.indices[indexName]
set := index[indexKey]
set := index[indexedValue]
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
@@ -192,9 +197,9 @@ func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, erro
return list, nil
}
// IndexKeys returns a list of keys that match on the index function.
// IndexKeys returns a list of the Store keys of the objects whose indexed values in the given index include the given indexed value.
// IndexKeys is thread-safe so long as you treat all items as immutable.
func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
c.lock.RLock()
defer c.lock.RUnlock()
@@ -205,7 +210,7 @@ func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error)
index := c.indices[indexName]
set := index[indexKey]
set := index[indexedValue]
return set.List(), nil
}

View File

@@ -70,6 +70,9 @@ type Cluster struct {
LocationOfOrigin string
// Server is the address of the kubernetes cluster (https://hostname:port).
Server string `json:"server"`
// TLSServerName is used to check server certificate. If TLSServerName is empty, the hostname used to contact the server is used.
// +optional
TLSServerName string `json:"tls-server-name,omitempty"`
// InsecureSkipTLSVerify skips the validity check for the server's certificate. This will make your HTTPS connections insecure.
// +optional
InsecureSkipTLSVerify bool `json:"insecure-skip-tls-verify,omitempty"`

View File

@@ -31,6 +31,9 @@ func Convert_Slice_v1_NamedCluster_To_Map_string_To_Pointer_api_Cluster(in *[]Na
if err := Convert_v1_Cluster_To_api_Cluster(&curr.Cluster, newCluster, s); err != nil {
return err
}
if *out == nil {
*out = make(map[string]*api.Cluster)
}
if (*out)[curr.Name] == nil {
(*out)[curr.Name] = newCluster
} else {
@@ -65,6 +68,9 @@ func Convert_Slice_v1_NamedAuthInfo_To_Map_string_To_Pointer_api_AuthInfo(in *[]
if err := Convert_v1_AuthInfo_To_api_AuthInfo(&curr.AuthInfo, newAuthInfo, s); err != nil {
return err
}
if *out == nil {
*out = make(map[string]*api.AuthInfo)
}
if (*out)[curr.Name] == nil {
(*out)[curr.Name] = newAuthInfo
} else {
@@ -99,6 +105,9 @@ func Convert_Slice_v1_NamedContext_To_Map_string_To_Pointer_api_Context(in *[]Na
if err := Convert_v1_Context_To_api_Context(&curr.Context, newContext, s); err != nil {
return err
}
if *out == nil {
*out = make(map[string]*api.Context)
}
if (*out)[curr.Name] == nil {
(*out)[curr.Name] = newContext
} else {
@@ -133,6 +142,9 @@ func Convert_Slice_v1_NamedExtension_To_Map_string_To_runtime_Object(in *[]Named
if err := runtime.Convert_runtime_RawExtension_To_runtime_Object(&curr.Extension, &newExtension, s); err != nil {
return err
}
if *out == nil {
*out = make(map[string]runtime.Object)
}
if (*out)[curr.Name] == nil {
(*out)[curr.Name] = newExtension
} else {

View File

@@ -63,6 +63,9 @@ type Preferences struct {
type Cluster struct {
// Server is the address of the kubernetes cluster (https://hostname:port).
Server string `json:"server"`
// TLSServerName is used to check server certificate. If TLSServerName is empty, the hostname used to contact the server is used.
// +optional
TLSServerName string `json:"tls-server-name,omitempty"`
// InsecureSkipTLSVerify skips the validity check for the server's certificate. This will make your HTTPS connections insecure.
// +optional
InsecureSkipTLSVerify bool `json:"insecure-skip-tls-verify,omitempty"`

View File

@@ -233,6 +233,7 @@ func Convert_api_AuthProviderConfig_To_v1_AuthProviderConfig(in *api.AuthProvide
func autoConvert_v1_Cluster_To_api_Cluster(in *Cluster, out *api.Cluster, s conversion.Scope) error {
out.Server = in.Server
out.TLSServerName = in.TLSServerName
out.InsecureSkipTLSVerify = in.InsecureSkipTLSVerify
out.CertificateAuthority = in.CertificateAuthority
out.CertificateAuthorityData = *(*[]byte)(unsafe.Pointer(&in.CertificateAuthorityData))
@@ -250,6 +251,7 @@ func Convert_v1_Cluster_To_api_Cluster(in *Cluster, out *api.Cluster, s conversi
func autoConvert_api_Cluster_To_v1_Cluster(in *api.Cluster, out *Cluster, s conversion.Scope) error {
// INFO: in.LocationOfOrigin opted out of conversion generation
out.Server = in.Server
out.TLSServerName = in.TLSServerName
out.InsecureSkipTLSVerify = in.InsecureSkipTLSVerify
out.CertificateAuthority = in.CertificateAuthority
out.CertificateAuthorityData = *(*[]byte)(unsafe.Pointer(&in.CertificateAuthorityData))

View File

@@ -35,7 +35,7 @@ import (
var (
// ClusterDefaults has the same behavior as the old EnvVar and DefaultCluster fields
// DEPRECATED will be replaced
ClusterDefaults = clientcmdapi.Cluster{Server: getDefaultServer()}
ClusterDefaults = clientcmdapi.Cluster{Server: os.Getenv("KUBERNETES_MASTER")}
// DefaultClientConfig represents the legacy behavior of this package for defaulting
// DEPRECATED will be replace
DefaultClientConfig = DirectClientConfig{*clientcmdapi.NewConfig(), "", &ConfigOverrides{
@@ -43,15 +43,6 @@ var (
}, nil, NewDefaultClientConfigLoadingRules(), promptedCredentials{}}
)
// getDefaultServer returns a default setting for DefaultClientConfig
// DEPRECATED
func getDefaultServer() string {
if server := os.Getenv("KUBERNETES_MASTER"); len(server) > 0 {
return server
}
return "http://localhost:8080"
}
// ClientConfig is used to make it easy to get an api server client
type ClientConfig interface {
// RawConfig returns the merged result of all overrides
@@ -210,6 +201,7 @@ func getServerIdentificationPartialConfig(configAuthInfo clientcmdapi.AuthInfo,
configClientConfig.CAFile = configClusterInfo.CertificateAuthority
configClientConfig.CAData = configClusterInfo.CertificateAuthorityData
configClientConfig.Insecure = configClusterInfo.InsecureSkipTLSVerify
configClientConfig.ServerName = configClusterInfo.TLSServerName
mergo.MergeWithOverwrite(mergedConfig, configClientConfig)
return mergedConfig, nil
@@ -460,6 +452,14 @@ func (config *DirectClientConfig) getCluster() (clientcmdapi.Cluster, error) {
mergedClusterInfo.CertificateAuthorityData = config.overrides.ClusterInfo.CertificateAuthorityData
}
// if the --tls-server-name has been set in overrides, use that value.
// if the --server has been set in overrides, then use the value of --tls-server-name specified on the CLI too. This gives the property
// that setting a --server will effectively clear the KUBECONFIG value of tls-server-name if it is specified on the command line which is
// usually correct.
if config.overrides.ClusterInfo.TLSServerName != "" || config.overrides.ClusterInfo.Server != "" {
mergedClusterInfo.TLSServerName = config.overrides.ClusterInfo.TLSServerName
}
return *mergedClusterInfo, nil
}

View File

@@ -71,6 +71,7 @@ type ClusterOverrideFlags struct {
APIVersion FlagInfo
CertificateAuthority FlagInfo
InsecureSkipTLSVerify FlagInfo
TLSServerName FlagInfo
}
// FlagInfo contains information about how to register a flag. This struct is useful if you want to provide a way for an extender to
@@ -145,6 +146,7 @@ const (
FlagContext = "context"
FlagNamespace = "namespace"
FlagAPIServer = "server"
FlagTLSServerName = "tls-server-name"
FlagInsecure = "insecure-skip-tls-verify"
FlagCertFile = "client-certificate"
FlagKeyFile = "client-key"
@@ -189,6 +191,7 @@ func RecommendedClusterOverrideFlags(prefix string) ClusterOverrideFlags {
APIServer: FlagInfo{prefix + FlagAPIServer, "", "", "The address and port of the Kubernetes API server"},
CertificateAuthority: FlagInfo{prefix + FlagCAFile, "", "", "Path to a cert file for the certificate authority"},
InsecureSkipTLSVerify: FlagInfo{prefix + FlagInsecure, "", "false", "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure"},
TLSServerName: FlagInfo{prefix + FlagTLSServerName, "", "", "If provided, this name will be used to validate server certificate. If this is not provided, hostname used to contact the server is used."},
}
}
@@ -226,6 +229,7 @@ func BindClusterFlags(clusterInfo *clientcmdapi.Cluster, flags *pflag.FlagSet, f
flagNames.APIServer.BindStringFlag(flags, &clusterInfo.Server)
flagNames.CertificateAuthority.BindStringFlag(flags, &clusterInfo.CertificateAuthority)
flagNames.InsecureSkipTLSVerify.BindBoolFlag(flags, &clusterInfo.InsecureSkipTLSVerify)
flagNames.TLSServerName.BindStringFlag(flags, &clusterInfo.TLSServerName)
}
// BindFlags is a convenience method to bind the specified flags to their associated variables

View File

@@ -30,7 +30,7 @@ import (
var (
ErrNoContext = errors.New("no context chosen")
ErrEmptyConfig = errors.New("no configuration has been provided")
ErrEmptyConfig = errors.New("no configuration has been provided, try setting KUBERNETES_MASTER environment variable")
// message is for consistency with old behavior
ErrEmptyCluster = errors.New("cluster has no server defined")
)
@@ -86,11 +86,41 @@ func (e errConfigurationInvalid) Error() string {
return fmt.Sprintf("invalid configuration: %v", utilerrors.NewAggregate(e).Error())
}
// Errors implements the AggregateError interface
// Errors implements the utilerrors.Aggregate interface
func (e errConfigurationInvalid) Errors() []error {
return e
}
// Is implements the utilerrors.Aggregate interface
func (e errConfigurationInvalid) Is(target error) bool {
return e.visit(func(err error) bool {
return errors.Is(err, target)
})
}
func (e errConfigurationInvalid) visit(f func(err error) bool) bool {
for _, err := range e {
switch err := err.(type) {
case errConfigurationInvalid:
if match := err.visit(f); match {
return match
}
case utilerrors.Aggregate:
for _, nestedErr := range err.Errors() {
if match := f(nestedErr); match {
return match
}
}
default:
if match := f(err); match {
return match
}
}
}
return false
}
// IsConfigurationInvalid returns true if the provided error indicates the configuration is invalid.
func IsConfigurationInvalid(err error) bool {
switch err.(type) {

View File

@@ -241,7 +241,7 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
desc := le.config.Lock.Describe()
klog.Infof("attempting to acquire leader lease %v...", desc)
wait.JitterUntil(func() {
succeeded = le.tryAcquireOrRenew()
succeeded = le.tryAcquireOrRenew(ctx)
le.maybeReportTransition()
if !succeeded {
klog.V(4).Infof("failed to acquire lease %v", desc)
@@ -263,18 +263,7 @@ func (le *LeaderElector) renew(ctx context.Context) {
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
defer timeoutCancel()
err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
done := make(chan bool, 1)
go func() {
defer close(done)
done <- le.tryAcquireOrRenew()
}()
select {
case <-timeoutCtx.Done():
return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
case result := <-done:
return result, nil
}
return le.tryAcquireOrRenew(timeoutCtx), nil
}, timeoutCtx.Done())
le.maybeReportTransition()
@@ -303,7 +292,7 @@ func (le *LeaderElector) release() bool {
leaderElectionRecord := rl.LeaderElectionRecord{
LeaderTransitions: le.observedRecord.LeaderTransitions,
}
if err := le.config.Lock.Update(leaderElectionRecord); err != nil {
if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
klog.Errorf("Failed to release lock: %v", err)
return false
}
@@ -315,7 +304,7 @@ func (le *LeaderElector) release() bool {
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew() bool {
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
now := metav1.Now()
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
@@ -325,13 +314,13 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
}
// 1. obtain or create the ElectionRecord
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get()
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
return false
}
if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
klog.Errorf("error initially creating leader election record: %v", err)
return false
}
@@ -363,7 +352,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
}
// update the lock itself
if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
klog.Errorf("Failed to update lock: %v", err)
return false
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package resourcelock
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -41,10 +42,10 @@ type ConfigMapLock struct {
}
// Get returns the election record from a ConfigMap Annotation
func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, []byte, error) {
func (cml *ConfigMapLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
var record LeaderElectionRecord
var err error
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(cml.ConfigMapMeta.Name, metav1.GetOptions{})
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(ctx, cml.ConfigMapMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
@@ -61,12 +62,12 @@ func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, []byte, error) {
}
// Create attempts to create a LeaderElectionRecord annotation
func (cml *ConfigMapLock) Create(ler LeaderElectionRecord) error {
func (cml *ConfigMapLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
recordBytes, err := json.Marshal(ler)
if err != nil {
return err
}
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(&v1.ConfigMap{
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(ctx, &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: cml.ConfigMapMeta.Name,
Namespace: cml.ConfigMapMeta.Namespace,
@@ -74,12 +75,12 @@ func (cml *ConfigMapLock) Create(ler LeaderElectionRecord) error {
LeaderElectionRecordAnnotationKey: string(recordBytes),
},
},
})
}, metav1.CreateOptions{})
return err
}
// Update will update an existing annotation on a given resource.
func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error {
func (cml *ConfigMapLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
if cml.cm == nil {
return errors.New("configmap not initialized, call get or create first")
}
@@ -88,7 +89,7 @@ func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error {
return err
}
cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(cml.cm)
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
return err
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package resourcelock
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -36,10 +37,10 @@ type EndpointsLock struct {
}
// Get returns the election record from a Endpoints Annotation
func (el *EndpointsLock) Get() (*LeaderElectionRecord, []byte, error) {
func (el *EndpointsLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
var record LeaderElectionRecord
var err error
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(el.EndpointsMeta.Name, metav1.GetOptions{})
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(ctx, el.EndpointsMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
@@ -56,12 +57,12 @@ func (el *EndpointsLock) Get() (*LeaderElectionRecord, []byte, error) {
}
// Create attempts to create a LeaderElectionRecord annotation
func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
func (el *EndpointsLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
recordBytes, err := json.Marshal(ler)
if err != nil {
return err
}
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(&v1.Endpoints{
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(ctx, &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: el.EndpointsMeta.Name,
Namespace: el.EndpointsMeta.Namespace,
@@ -69,12 +70,12 @@ func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
LeaderElectionRecordAnnotationKey: string(recordBytes),
},
},
})
}, metav1.CreateOptions{})
return err
}
// Update will update and existing annotation on a given resource.
func (el *EndpointsLock) Update(ler LeaderElectionRecord) error {
func (el *EndpointsLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
if el.e == nil {
return errors.New("endpoint not initialized, call get or create first")
}
@@ -86,7 +87,7 @@ func (el *EndpointsLock) Update(ler LeaderElectionRecord) error {
el.e.Annotations = make(map[string]string)
}
el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(el.e)
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(ctx, el.e, metav1.UpdateOptions{})
return err
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package resourcelock
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -73,13 +74,13 @@ type ResourceLockConfig struct {
// by the leaderelection code.
type Interface interface {
// Get returns the LeaderElectionRecord
Get() (*LeaderElectionRecord, []byte, error)
Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)
// Create attempts to create a LeaderElectionRecord
Create(ler LeaderElectionRecord) error
Create(ctx context.Context, ler LeaderElectionRecord) error
// Update will update and existing LeaderElectionRecord
Update(ler LeaderElectionRecord) error
Update(ctx context.Context, ler LeaderElectionRecord) error
// RecordEvent is used to record events
RecordEvent(string)

View File

@@ -17,6 +17,7 @@ limitations under the License.
package resourcelock
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -37,9 +38,9 @@ type LeaseLock struct {
}
// Get returns the election record from a Lease spec
func (ll *LeaseLock) Get() (*LeaderElectionRecord, []byte, error) {
func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
var err error
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ll.LeaseMeta.Name, metav1.GetOptions{})
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
@@ -52,26 +53,26 @@ func (ll *LeaseLock) Get() (*LeaderElectionRecord, []byte, error) {
}
// Create attempts to create a Lease
func (ll *LeaseLock) Create(ler LeaderElectionRecord) error {
func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
var err error
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(&coordinationv1.Lease{
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: ll.LeaseMeta.Name,
Namespace: ll.LeaseMeta.Namespace,
},
Spec: LeaderElectionRecordToLeaseSpec(&ler),
})
}, metav1.CreateOptions{})
return err
}
// Update will update an existing Lease spec.
func (ll *LeaseLock) Update(ler LeaderElectionRecord) error {
func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
if ll.lease == nil {
return errors.New("lease not initialized, call get or create first")
}
ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)
var err error
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ll.lease)
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
return err
}

View File

@@ -18,6 +18,7 @@ package resourcelock
import (
"bytes"
"context"
"encoding/json"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -34,13 +35,13 @@ type MultiLock struct {
}
// Get returns the older election record of the lock
func (ml *MultiLock) Get() (*LeaderElectionRecord, []byte, error) {
primary, primaryRaw, err := ml.Primary.Get()
func (ml *MultiLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
primary, primaryRaw, err := ml.Primary.Get(ctx)
if err != nil {
return nil, nil, err
}
secondary, secondaryRaw, err := ml.Secondary.Get()
secondary, secondaryRaw, err := ml.Secondary.Get(ctx)
if err != nil {
// Lock is held by old client
if apierrors.IsNotFound(err) && primary.HolderIdentity != ml.Identity() {
@@ -60,25 +61,25 @@ func (ml *MultiLock) Get() (*LeaderElectionRecord, []byte, error) {
}
// Create attempts to create both primary lock and secondary lock
func (ml *MultiLock) Create(ler LeaderElectionRecord) error {
err := ml.Primary.Create(ler)
func (ml *MultiLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
err := ml.Primary.Create(ctx, ler)
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
return ml.Secondary.Create(ler)
return ml.Secondary.Create(ctx, ler)
}
// Update will update and existing annotation on both two resources.
func (ml *MultiLock) Update(ler LeaderElectionRecord) error {
err := ml.Primary.Update(ler)
func (ml *MultiLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
err := ml.Primary.Update(ctx, ler)
if err != nil {
return err
}
_, _, err = ml.Secondary.Get()
_, _, err = ml.Secondary.Get(ctx)
if err != nil && apierrors.IsNotFound(err) {
return ml.Secondary.Create(ler)
return ml.Secondary.Create(ctx, ler)
}
return ml.Secondary.Update(ler)
return ml.Secondary.Update(ctx, ler)
}
// RecordEvent in leader election while adding meta-data

View File

@@ -26,6 +26,16 @@ import (
var registerMetrics sync.Once
// DurationMetric is a measurement of some amount of time.
type DurationMetric interface {
Observe(duration time.Duration)
}
// ExpiryMetric sets some time of expiry. If nil, assume not relevant.
type ExpiryMetric interface {
Set(expiry *time.Time)
}
// LatencyMetric observes client latency partitioned by verb and url.
type LatencyMetric interface {
Observe(verb string, u url.URL, latency time.Duration)
@@ -37,21 +47,57 @@ type ResultMetric interface {
}
var (
// ClientCertExpiry is the expiry time of a client certificate
ClientCertExpiry ExpiryMetric = noopExpiry{}
// ClientCertRotationAge is the age of a certificate that has just been rotated.
ClientCertRotationAge DurationMetric = noopDuration{}
// RequestLatency is the latency metric that rest clients will update.
RequestLatency LatencyMetric = noopLatency{}
// RateLimiterLatency is the client side rate limiter latency metric.
RateLimiterLatency LatencyMetric = noopLatency{}
// RequestResult is the result metric that rest clients will update.
RequestResult ResultMetric = noopResult{}
)
// RegisterOpts contains all the metrics to register. Metrics may be nil.
type RegisterOpts struct {
ClientCertExpiry ExpiryMetric
ClientCertRotationAge DurationMetric
RequestLatency LatencyMetric
RateLimiterLatency LatencyMetric
RequestResult ResultMetric
}
// Register registers metrics for the rest client to use. This can
// only be called once.
func Register(lm LatencyMetric, rm ResultMetric) {
func Register(opts RegisterOpts) {
registerMetrics.Do(func() {
RequestLatency = lm
RequestResult = rm
if opts.ClientCertExpiry != nil {
ClientCertExpiry = opts.ClientCertExpiry
}
if opts.ClientCertRotationAge != nil {
ClientCertRotationAge = opts.ClientCertRotationAge
}
if opts.RequestLatency != nil {
RequestLatency = opts.RequestLatency
}
if opts.RateLimiterLatency != nil {
RateLimiterLatency = opts.RateLimiterLatency
}
if opts.RequestResult != nil {
RequestResult = opts.RequestResult
}
})
}
type noopDuration struct{}
func (noopDuration) Observe(time.Duration) {}
type noopExpiry struct{}
func (noopExpiry) Set(*time.Time) {}
type noopLatency struct{}
func (noopLatency) Observe(string, url.URL, time.Duration) {}

View File

@@ -73,16 +73,18 @@ func New(fn ListPageFunc) *ListPager {
// List returns a single list object, but attempts to retrieve smaller chunks from the
// server to reduce the impact on the server. If the chunk attempt fails, it will load
// the full list instead. The Limit field on options, if unset, will default to the page size.
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
if options.Limit == 0 {
options.Limit = p.PageSize
}
requestedResourceVersion := options.ResourceVersion
var list *metainternalversion.List
paginatedResult := false
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
return nil, paginatedResult, ctx.Err()
default:
}
@@ -93,23 +95,24 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
// failing when the resource versions is established by the first page request falls out of the compaction
// during the subsequent list requests).
if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
return nil, err
return nil, paginatedResult, err
}
// the list expired while we were processing, fall back to a full list at
// the requested ResourceVersion.
options.Limit = 0
options.Continue = ""
options.ResourceVersion = requestedResourceVersion
return p.PageFn(ctx, options)
result, err := p.PageFn(ctx, options)
return result, paginatedResult, err
}
m, err := meta.ListAccessor(obj)
if err != nil {
return nil, fmt.Errorf("returned object must be a list: %v", err)
return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err)
}
// exit early and return the object we got if we haven't processed any pages
if len(m.GetContinue()) == 0 && list == nil {
return obj, nil
return obj, paginatedResult, nil
}
// initialize the list and fill its contents
@@ -122,12 +125,12 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
list.Items = append(list.Items, obj)
return nil
}); err != nil {
return nil, err
return nil, paginatedResult, err
}
// if we have no more items, return the list
if len(m.GetContinue()) == 0 {
return list, nil
return list, paginatedResult, nil
}
// set the next loop up
@@ -136,6 +139,8 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
// `specifying resource version is not allowed when using continue` error.
// See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143.
options.ResourceVersion = ""
// At this point, result is already paginated.
paginatedResult = true
}
}

View File

@@ -102,9 +102,6 @@ type EventRecorder interface {
// Eventf is just like Event, but with Sprintf for the message field.
Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
// PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})
// AnnotatedEventf is just like eventf, but with annotations attached
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
@@ -343,10 +340,6 @@ func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, m
recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.generateEvent(object, nil, timestamp, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.generateEvent(object, annotations, metav1.Now(), eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

View File

@@ -19,7 +19,6 @@ package record
import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
@@ -42,11 +41,8 @@ func (f *FakeRecorder) Eventf(object runtime.Object, eventtype, reason, messageF
}
}
func (f *FakeRecorder) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
}
func (f *FakeRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
f.Eventf(object, eventtype, reason, messageFmt, args)
f.Eventf(object, eventtype, reason, messageFmt, args...)
}
// NewFakeRecorder creates new fake event recorder with event channel with