Update vendored dependencies

Christian Huffman
2020-07-23 15:19:22 -04:00
parent 547e88e4fb
commit bda8f8c0ae
909 changed files with 119096 additions and 130549 deletions

View File

@@ -26,7 +26,6 @@ reviewers:
- pmorie
- janetkuo
- justinsb
- eparis
- soltysh
- jsafrane
- dims
@@ -38,7 +37,6 @@ reviewers:
- ingvagabund
- resouer
- jessfraz
- david-mcmahon
- mfojtik
- mqliang
- sdminonne

View File

@@ -69,6 +69,9 @@ type Config struct {
// question to this interface as a parameter. This is probably moot
// now that this functionality appears at a higher level.
RetryOnError bool
// Called whenever the ListAndWatch drops the connection with an error.
WatchErrorHandler WatchErrorHandler
}
// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
@@ -132,6 +135,9 @@ func (c *controller) Run(stopCh <-chan struct{}) {
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
@@ -183,9 +189,11 @@ func (c *controller) processLoop() {
}
}
// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
// ResourceEventHandler can handle notifications for events that
// happen to a resource. The events are informational only, so you
// can't return an error. The handlers MUST NOT modify the objects
// received; this concerns not only the top level of structure but all
// the data structures reachable from it.
// * OnAdd is called when an object is added.
// * OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
@@ -205,7 +213,8 @@ type ResourceEventHandler interface {
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
// ResourceEventHandler. This adapter does not remove the prohibition against
// modifying the objects.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
@@ -237,6 +246,7 @@ func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
// in, ensuring the appropriate nested handler method is invoked. An object
// that starts passing the filter after an update is considered an add, and an
// object that stops passing the filter after an update is considered a delete.
// Like the handlers, the filter MUST NOT modify the objects it is given.
type FilteringResourceEventHandler struct {
FilterFunc func(obj interface{}) bool
Handler ResourceEventHandler
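
The strengthened immutability wording above applies to every handler implementation. A minimal sketch, assuming a pod informer, of a ResourceEventHandlerFuncs that respects the rule: objects delivered by the informer are treated as read-only and DeepCopy'd before any mutation (the function name and label value are illustrative).

package example

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// newPodHandler returns handler funcs that never mutate the shared objects.
func newPodHandler() cache.ResourceEventHandler {
	return cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod, ok := obj.(*v1.Pod)
			if !ok {
				return
			}
			// Copy before touching anything: the original is shared with the
			// informer cache and all other handlers.
			copied := pod.DeepCopy()
			copied.Labels = map[string]string{"example": "seen"}
			fmt.Printf("added %s/%s\n", pod.Namespace, pod.Name)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			// oldObj is only the last state this handler saw; intermediate
			// updates may have been combined.
		},
		DeleteFunc: func(obj interface{}) {
			// obj may be a DeletedFinalStateUnknown tombstone if the watch
			// missed the deletion event.
		},
	}
}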

View File

@@ -23,7 +23,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog"
"k8s.io/klog/v2"
)
// NewDeltaFIFO returns a Queue which can be used to process changes to items.
@@ -41,7 +41,7 @@ import (
// affects error retrying.
// NOTE: It is possible to misuse this and cause a race when using an
// external known object source.
// Whether there is a potential race depends on how the comsumer
// Whether there is a potential race depends on how the consumer
// modifies knownObjects. In Pop(), process function is called under
// lock, so it is safe to update data structures in it that need to be
// in sync with the queue (e.g. knownObjects).
@@ -99,7 +99,7 @@ type DeltaFIFOOptions struct {
EmitDeltaTypeReplaced bool
}
// NewDeltaFIFOWithOptions returns a Store which can be used process changes to
// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
// items. See also the comment on DeltaFIFO.
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
@@ -144,7 +144,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but they
// will always return an object of type Deltas.
// will always return an object of type Deltas. List() returns
// the newest objects currently in the FIFO.
//
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
// to list Store keys and to get objects by Store key. The objects in
@@ -160,14 +161,16 @@ type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond
// We depend on the property that items in the set are in
// the queue and vice versa, and that all Deltas in this
// map have at least one Delta.
// `items` maps keys to Deltas.
// `queue` maintains FIFO order of keys for consumption in Pop().
// We maintain the property that keys in the `items` and `queue` are
// strictly 1:1 mapping, and that all Deltas in `items` should have
// at least one Delta.
items map[string]Deltas
queue []string
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
// or Delete/Add/Update/AddIfNotPresent was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
@@ -180,11 +183,9 @@ type DeltaFIFO struct {
// Replace(), and Resync()
knownObjects KeyListerGetter
// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
closed bool
closedLock sync.Mutex
closed bool
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat).
@@ -204,8 +205,8 @@ var (
// Close the queue.
func (f *DeltaFIFO) Close() {
f.closedLock.Lock()
defer f.closedLock.Unlock()
f.lock.Lock()
defer f.lock.Unlock()
f.closed = true
f.cond.Broadcast()
}
@@ -226,7 +227,7 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
}
// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
// or an Update called first but the first batch of items inserted by Replace() has been popped
// or the first batch of items inserted by Replace() has been popped.
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
@@ -283,6 +284,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
}
}
// exist in items and/or KnownObjects
return f.queueActionLocked(Deleted, obj)
}
@@ -333,6 +335,11 @@ func dedupDeltas(deltas Deltas) Deltas {
a := &deltas[n-1]
b := &deltas[n-2]
if out := isDup(a, b); out != nil {
// `a` and `b` are duplicates. Only keep the one returned from isDup().
// TODO: This extra array allocation and copy seems unnecessary if
// all we do to dedup is compare the new delta with the last element
// in `items`, which could be done by mutating `items` directly.
// Might be worth profiling and investigating if it is safe to optimize.
d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out)
}
@@ -447,8 +454,8 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err
// IsClosed checks if the queue is closed
func (f *DeltaFIFO) IsClosed() bool {
f.closedLock.Lock()
defer f.closedLock.Unlock()
f.lock.Lock()
defer f.lock.Unlock()
return f.closed
}
@@ -457,10 +464,12 @@ func (f *DeltaFIFO) IsClosed() bool {
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe update data structures
// process function is called under lock, so it is safe to update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
// process should avoid expensive I/O operation so that other queue operations, i.e.
// Add() and Get(), won't be blocked for too long.
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
@@ -472,7 +481,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
if f.closed {
return nil, ErrFIFOClosed
}
@@ -521,6 +530,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
action = Replaced
}
// Add Sync/Replaced action for each new item.
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
@@ -539,6 +549,9 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
if keys.Has(k) {
continue
}
// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
@@ -650,7 +663,8 @@ type KeyLister interface {
// A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface {
GetByKey(key string) (interface{}, bool, error)
// GetByKey returns the value associated with the key, or sets exists=false.
GetByKey(key string) (value interface{}, exists bool, err error)
}
// DeltaType is the type of a change (addition, deletion, etc)
@@ -713,10 +727,10 @@ func copyDeltas(d Deltas) Deltas {
return d2
}
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
// an object was deleted but the watch deletion event was missed. In this
// case we don't know the final "resting" state of the object, so there's
// a chance the included `Obj` is stale.
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object
// was deleted but the watch deletion event was missed while disconnected from
// apiserver. In this case we don't know the final "resting" state of the object, so
// there's a chance the included `Obj` is stale.
type DeletedFinalStateUnknown struct {
Key string
Obj interface{}
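
A minimal sketch, assuming a knownObjects Store is available, of driving the DeltaFIFO described above: build it with NewDeltaFIFOWithOptions, drain it with Pop while keeping the process function cheap (it runs under the queue lock), and handle the DeletedFinalStateUnknown tombstone case. Function and variable names are illustrative.

package example

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// newFIFO builds a DeltaFIFO keyed by namespace/name, backed by knownObjects.
func newFIFO(knownObjects cache.Store) *cache.DeltaFIFO {
	return cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
		KeyFunction:           cache.MetaNamespaceKeyFunc,
		KnownObjects:          knownObjects,
		EmitDeltaTypeReplaced: true,
	})
}

// drain pops Deltas until the queue is closed.
func drain(fifo *cache.DeltaFIFO) error {
	for {
		_, err := fifo.Pop(func(obj interface{}) error {
			// Only cheap, in-memory work here: this runs under the FIFO lock.
			for _, d := range obj.(cache.Deltas) {
				switch d.Type {
				case cache.Deleted:
					if tombstone, ok := d.Object.(cache.DeletedFinalStateUnknown); ok {
						// The watch deletion event was missed; Obj may be stale.
						fmt.Println("tombstone for", tombstone.Key)
						continue
					}
					fmt.Println("deleted", d.Object)
				case cache.Added, cache.Updated, cache.Sync, cache.Replaced:
					fmt.Println(d.Type, d.Object)
				}
			}
			return nil
		})
		if err == cache.ErrFIFOClosed {
			return nil
		}
		if err != nil {
			return err
		}
	}
}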

View File

@@ -21,7 +21,7 @@ import (
"time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/klog"
"k8s.io/klog/v2"
)
// ExpirationCache implements the store interface

View File

@@ -71,8 +71,8 @@ type Queue interface {
// HasSynced returns true if the first batch of keys have all been
// popped. The first batch of keys are those of the first Replace
// operation if that happened before any Add, Update, or Delete;
// otherwise the first batch is empty.
// operation if that happened before any Add, AddIfNotPresent,
// Update, or Delete; otherwise the first batch is empty.
HasSynced() bool
// Close the queue
@@ -128,8 +128,7 @@ type FIFO struct {
// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
closed bool
closedLock sync.Mutex
closed bool
}
var (
@@ -138,14 +137,14 @@ var (
// Close the queue.
func (f *FIFO) Close() {
f.closedLock.Lock()
defer f.closedLock.Unlock()
f.lock.Lock()
defer f.lock.Unlock()
f.closed = true
f.cond.Broadcast()
}
// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
// or an Update called first but the first batch of items inserted by Replace() has been popped
// or the first batch of items inserted by Replace() has been popped.
func (f *FIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
@@ -262,8 +261,8 @@ func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
// IsClosed checks if the queue is closed
func (f *FIFO) IsClosed() bool {
f.closedLock.Lock()
defer f.closedLock.Unlock()
f.lock.Lock()
defer f.lock.Unlock()
if f.closed {
return true
}
@@ -284,7 +283,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
if f.closed {
return nil, ErrFIFOClosed
}
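
A minimal sketch of the Close/Pop interaction touched above: once Close() is called, a blocked Pop() observes the closed flag and returns ErrFIFOClosed, letting the consuming loop exit. The stop-channel plumbing is illustrative.

package example

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// consume drains a FIFO until the stop channel triggers Close().
func consume(stopCh <-chan struct{}) {
	fifo := cache.NewFIFO(cache.MetaNamespaceKeyFunc)

	go func() {
		<-stopCh
		fifo.Close() // wakes any blocked Pop()
	}()

	for {
		_, err := fifo.Pop(func(obj interface{}) error {
			fmt.Println("processing", obj)
			return nil
		})
		if err == cache.ErrFIFOClosed {
			return
		}
	}
}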

View File

@@ -17,7 +17,7 @@ limitations under the License.
package cache
import (
"k8s.io/klog"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"

View File

@@ -22,7 +22,7 @@ import (
"sync"
"time"
"k8s.io/klog"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"

View File

@@ -24,7 +24,7 @@ import (
"sync"
"time"
"k8s.io/klog"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"

View File

@@ -39,7 +39,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/pager"
"k8s.io/klog"
"k8s.io/klog/v2"
"k8s.io/utils/trace"
)
@@ -82,9 +82,9 @@ type Reflector struct {
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string
// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
// failed with an HTTP 410 (Gone) status code.
isLastSyncResourceVersionGone bool
// isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
// lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
isLastSyncResourceVersionUnavailable bool
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
@@ -95,6 +95,37 @@ type Reflector struct {
// etcd, which is significantly less efficient and may lead to serious performance and
// scalability problems.
WatchListPageSize int64
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
}
// The WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer
// will backoff and retry.
//
// The default implementation looks at the error type and tries to log
// the error message at an appropriate level.
//
// Implementations of this handler may display the error message in other
// ways. Implementations should return quickly - any expensive processing
// should be offloaded.
type WatchErrorHandler func(r *Reflector, err error)
// DefaultWatchErrorHandler is the default implementation of WatchErrorHandler
func DefaultWatchErrorHandler(r *Reflector, err error) {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
case err == io.EOF:
// watch closed normally
case err == io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
}
}
var (
@@ -135,9 +166,10 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
resyncPeriod: resyncPeriod,
clock: realClock,
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
resyncPeriod: resyncPeriod,
clock: realClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
}
r.setExpectedType(expectedType)
return r
@@ -175,7 +207,7 @@ func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
@@ -256,13 +288,14 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}
list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) {
r.setIsLastSyncResourceVersionExpired(true)
// Retry immediately if the resource version used to list is expired.
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionUnavailable(true)
// Retry immediately if the resource version used to list is unavailable.
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
// continuation pages, but the pager might not be enabled, or the full list might fail because the
// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
// to recover and ensure the reflector makes forward progress.
// continuation pages, but the pager might not be enabled, the full list might fail because the
// resource version it is listing at is expired or the cache may not yet be synced to the provided
// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
// the reflector makes forward progress.
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
@@ -275,7 +308,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
case <-listCh:
}
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
}
// We check if the list was paginated and if so set the paginatedResult based on that.
@@ -292,21 +325,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
r.paginatedResult = true
}
r.setIsLastSyncResourceVersionExpired(false) // list was successful
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
return fmt.Errorf("unable to understand list result %#v: %v", list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
}
initTrace.Step("Objects extracted")
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
return fmt.Errorf("unable to sync list result: %v", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
@@ -364,21 +397,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
AllowWatchBookmarks: true,
}
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
case err == io.EOF:
// watch closed normally
case err == io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
}
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
@@ -387,14 +409,14 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
time.Sleep(time.Second)
continue
}
return nil
return err
}
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
@@ -417,8 +439,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
}
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
start := r.clock.Now()
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
@@ -518,9 +539,9 @@ func (r *Reflector) relistResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock()
defer r.lastSyncResourceVersionMutex.RUnlock()
if r.isLastSyncResourceVersionGone {
if r.isLastSyncResourceVersionUnavailable {
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
// if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector
// if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
// to the latest available ResourceVersion, using a consistent read from etcd.
return ""
}
@@ -532,12 +553,12 @@ func (r *Reflector) relistResourceVersion() string {
return r.lastSyncResourceVersion
}
// setIsLastSyncResourceVersionExpired sets if the last list or watch request with lastSyncResourceVersion returned a
// expired error: HTTP 410 (Gone) Status Code.
func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) {
// setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
// "expired" or "too large resource version" error.
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
r.lastSyncResourceVersionMutex.Lock()
defer r.lastSyncResourceVersionMutex.Unlock()
r.isLastSyncResourceVersionGone = isExpired
r.isLastSyncResourceVersionUnavailable = isUnavailable
}
func isExpiredError(err error) bool {
@@ -547,3 +568,7 @@ func isExpiredError(err error) bool {
// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
}
func isTooLargeResourceVersionError(err error) bool {
return apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge)
}
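
A minimal sketch of a custom handler matching the WatchErrorHandler type introduced above: it counts dropped watches and then defers to DefaultWatchErrorHandler for the stock logging. The counter is illustrative, and the handler is wired in via Config.WatchErrorHandler or SharedInformer.SetWatchErrorHandler rather than set on the Reflector directly.

package example

import (
	"sync/atomic"

	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

var watchFailures int64 // illustrative counter

// countingWatchErrorHandler records each dropped ListAndWatch connection and
// keeps the default logging behaviour on top.
func countingWatchErrorHandler(r *cache.Reflector, err error) {
	n := atomic.AddInt64(&watchFailures, 1)
	klog.V(2).Infof("reflector dropped its watch (failure #%d): %v", n, err)
	cache.DefaultWatchErrorHandler(r, err)
}

// Compile-time check that the function satisfies the new type.
var _ cache.WatchErrorHandler = countingWatchErrorHandler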

View File

@@ -28,35 +28,37 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/buffer"
"k8s.io/klog"
"k8s.io/klog/v2"
)
// SharedInformer provides eventually consistent linkage of its
// clients to the authoritative state of a given collection of
// objects. An object is identified by its API group, kind/resource,
// namespace, and name; the `ObjectMeta.UID` is not part of an
// object's ID as far as this contract is concerned. One
// namespace (if any), and name; the `ObjectMeta.UID` is not part of
// an object's ID as far as this contract is concerned. One
// SharedInformer provides linkage to objects of a particular API
// group and kind/resource. The linked object collection of a
// SharedInformer may be further restricted to one namespace and/or by
// label selector and/or field selector.
// SharedInformer may be further restricted to one namespace (if
// applicable) and/or by label selector and/or field selector.
//
// The authoritative state of an object is what apiservers provide
// access to, and an object goes through a strict sequence of states.
// An object state is either "absent" or present with a
// ResourceVersion and other appropriate content.
// An object state is either (1) present with a ResourceVersion and
// other appropriate content or (2) "absent".
//
// A SharedInformer maintains a local cache, exposed by GetStore() and
// by GetIndexer() in the case of an indexed informer, of the state of
// each relevant object. This cache is eventually consistent with the
// authoritative state. This means that, unless prevented by
// persistent communication problems, if ever a particular object ID X
// is authoritatively associated with a state S then for every
// SharedInformer I whose collection includes (X, S) eventually either
// (1) I's cache associates X with S or a later state of X, (2) I is
// stopped, or (3) the authoritative state service for X terminates.
// To be formally complete, we say that the absent state meets any
// restriction by label selector or field selector.
// A SharedInformer maintains a local cache --- exposed by GetStore(),
// by GetIndexer() in the case of an indexed informer, and possibly by
// machinery involved in creating and/or accessing the informer --- of
// the state of each relevant object. This cache is eventually
// consistent with the authoritative state. This means that, unless
// prevented by persistent communication problems, if ever a
// particular object ID X is authoritatively associated with a state S
// then for every SharedInformer I whose collection includes (X, S)
// eventually either (1) I's cache associates X with S or a later
// state of X, (2) I is stopped, or (3) the authoritative state
// service for X terminates. To be formally complete, we say that the
// absent state meets any restriction by label selector or field
// selector.
//
// For a given informer and relevant object ID X, the sequence of
// states that appears in the informer's cache is a subsequence of the
@@ -98,7 +100,7 @@ import (
// added before `Run()`, eventually either the SharedInformer is
// stopped or the client is notified of the update. A client added
// after `Run()` starts gets a startup batch of notifications of
// additions of the object existing in the cache at the time that
// additions of the objects existing in the cache at the time that
// client was added; also, for every update to the SharedInformer's
// local cache after that client was added, eventually either the
// SharedInformer is stopped or that client is notified of that
@@ -163,6 +165,21 @@ type SharedInformer interface {
// store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe.
LastSyncResourceVersion() string
// The WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer
// will backoff and retry.
//
// The default implementation looks at the error type and tries to log
// the error message at an appropriate level.
//
// There's only one handler, so if you call this multiple times, last one
// wins; calling after the informer has been started returns an error.
//
// The handler is intended for visibility, not to e.g. pause the consumers.
// The handler should return quickly - any expensive processing should be
// offloaded.
SetWatchErrorHandler(handler WatchErrorHandler) error
}
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
@@ -298,6 +315,9 @@ type sharedIndexInformer struct {
// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync.Mutex
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
}
// dummyController hides the fact that a SharedInformer is different from a dedicated one
@@ -333,6 +353,18 @@ type deleteNotification struct {
oldObj interface{}
}
func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.started {
return fmt.Errorf("informer has already started")
}
s.watchErrorHandler = handler
return nil
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
@@ -349,7 +381,8 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
func() {
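
A minimal sketch of using the new SetWatchErrorHandler hook on a shared index informer; per the interface comment above it must be called before Run(), after which it returns an error. The client wiring, resource, and resync period are illustrative.

package example

import (
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

// runPodInformer installs a custom watch error handler, then runs the informer.
func runPodInformer(client kubernetes.Interface, stopCh <-chan struct{}) error {
	lw := cache.NewListWatchFromClient(
		client.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything())
	informer := cache.NewSharedIndexInformer(lw, &v1.Pod{}, 30*time.Second, cache.Indexers{})

	// Must happen before Run(); afterwards SetWatchErrorHandler errors out.
	if err := informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
		klog.Warningf("pod informer watch dropped: %v", err)
	}); err != nil {
		return err
	}

	informer.Run(stopCh)
	return nil
}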

View File

@@ -98,6 +98,9 @@ func ShortenConfig(config *Config) {
if len(authInfo.ClientCertificateData) > 0 {
authInfo.ClientCertificateData = redactedBytes
}
if len(authInfo.Token) > 0 {
authInfo.Token = "REDACTED"
}
config.AuthInfos[key] = authInfo
}
for key, cluster := range config.Clusters {

View File

@@ -82,6 +82,17 @@ type Cluster struct {
// CertificateAuthorityData contains PEM-encoded certificate authority certificates. Overrides CertificateAuthority
// +optional
CertificateAuthorityData []byte `json:"certificate-authority-data,omitempty"`
// ProxyURL is the URL to the proxy to be used for all requests made by this
// client. URLs with "http", "https", and "socks5" schemes are supported. If
// this configuration is not provided or the empty string, the client
// attempts to construct a proxy configuration from http_proxy and
// https_proxy environment variables. If these environment variables are not
// set, the client does not attempt to proxy requests.
//
// socks5 proxying does not currently support spdy streaming endpoints (exec,
// attach, port forward).
// +optional
ProxyURL string `json:"proxy-url,omitempty"`
// Extensions holds additional information. This is useful for extenders so that reads and writes don't clobber unknown fields
// +optional
Extensions map[string]runtime.Object `json:"extensions,omitempty"`
@@ -182,7 +193,7 @@ func (c AuthProviderConfig) String() string {
// ExecConfig specifies a command to provide client credentials. The command is exec'd
// and outputs structured stdout holding credentials.
//
// See the client.authentiction.k8s.io API group for specifications of the exact input
// See the client.authentication.k8s.io API group for specifications of the exact input
// and output format
type ExecConfig struct {
// Command to execute.
@@ -199,6 +210,11 @@ type ExecConfig struct {
// Preferred input version of the ExecInfo. The returned ExecCredentials MUST use
// the same encoding version as the input.
APIVersion string `json:"apiVersion,omitempty"`
// This text is shown to the user when the executable doesn't seem to be
// present. For example, `brew install foo-cli` might be a good InstallHint for
// foo-cli on Mac OS systems.
InstallHint string `json:"installHint,omitempty"`
}
var _ fmt.Stringer = new(ExecConfig)
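
A minimal sketch, in Go rather than kubeconfig YAML, of the two fields added above: Cluster.ProxyURL (http, https, or socks5 schemes) and ExecConfig.InstallHint. All names, addresses, and hint text are illustrative.

package example

import (
	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

// exampleConfig builds an in-memory kubeconfig exercising the new fields.
func exampleConfig() *clientcmdapi.Config {
	cfg := clientcmdapi.NewConfig()
	cfg.Clusters["example"] = &clientcmdapi.Cluster{
		Server:   "https://203.0.113.10:6443",
		ProxyURL: "socks5://localhost:1080", // http, https, and socks5 are accepted
	}
	cfg.AuthInfos["example"] = &clientcmdapi.AuthInfo{
		Exec: &clientcmdapi.ExecConfig{
			Command:     "example-credential-plugin",
			APIVersion:  "client.authentication.k8s.io/v1beta1",
			InstallHint: "install example-credential-plugin from your package manager",
		},
	}
	cfg.Contexts["example"] = &clientcmdapi.Context{Cluster: "example", AuthInfo: "example"}
	cfg.CurrentContext = "example"
	return cfg
}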

View File

@@ -75,6 +75,17 @@ type Cluster struct {
// CertificateAuthorityData contains PEM-encoded certificate authority certificates. Overrides CertificateAuthority
// +optional
CertificateAuthorityData []byte `json:"certificate-authority-data,omitempty"`
// ProxyURL is the URL to the proxy to be used for all requests made by this
// client. URLs with "http", "https", and "socks5" schemes are supported. If
// this configuration is not provided or the empty string, the client
// attempts to construct a proxy configuration from http_proxy and
// https_proxy environment variables. If these environment variables are not
// set, the client does not attempt to proxy requests.
//
// socks5 proxying does not currently support spdy streaming endpoints (exec,
// attach, port forward).
// +optional
ProxyURL string `json:"proxy-url,omitempty"`
// Extensions holds additional information. This is useful for extenders so that reads and writes don't clobber unknown fields
// +optional
Extensions []NamedExtension `json:"extensions,omitempty"`
@@ -181,7 +192,7 @@ type AuthProviderConfig struct {
// ExecConfig specifies a command to provide client credentials. The command is exec'd
// and outputs structured stdout holding credentials.
//
// See the client.authentiction.k8s.io API group for specifications of the exact input
// See the client.authentication.k8s.io API group for specifications of the exact input
// and output format
type ExecConfig struct {
// Command to execute.
@@ -198,6 +209,11 @@ type ExecConfig struct {
// Preferred input version of the ExecInfo. The returned ExecCredentials MUST use
// the same encoding version as the input.
APIVersion string `json:"apiVersion,omitempty"`
// This text is shown to the user when the executable doesn't seem to be
// present. For example, `brew install foo-cli` might be a good InstallHint for
// foo-cli on Mac OS systems.
InstallHint string `json:"installHint,omitempty"`
}
// ExecEnvVar is used for setting environment variables when executing an exec-based

View File

@@ -237,6 +237,7 @@ func autoConvert_v1_Cluster_To_api_Cluster(in *Cluster, out *api.Cluster, s conv
out.InsecureSkipTLSVerify = in.InsecureSkipTLSVerify
out.CertificateAuthority = in.CertificateAuthority
out.CertificateAuthorityData = *(*[]byte)(unsafe.Pointer(&in.CertificateAuthorityData))
out.ProxyURL = in.ProxyURL
if err := Convert_Slice_v1_NamedExtension_To_Map_string_To_runtime_Object(&in.Extensions, &out.Extensions, s); err != nil {
return err
}
@@ -255,6 +256,7 @@ func autoConvert_api_Cluster_To_v1_Cluster(in *api.Cluster, out *Cluster, s conv
out.InsecureSkipTLSVerify = in.InsecureSkipTLSVerify
out.CertificateAuthority = in.CertificateAuthority
out.CertificateAuthorityData = *(*[]byte)(unsafe.Pointer(&in.CertificateAuthorityData))
out.ProxyURL = in.ProxyURL
if err := Convert_Map_string_To_runtime_Object_To_Slice_v1_NamedExtension(&in.Extensions, &out.Extensions, s); err != nil {
return err
}
@@ -356,6 +358,7 @@ func autoConvert_v1_ExecConfig_To_api_ExecConfig(in *ExecConfig, out *api.ExecCo
out.Args = *(*[]string)(unsafe.Pointer(&in.Args))
out.Env = *(*[]api.ExecEnvVar)(unsafe.Pointer(&in.Env))
out.APIVersion = in.APIVersion
out.InstallHint = in.InstallHint
return nil
}
@@ -369,6 +372,7 @@ func autoConvert_api_ExecConfig_To_v1_ExecConfig(in *api.ExecConfig, out *ExecCo
out.Args = *(*[]string)(unsafe.Pointer(&in.Args))
out.Env = *(*[]ExecEnvVar)(unsafe.Pointer(&in.Env))
out.APIVersion = in.APIVersion
out.InstallHint = in.InstallHint
return nil
}

View File

@@ -20,22 +20,24 @@ import (
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
"github.com/imdario/mergo"
"k8s.io/klog"
"unicode"
restclient "k8s.io/client-go/rest"
clientauth "k8s.io/client-go/tools/auth"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/klog/v2"
"github.com/imdario/mergo"
)
var (
// ClusterDefaults has the same behavior as the old EnvVar and DefaultCluster fields
// DEPRECATED will be replaced
ClusterDefaults = clientcmdapi.Cluster{Server: os.Getenv("KUBERNETES_MASTER")}
ClusterDefaults = clientcmdapi.Cluster{Server: getDefaultServer()}
// DefaultClientConfig represents the legacy behavior of this package for defaulting
// DEPRECATED will be replace
DefaultClientConfig = DirectClientConfig{*clientcmdapi.NewConfig(), "", &ConfigOverrides{
@@ -43,6 +45,15 @@ var (
}, nil, NewDefaultClientConfigLoadingRules(), promptedCredentials{}}
)
// getDefaultServer returns a default setting for DefaultClientConfig
// DEPRECATED
func getDefaultServer() string {
if server := os.Getenv("KUBERNETES_MASTER"); len(server) > 0 {
return server
}
return "http://localhost:8080"
}
// ClientConfig is used to make it easy to get an api server client
type ClientConfig interface {
// RawConfig returns the merged result of all overrides
@@ -141,8 +152,15 @@ func (config *DirectClientConfig) ClientConfig() (*restclient.Config, error) {
clientConfig := &restclient.Config{}
clientConfig.Host = configClusterInfo.Server
if configClusterInfo.ProxyURL != "" {
u, err := parseProxyURL(configClusterInfo.ProxyURL)
if err != nil {
return nil, err
}
clientConfig.Proxy = http.ProxyURL(u)
}
if len(config.overrides.Timeout) > 0 {
if config.overrides != nil && len(config.overrides.Timeout) > 0 {
timeout, err := ParseTimeout(config.overrides.Timeout)
if err != nil {
return nil, err
@@ -252,6 +270,7 @@ func (config *DirectClientConfig) getUserIdentificationPartialConfig(configAuthI
}
if configAuthInfo.Exec != nil {
mergedConfig.ExecProvider = configAuthInfo.Exec
mergedConfig.ExecProvider.InstallHint = cleanANSIEscapeCodes(mergedConfig.ExecProvider.InstallHint)
}
// if there still isn't enough information to authenticate the user, try prompting
@@ -297,6 +316,41 @@ func canIdentifyUser(config restclient.Config) bool {
config.ExecProvider != nil
}
// cleanANSIEscapeCodes takes an arbitrary string and ensures that there are no
// ANSI escape sequences that could put the terminal in a weird state (e.g.,
// "\e[1m" bolds text)
func cleanANSIEscapeCodes(s string) string {
// spaceControlCharacters includes tab, new line, vertical tab, new page, and
// carriage return. These are in the unicode.Cc category, but that category also
// contains ESC (U+001B) which we don't want.
spaceControlCharacters := unicode.RangeTable{
R16: []unicode.Range16{
{Lo: 0x0009, Hi: 0x000D, Stride: 1},
},
}
// Why not make this deny-only (instead of allow-only)? Because unicode.C
// contains newline and tab characters that we want.
allowedRanges := []*unicode.RangeTable{
unicode.L,
unicode.M,
unicode.N,
unicode.P,
unicode.S,
unicode.Z,
&spaceControlCharacters,
}
builder := strings.Builder{}
for _, roon := range s {
if unicode.IsOneOf(allowedRanges, roon) {
builder.WriteRune(roon) // returns nil error, per go doc
} else {
fmt.Fprintf(&builder, "%U", roon)
}
}
return builder.String()
}
// Namespace implements ClientConfig
func (config *DirectClientConfig) Namespace() (string, bool, error) {
if config.overrides != nil && config.overrides.Context.Namespace != "" {
@@ -364,7 +418,7 @@ func (config *DirectClientConfig) ConfirmUsable() error {
// getContextName returns the default, or user-set context name, and a boolean that indicates
// whether the default context name has been overwritten by a user-set flag, or left as its default value
func (config *DirectClientConfig) getContextName() (string, bool) {
if len(config.overrides.CurrentContext) != 0 {
if config.overrides != nil && len(config.overrides.CurrentContext) != 0 {
return config.overrides.CurrentContext, true
}
if len(config.contextName) != 0 {
@@ -378,7 +432,7 @@ func (config *DirectClientConfig) getContextName() (string, bool) {
// and a boolean indicating whether the default authInfo name is overwritten by a user-set flag, or
// left as its default value
func (config *DirectClientConfig) getAuthInfoName() (string, bool) {
if len(config.overrides.Context.AuthInfo) != 0 {
if config.overrides != nil && len(config.overrides.Context.AuthInfo) != 0 {
return config.overrides.Context.AuthInfo, true
}
context, _ := config.getContext()
@@ -389,7 +443,7 @@ func (config *DirectClientConfig) getAuthInfoName() (string, bool) {
// indicating whether the default clusterName has been overwritten by a user-set flag, or left as
// its default value
func (config *DirectClientConfig) getClusterName() (string, bool) {
if len(config.overrides.Context.Cluster) != 0 {
if config.overrides != nil && len(config.overrides.Context.Cluster) != 0 {
return config.overrides.Context.Cluster, true
}
context, _ := config.getContext()
@@ -407,7 +461,9 @@ func (config *DirectClientConfig) getContext() (clientcmdapi.Context, error) {
} else if required {
return clientcmdapi.Context{}, fmt.Errorf("context %q does not exist", contextName)
}
mergo.MergeWithOverwrite(mergedContext, config.overrides.Context)
if config.overrides != nil {
mergo.MergeWithOverwrite(mergedContext, config.overrides.Context)
}
return *mergedContext, nil
}
@@ -423,7 +479,9 @@ func (config *DirectClientConfig) getAuthInfo() (clientcmdapi.AuthInfo, error) {
} else if required {
return clientcmdapi.AuthInfo{}, fmt.Errorf("auth info %q does not exist", authInfoName)
}
mergo.MergeWithOverwrite(mergedAuthInfo, config.overrides.AuthInfo)
if config.overrides != nil {
mergo.MergeWithOverwrite(mergedAuthInfo, config.overrides.AuthInfo)
}
return *mergedAuthInfo, nil
}
@@ -434,30 +492,37 @@ func (config *DirectClientConfig) getCluster() (clientcmdapi.Cluster, error) {
clusterInfoName, required := config.getClusterName()
mergedClusterInfo := clientcmdapi.NewCluster()
mergo.MergeWithOverwrite(mergedClusterInfo, config.overrides.ClusterDefaults)
if config.overrides != nil {
mergo.MergeWithOverwrite(mergedClusterInfo, config.overrides.ClusterDefaults)
}
if configClusterInfo, exists := clusterInfos[clusterInfoName]; exists {
mergo.MergeWithOverwrite(mergedClusterInfo, configClusterInfo)
} else if required {
return clientcmdapi.Cluster{}, fmt.Errorf("cluster %q does not exist", clusterInfoName)
}
mergo.MergeWithOverwrite(mergedClusterInfo, config.overrides.ClusterInfo)
if config.overrides != nil {
mergo.MergeWithOverwrite(mergedClusterInfo, config.overrides.ClusterInfo)
}
// * An override of --insecure-skip-tls-verify=true and no accompanying CA/CA data should clear already-set CA/CA data
// otherwise, a kubeconfig containing a CA reference would return an error that "CA and insecure-skip-tls-verify couldn't both be set".
// * An override of --certificate-authority should also override TLS skip settings and CA data, otherwise existing CA data will take precedence.
caLen := len(config.overrides.ClusterInfo.CertificateAuthority)
caDataLen := len(config.overrides.ClusterInfo.CertificateAuthorityData)
if config.overrides.ClusterInfo.InsecureSkipTLSVerify || caLen > 0 || caDataLen > 0 {
mergedClusterInfo.InsecureSkipTLSVerify = config.overrides.ClusterInfo.InsecureSkipTLSVerify
mergedClusterInfo.CertificateAuthority = config.overrides.ClusterInfo.CertificateAuthority
mergedClusterInfo.CertificateAuthorityData = config.overrides.ClusterInfo.CertificateAuthorityData
}
if config.overrides != nil {
caLen := len(config.overrides.ClusterInfo.CertificateAuthority)
caDataLen := len(config.overrides.ClusterInfo.CertificateAuthorityData)
if config.overrides.ClusterInfo.InsecureSkipTLSVerify || caLen > 0 || caDataLen > 0 {
mergedClusterInfo.InsecureSkipTLSVerify = config.overrides.ClusterInfo.InsecureSkipTLSVerify
mergedClusterInfo.CertificateAuthority = config.overrides.ClusterInfo.CertificateAuthority
mergedClusterInfo.CertificateAuthorityData = config.overrides.ClusterInfo.CertificateAuthorityData
}
// if the --tls-server-name has been set in overrides, use that value.
// if the --server has been set in overrides, then use the value of --tls-server-name specified on the CLI too. This gives the property
// that setting a --server will effectively clear the KUBECONFIG value of tls-server-name if it is specified on the command line which is
// usually correct.
if config.overrides.ClusterInfo.TLSServerName != "" || config.overrides.ClusterInfo.Server != "" {
mergedClusterInfo.TLSServerName = config.overrides.ClusterInfo.TLSServerName
// if the --tls-server-name has been set in overrides, use that value.
// if the --server has been set in overrides, then use the value of --tls-server-name specified on the CLI too. This gives the property
// that setting a --server will effectively clear the KUBECONFIG value of tls-server-name if it is specified on the command line which is
// usually correct.
if config.overrides.ClusterInfo.TLSServerName != "" || config.overrides.ClusterInfo.Server != "" {
mergedClusterInfo.TLSServerName = config.overrides.ClusterInfo.TLSServerName
}
}
return *mergedClusterInfo, nil
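
A minimal sketch of the path the ProxyURL handling above sits on: load a kubeconfig (the explicit path is illustrative), let ClientConfig() turn cluster.proxy-url into restclient.Config.Proxy via http.ProxyURL, and build a clientset from the result.

package example

import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// clientFromKubeconfig builds a clientset whose transport honours proxy-url.
func clientFromKubeconfig() (kubernetes.Interface, error) {
	loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
	loadingRules.ExplicitPath = "/tmp/example-kubeconfig" // illustrative path

	cfg, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		loadingRules, &clientcmd.ConfigOverrides{}).ClientConfig()
	if err != nil {
		return nil, err
	}
	return kubernetes.NewForConfig(cfg)
}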

View File

@@ -24,7 +24,7 @@ import (
"reflect"
"sort"
"k8s.io/klog"
"k8s.io/klog/v2"
restclient "k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
@@ -58,6 +58,15 @@ type PathOptions struct {
LoadingRules *ClientConfigLoadingRules
}
var (
// UseModifyConfigLock ensures that access to kubeconfig file using ModifyConfig method
// is being guarded by a lock file.
// This variable is intentionaly made public so other consumers of this library
// can modify its default behavior, but be caution when disabling it since
// this will make your code not threadsafe.
UseModifyConfigLock = true
)
func (o *PathOptions) GetEnvVarFiles() []string {
if len(o.EnvVar) == 0 {
return []string{}
@@ -156,15 +165,17 @@ func NewDefaultPathOptions() *PathOptions {
// that means that this code will only write into a single file. If you want to relativizePaths, you must provide a fully qualified path in any
// modified element.
func ModifyConfig(configAccess ConfigAccess, newConfig clientcmdapi.Config, relativizePaths bool) error {
possibleSources := configAccess.GetLoadingPrecedence()
// sort the possible kubeconfig files so we always "lock" in the same order
// to avoid deadlock (note: this can fail w/ symlinks, but... come on).
sort.Strings(possibleSources)
for _, filename := range possibleSources {
if err := lockFile(filename); err != nil {
return err
if UseModifyConfigLock {
possibleSources := configAccess.GetLoadingPrecedence()
// sort the possible kubeconfig files so we always "lock" in the same order
// to avoid deadlock (note: this can fail w/ symlinks, but... come on).
sort.Strings(possibleSources)
for _, filename := range possibleSources {
if err := lockFile(filename); err != nil {
return err
}
defer unlockFile(filename)
}
defer unlockFile(filename)
}
startingConfig, err := configAccess.GetStartingConfig()
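
A minimal sketch of the new UseModifyConfigLock switch: a caller that already serializes kubeconfig writes on its own can disable the lock file before calling ModifyConfig; leaving the default (true) preserves the previous file-locking behaviour.

package example

import (
	"k8s.io/client-go/tools/clientcmd"
	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

// writeConfig persists cfg without taking kubeconfig lock files. Only safe
// when the caller guarantees single-writer access, per the comment above.
func writeConfig(access clientcmd.ConfigAccess, cfg clientcmdapi.Config) error {
	clientcmd.UseModifyConfigLock = false
	return clientcmd.ModifyConfig(access, cfg, false)
}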

View File

@@ -18,6 +18,7 @@ package clientcmd
import (
"fmt"
"net/url"
"strconv"
"time"
)
@@ -33,3 +34,17 @@ func ParseTimeout(duration string) (time.Duration, error) {
}
return 0, fmt.Errorf("Invalid timeout value. Timeout must be a single integer in seconds, or an integer followed by a corresponding time unit (e.g. 1s | 2m | 3h)")
}
func parseProxyURL(proxyURL string) (*url.URL, error) {
u, err := url.Parse(proxyURL)
if err != nil {
return nil, fmt.Errorf("could not parse: %v", proxyURL)
}
switch u.Scheme {
case "http", "https", "socks5":
default:
return nil, fmt.Errorf("unsupported scheme %q, must be http, https, or socks5", u.Scheme)
}
return u, nil
}
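
parseProxyURL is unexported, but its scheme check surfaces through validation. A minimal sketch, with illustrative values, of how an unsupported proxy-url scheme is reported when the config is validated:

package example

import (
	"fmt"

	"k8s.io/client-go/tools/clientcmd"
	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

// checkProxyScheme shows the validation error produced for a bad proxy-url.
func checkProxyScheme() {
	cfg := clientcmdapi.NewConfig()
	cfg.Clusters["bad"] = &clientcmdapi.Cluster{
		Server:   "https://203.0.113.10:6443",
		ProxyURL: "ftp://localhost:2121", // only http, https, socks5 are allowed
	}
	if err := clientcmd.Validate(*cfg); err != nil {
		fmt.Println("invalid kubeconfig:", err)
	}
}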

View File

@@ -28,7 +28,7 @@ import (
"strings"
"github.com/imdario/mergo"
"k8s.io/klog"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

View File

@@ -20,7 +20,7 @@ import (
"io"
"sync"
"k8s.io/klog"
"k8s.io/klog/v2"
restclient "k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
@@ -60,27 +60,26 @@ func NewInteractiveDeferredLoadingClientConfig(loader ClientConfigLoader, overri
}
func (config *DeferredLoadingClientConfig) createClientConfig() (ClientConfig, error) {
if config.clientConfig == nil {
config.loadingLock.Lock()
defer config.loadingLock.Unlock()
config.loadingLock.Lock()
defer config.loadingLock.Unlock()
if config.clientConfig == nil {
mergedConfig, err := config.loader.Load()
if err != nil {
return nil, err
}
var mergedClientConfig ClientConfig
if config.fallbackReader != nil {
mergedClientConfig = NewInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.fallbackReader, config.loader)
} else {
mergedClientConfig = NewNonInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.loader)
}
config.clientConfig = mergedClientConfig
}
if config.clientConfig != nil {
return config.clientConfig, nil
}
mergedConfig, err := config.loader.Load()
if err != nil {
return nil, err
}
var currentContext string
if config.overrides != nil {
currentContext = config.overrides.CurrentContext
}
if config.fallbackReader != nil {
config.clientConfig = NewInteractiveClientConfig(*mergedConfig, currentContext, config.overrides, config.fallbackReader, config.loader)
} else {
config.clientConfig = NewNonInteractiveClientConfig(*mergedConfig, currentContext, config.overrides, config.loader)
}
return config.clientConfig, nil
}

View File

@@ -30,11 +30,24 @@ import (
var (
ErrNoContext = errors.New("no context chosen")
ErrEmptyConfig = errors.New("no configuration has been provided, try setting KUBERNETES_MASTER environment variable")
ErrEmptyConfig = NewEmptyConfigError("no configuration has been provided, try setting KUBERNETES_MASTER environment variable")
// message is for consistency with old behavior
ErrEmptyCluster = errors.New("cluster has no server defined")
)
// NewEmptyConfigError returns an error wrapping the given message which IsEmptyConfig() will recognize as an empty config error
func NewEmptyConfigError(message string) error {
return &errEmptyConfig{message}
}
type errEmptyConfig struct {
message string
}
func (e *errEmptyConfig) Error() string {
return e.message
}
type errContextNotFound struct {
ContextName string
}
@@ -60,9 +73,14 @@ func IsContextNotFound(err error) bool {
func IsEmptyConfig(err error) bool {
switch t := err.(type) {
case errConfigurationInvalid:
return len(t) == 1 && t[0] == ErrEmptyConfig
if len(t) != 1 {
return false
}
_, ok := t[0].(*errEmptyConfig)
return ok
}
return err == ErrEmptyConfig
_, ok := err.(*errEmptyConfig)
return ok
}
// errConfigurationInvalid is a set of errors indicating the configuration is invalid.
@@ -209,6 +227,11 @@ func validateClusterInfo(clusterName string, clusterInfo clientcmdapi.Cluster) [
validationErrors = append(validationErrors, fmt.Errorf("no server found for cluster %q", clusterName))
}
}
if proxyURL := clusterInfo.ProxyURL; proxyURL != "" {
if _, err := parseProxyURL(proxyURL); err != nil {
validationErrors = append(validationErrors, fmt.Errorf("invalid 'proxy-url' %q for cluster %q: %v", proxyURL, clusterName, err))
}
}
// Make sure CA data and CA file aren't both specified
if len(clusterInfo.CertificateAuthority) != 0 && len(clusterInfo.CertificateAuthorityData) != 0 {
validationErrors = append(validationErrors, fmt.Errorf("certificate-authority-data and certificate-authority are both specified for %v. certificate-authority-data will override.", clusterName))
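
A minimal sketch of how callers are expected to keep matching the reworked empty-config error: use IsEmptyConfig() rather than comparing against the ErrEmptyConfig sentinel, which is now a distinct wrapped type. The in-cluster fallback shown is illustrative.

package example

import (
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog/v2"
)

// loadOrInCluster prefers a kubeconfig and falls back to in-cluster config
// when no configuration was provided at all.
func loadOrInCluster() (*rest.Config, error) {
	cfg, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		clientcmd.NewDefaultClientConfigLoadingRules(), &clientcmd.ConfigOverrides{}).ClientConfig()
	if clientcmd.IsEmptyConfig(err) {
		klog.V(2).Info("no kubeconfig found, falling back to in-cluster config")
		return rest.InClusterConfig()
	}
	return cfg, err
}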

View File

@@ -8,7 +8,6 @@ reviewers:
- deads2k
- mikedanese
- gmarek
- eparis
- timothysc
- ingvagabund
- resouer

View File

@@ -65,7 +65,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
rl "k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog"
"k8s.io/klog/v2"
)
const (
@@ -195,10 +195,11 @@ type LeaderElector struct {
// Run starts the leader election loop
func (le *LeaderElector) Run(ctx context.Context) {
defer runtime.HandleCrash()
defer func() {
runtime.HandleCrash()
le.config.Callbacks.OnStoppedLeading()
}()
if !le.acquire(ctx) {
return // ctx signalled done
}
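
A minimal sketch of the leader-election callbacks involved in the Run() change above; OnStoppedLeading is invoked from a deferred function in Run(), so it is the natural place for shutdown work. The lock name, namespace, identity, and timings are illustrative.

package example

import (
	"context"
	"os"
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

// runLeaderElected campaigns for a Lease lock and exits when leadership is lost.
func runLeaderElected(ctx context.Context, client kubernetes.Interface, id string) {
	lock, err := resourcelock.New(
		resourcelock.LeasesResourceLock,
		"kube-system", "example-controller",
		client.CoreV1(), client.CoordinationV1(),
		resourcelock.ResourceLockConfig{Identity: id},
	)
	if err != nil {
		klog.Fatal(err)
	}

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				klog.Info("became leader, starting work")
			},
			OnStoppedLeading: func() {
				// Run() defers this callback, so it fires when leadership is
				// lost; exit so a fresh leader can take over cleanly.
				klog.Info("lost leadership, exiting")
				os.Exit(0)
			},
		},
	})
}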

View File

@@ -88,6 +88,9 @@ func (cml *ConfigMapLock) Update(ctx context.Context, ler LeaderElectionRecord)
if err != nil {
return err
}
if cml.cm.Annotations == nil {
cml.cm.Annotations = make(map[string]string)
}
cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
return err

View File

@@ -2,6 +2,5 @@
reviewers:
- wojtek-t
- eparis
- krousey
- jayunit100

View File

@@ -17,7 +17,6 @@ reviewers:
- saad-ali
- luxas
- yifan-gu
- eparis
- mwielgus
- timothysc
- jsafrane

View File

@@ -31,7 +31,7 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/record/util"
ref "k8s.io/client-go/tools/reference"
"k8s.io/klog"
"k8s.io/klog/v2"
)
const maxTriesPerEvent = 12
@@ -121,6 +121,10 @@ type EventBroadcaster interface {
// function. The return value can be ignored or used to stop recording, if desired.
StartLogging(logf func(format string, args ...interface{})) watch.Interface
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured
// logging function. The return value can be ignored or used to stop recording, if desired.
StartStructuredLogging(verbosity klog.Level) watch.Interface
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
// with the event source set to the given event source.
NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
@@ -279,6 +283,15 @@ func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...int
})
}
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watch.Interface {
return e.StartEventWatcher(
func(e *v1.Event) {
klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message)
})
}
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
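
A minimal sketch of the new StartStructuredLogging hook: mirror recorded events into structured klog output (verbosity 3 here) alongside the usual event sink. The component name is illustrative.

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
)

// newRecorder sets up an event broadcaster that both writes events to the
// API server and logs them structurally via klog.
func newRecorder(client kubernetes.Interface) record.EventRecorder {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartStructuredLogging(3)
	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
		Interface: client.CoreV1().Events(""),
	})
	return broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "example-controller"})
}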

View File

@@ -153,7 +153,8 @@ func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
// localKey - key that makes this event in the local group
type EventAggregatorKeyFunc func(event *v1.Event) (aggregateKey string, localKey string)
// EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject, event.Type and event.Reason
// EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject, event.Type,
// event.Reason, event.ReportingController and event.ReportingInstance
func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
return strings.Join([]string{
event.Source.Component,
@@ -165,6 +166,8 @@ func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
event.InvolvedObject.APIVersion,
event.Type,
event.Reason,
event.ReportingController,
event.ReportingInstance,
},
""), event.Message
}