Bump dependencies to Kubernetes 1.20

This commit is contained in:
xing-yang
2020-12-14 18:01:04 +00:00
parent a9a28d8fc7
commit e22364e1cf
799 changed files with 85231 additions and 28917 deletions

View File

@@ -38,6 +38,5 @@ reviewers:
- resouer
- jessfraz
- mfojtik
- mqliang
- sdminonne
- ncdc

View File

@@ -72,6 +72,9 @@ type Config struct {
// Called whenever the ListAndWatch drops the connection with an error.
WatchErrorHandler WatchErrorHandler
// WatchListPageSize is the requested chunk size of initial and relist watch lists.
WatchListPageSize int64
}
// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
@@ -134,6 +137,7 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
@@ -144,11 +148,11 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
// Returns true once this controller has completed an initial resource listing

View File

@@ -145,7 +145,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but they
// will always return an object of type Deltas. List() returns
// the newest objects currently in the FIFO.
// the newest object from each accumulator in the FIFO.
//
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
// to list Store keys and to get objects by Store key. The objects in
@@ -161,12 +161,13 @@ type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond
// `items` maps keys to Deltas.
// `queue` maintains FIFO order of keys for consumption in Pop().
// We maintain the property that keys in the `items` and `queue` are
// strictly 1:1 mapping, and that all Deltas in `items` should have
// at least one Delta.
// `items` maps a key to a Deltas.
// Each such Deltas has at least one Delta.
items map[string]Deltas
// `queue` maintains FIFO order of keys for consumption in Pop().
// There are no duplicates in `queue`.
// A key is in `queue` if and only if it is in `items`.
queue []string
// populated is true if the first batch of items inserted by Replace() has been populated
@@ -376,8 +377,8 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
if err != nil {
return KeyError{obj, err}
}
newDeltas := append(f.items[id], Delta{actionType, obj})
oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
@@ -389,10 +390,14 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
} else {
// This never happens, because dedupDeltas never returns an empty list
// when given a non-empty list (as it is here).
// But if somehow it ever does return an empty list, then
// We need to remove this from our map (extra items in the queue are
// ignored if they are not in the map).
delete(f.items, id)
// If somehow it happens anyway, deal with it but complain.
if oldDeltas == nil {
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
return nil
}
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
f.items[id] = newDeltas
return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
}
return nil
}
@@ -459,7 +464,7 @@ func (f *DeltaFIFO) IsClosed() bool {
return f.closed
}
// Pop blocks until an item is added to the queue, and then returns it. If
// Pop blocks until the queue has some items, and then returns one. If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
@@ -494,7 +499,8 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
// This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
delete(f.items, id)

View File

@@ -69,6 +69,8 @@ type Reflector struct {
// backoff manages backoff of ListWatch
backoffManager wait.BackoffManager
// initConnBackoffManager manages backoff the initial connection with the Watch calll of ListAndWatch.
initConnBackoffManager wait.BackoffManager
resyncPeriod time.Duration
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
@@ -99,6 +101,15 @@ type Reflector struct {
watchErrorHandler WatchErrorHandler
}
// ResourceVersionUpdater is an interface that allows store implementation to
// track the current resource version of the reflector. This is especially
// important if storage bookmarks are enabled.
type ResourceVersionUpdater interface {
// UpdateResourceVersion is called each time current resource version of the reflector
// is updated.
UpdateResourceVersion(resourceVersion string)
}
// The WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer
// will backoff and retry.
@@ -166,10 +177,11 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
resyncPeriod: resyncPeriod,
clock: realClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
resyncPeriod: resyncPeriod,
clock: realClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
}
r.setExpectedType(expectedType)
return r
@@ -404,9 +416,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case wait and resend watch request.
// If that's the case begin exponentially backing off and resend watch request.
if utilnet.IsConnectionRefused(err) {
time.Sleep(time.Second)
<-r.initConnBackoffManager.Backoff().C()
continue
}
return err
@@ -504,6 +516,9 @@ loop:
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
if rvu, ok := r.store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(newResourceVersion)
}
eventCount++
}
}
@@ -570,5 +585,26 @@ func isExpiredError(err error) bool {
}
func isTooLargeResourceVersionError(err error) bool {
return apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge)
if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) {
return true
}
// In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to
// metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource
// version is larger than the largest currently available resource version. To ensure backward
// compatibility with these server versions we also need to detect the error based on the content
// of the error message field.
if !apierrors.IsTimeout(err) {
return false
}
apierr, ok := err.(apierrors.APIStatus)
if !ok || apierr == nil || apierr.Status().Details == nil {
return false
}
for _, cause := range apierr.Status().Details.Causes {
// Matches the message returned by api server 1.17.0-1.18.5 for this error condition
if cause.Message == "Too large resource version" {
return true
}
}
return false
}

View File

@@ -485,13 +485,13 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
if resyncPeriod > 0 {
if resyncPeriod < minimumResyncPeriod {
klog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
resyncPeriod = minimumResyncPeriod
}
if resyncPeriod < s.resyncCheckPeriod {
if s.started {
klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
resyncPeriod = s.resyncCheckPeriod
} else {
// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update