Update dependency go modules for k8s v1.27.0-rc.0
This commit is contained in:
147
vendor/k8s.io/client-go/tools/cache/shared_informer.go
generated
vendored
147
vendor/k8s.io/client-go/tools/cache/shared_informer.go
generated
vendored
@@ -26,6 +26,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/cache/synctrack"
|
||||
"k8s.io/utils/buffer"
|
||||
"k8s.io/utils/clock"
|
||||
|
||||
@@ -132,11 +133,13 @@ import (
|
||||
// state, except that its ResourceVersion is replaced with a
|
||||
// ResourceVersion in which the object is actually absent.
|
||||
type SharedInformer interface {
|
||||
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
|
||||
// period. Events to a single handler are delivered sequentially, but there is no coordination
|
||||
// between different handlers.
|
||||
// It returns a registration handle for the handler that can be used to remove
|
||||
// the handler again.
|
||||
// AddEventHandler adds an event handler to the shared informer using
|
||||
// the shared informer's resync period. Events to a single handler are
|
||||
// delivered sequentially, but there is no coordination between
|
||||
// different handlers.
|
||||
// It returns a registration handle for the handler that can be used to
|
||||
// remove the handler again, or to tell if the handler is synced (has
|
||||
// seen every item in the initial list).
|
||||
AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
|
||||
// AddEventHandlerWithResyncPeriod adds an event handler to the
|
||||
// shared informer with the requested resync period; zero means
|
||||
@@ -169,6 +172,10 @@ type SharedInformer interface {
|
||||
// HasSynced returns true if the shared informer's store has been
|
||||
// informed by at least one full LIST of the authoritative state
|
||||
// of the informer's object collection. This is unrelated to "resync".
|
||||
//
|
||||
// Note that this doesn't tell you if an individual handler is synced!!
|
||||
// For that, please call HasSynced on the handle returned by
|
||||
// AddEventHandler.
|
||||
HasSynced() bool
|
||||
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
|
||||
// store. The value returned is not synchronized with access to the underlying store and is not
|
||||
@@ -198,10 +205,7 @@ type SharedInformer interface {
|
||||
//
|
||||
// Must be set before starting the informer.
|
||||
//
|
||||
// Note: Since the object given to the handler may be already shared with
|
||||
// other goroutines, it is advisable to copy the object being
|
||||
// transform before mutating it at all and returning the copy to prevent
|
||||
// data races.
|
||||
// Please see the comment on TransformFunc for more details.
|
||||
SetTransform(handler TransformFunc) error
|
||||
|
||||
// IsStopped reports whether the informer has already been stopped.
|
||||
@@ -213,7 +217,14 @@ type SharedInformer interface {
|
||||
// Opaque interface representing the registration of ResourceEventHandler for
|
||||
// a SharedInformer. Must be supplied back to the same SharedInformer's
|
||||
// `RemoveEventHandler` to unregister the handlers.
|
||||
type ResourceEventHandlerRegistration interface{}
|
||||
//
|
||||
// Also used to tell if the handler is synced (has had all items in the initial
|
||||
// list delivered).
|
||||
type ResourceEventHandlerRegistration interface {
|
||||
// HasSynced reports if both the parent has synced and all pre-sync
|
||||
// events have been delivered.
|
||||
HasSynced() bool
|
||||
}
|
||||
|
||||
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
|
||||
type SharedIndexInformer interface {
|
||||
@@ -223,14 +234,26 @@ type SharedIndexInformer interface {
|
||||
GetIndexer() Indexer
|
||||
}
|
||||
|
||||
// NewSharedInformer creates a new instance for the listwatcher.
|
||||
// NewSharedInformer creates a new instance for the ListerWatcher. See NewSharedIndexInformerWithOptions for full details.
|
||||
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
|
||||
return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
|
||||
}
|
||||
|
||||
// NewSharedIndexInformer creates a new instance for the listwatcher.
|
||||
// The created informer will not do resyncs if the given
|
||||
// defaultEventHandlerResyncPeriod is zero. Otherwise: for each
|
||||
// NewSharedIndexInformer creates a new instance for the ListerWatcher and specified Indexers. See
|
||||
// NewSharedIndexInformerWithOptions for full details.
|
||||
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
|
||||
return NewSharedIndexInformerWithOptions(
|
||||
lw,
|
||||
exampleObject,
|
||||
SharedIndexInformerOptions{
|
||||
ResyncPeriod: defaultEventHandlerResyncPeriod,
|
||||
Indexers: indexers,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// NewSharedIndexInformerWithOptions creates a new instance for the ListerWatcher.
|
||||
// The created informer will not do resyncs if options.ResyncPeriod is zero. Otherwise: for each
|
||||
// handler that with a non-zero requested resync period, whether added
|
||||
// before or after the informer starts, the nominal resync period is
|
||||
// the requested resync period rounded up to a multiple of the
|
||||
@@ -238,21 +261,36 @@ func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEv
|
||||
// checking period is established when the informer starts running,
|
||||
// and is the maximum of (a) the minimum of the resync periods
|
||||
// requested before the informer starts and the
|
||||
// defaultEventHandlerResyncPeriod given here and (b) the constant
|
||||
// options.ResyncPeriod given here and (b) the constant
|
||||
// `minimumResyncPeriod` defined in this file.
|
||||
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
|
||||
func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
|
||||
realClock := &clock.RealClock{}
|
||||
sharedIndexInformer := &sharedIndexInformer{
|
||||
|
||||
return &sharedIndexInformer{
|
||||
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
|
||||
processor: &sharedProcessor{clock: realClock},
|
||||
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
|
||||
listerWatcher: lw,
|
||||
objectType: exampleObject,
|
||||
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
|
||||
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
|
||||
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
|
||||
objectDescription: options.ObjectDescription,
|
||||
resyncCheckPeriod: options.ResyncPeriod,
|
||||
defaultEventHandlerResyncPeriod: options.ResyncPeriod,
|
||||
clock: realClock,
|
||||
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
|
||||
}
|
||||
return sharedIndexInformer
|
||||
}
|
||||
|
||||
// SharedIndexInformerOptions configures a sharedIndexInformer.
|
||||
type SharedIndexInformerOptions struct {
|
||||
// ResyncPeriod is the default event handler resync period and resync check
|
||||
// period. If unset/unspecified, these are defaulted to 0 (do not resync).
|
||||
ResyncPeriod time.Duration
|
||||
|
||||
// Indexers is the sharedIndexInformer's indexers. If unset/unspecified, no indexers are configured.
|
||||
Indexers Indexers
|
||||
|
||||
// ObjectDescription is the sharedIndexInformer's object description. This is passed through to the
|
||||
// underlying Reflector's type description.
|
||||
ObjectDescription string
|
||||
}
|
||||
|
||||
// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
|
||||
@@ -326,12 +364,13 @@ type sharedIndexInformer struct {
|
||||
|
||||
listerWatcher ListerWatcher
|
||||
|
||||
// objectType is an example object of the type this informer is
|
||||
// expected to handle. Only the type needs to be right, except
|
||||
// that when that is `unstructured.Unstructured` the object's
|
||||
// `"apiVersion"` and `"kind"` must also be right.
|
||||
// objectType is an example object of the type this informer is expected to handle. If set, an event
|
||||
// with an object with a mismatching type is dropped instead of being delivered to listeners.
|
||||
objectType runtime.Object
|
||||
|
||||
// objectDescription is the description of this informer's objects. This typically defaults to
|
||||
objectDescription string
|
||||
|
||||
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
|
||||
// shouldResync to check if any of our listeners need a resync.
|
||||
resyncCheckPeriod time.Duration
|
||||
@@ -381,7 +420,8 @@ type updateNotification struct {
|
||||
}
|
||||
|
||||
type addNotification struct {
|
||||
newObj interface{}
|
||||
newObj interface{}
|
||||
isInInitialList bool
|
||||
}
|
||||
|
||||
type deleteNotification struct {
|
||||
@@ -422,15 +462,17 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
|
||||
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KnownObjects: s.indexer,
|
||||
EmitDeltaTypeReplaced: true,
|
||||
Transformer: s.transform,
|
||||
})
|
||||
|
||||
cfg := &Config{
|
||||
Queue: fifo,
|
||||
ListerWatcher: s.listerWatcher,
|
||||
ObjectType: s.objectType,
|
||||
FullResyncPeriod: s.resyncCheckPeriod,
|
||||
RetryOnError: false,
|
||||
ShouldResync: s.processor.shouldResync,
|
||||
Queue: fifo,
|
||||
ListerWatcher: s.listerWatcher,
|
||||
ObjectType: s.objectType,
|
||||
ObjectDescription: s.objectDescription,
|
||||
FullResyncPeriod: s.resyncCheckPeriod,
|
||||
RetryOnError: false,
|
||||
ShouldResync: s.processor.shouldResync,
|
||||
|
||||
Process: s.HandleDeltas,
|
||||
WatchErrorHandler: s.watchErrorHandler,
|
||||
@@ -559,7 +601,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
||||
}
|
||||
}
|
||||
|
||||
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
|
||||
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
|
||||
|
||||
if !s.started {
|
||||
return s.processor.addListener(listener), nil
|
||||
@@ -575,27 +617,35 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
||||
|
||||
handle := s.processor.addListener(listener)
|
||||
for _, item := range s.indexer.List() {
|
||||
listener.add(addNotification{newObj: item})
|
||||
// Note that we enqueue these notifications with the lock held
|
||||
// and before returning the handle. That means there is never a
|
||||
// chance for anyone to call the handle's HasSynced method in a
|
||||
// state when it would falsely return true (i.e., when the
|
||||
// shared informer is synced but it has not observed an Add
|
||||
// with isInitialList being true, nor when the thread
|
||||
// processing notifications somehow goes faster than this
|
||||
// thread adding them and the counter is temporarily zero).
|
||||
listener.add(addNotification{newObj: item, isInInitialList: true})
|
||||
}
|
||||
return handle, nil
|
||||
}
|
||||
|
||||
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
|
||||
func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
|
||||
s.blockDeltas.Lock()
|
||||
defer s.blockDeltas.Unlock()
|
||||
|
||||
if deltas, ok := obj.(Deltas); ok {
|
||||
return processDeltas(s, s.indexer, s.transform, deltas)
|
||||
return processDeltas(s, s.indexer, deltas, isInInitialList)
|
||||
}
|
||||
return errors.New("object given as Process argument is not Deltas")
|
||||
}
|
||||
|
||||
// Conforms to ResourceEventHandler
|
||||
func (s *sharedIndexInformer) OnAdd(obj interface{}) {
|
||||
func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {
|
||||
// Invocation of this function is locked under s.blockDeltas, so it is
|
||||
// save to distribute the notification
|
||||
s.cacheMutationDetector.AddObject(obj)
|
||||
s.processor.distribute(addNotification{newObj: obj}, false)
|
||||
s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
|
||||
}
|
||||
|
||||
// Conforms to ResourceEventHandler
|
||||
@@ -817,6 +867,8 @@ type processorListener struct {
|
||||
|
||||
handler ResourceEventHandler
|
||||
|
||||
syncTracker *synctrack.SingleFileTracker
|
||||
|
||||
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
|
||||
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
|
||||
// added until we OOM.
|
||||
@@ -847,11 +899,18 @@ type processorListener struct {
|
||||
resyncLock sync.Mutex
|
||||
}
|
||||
|
||||
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
|
||||
// HasSynced returns true if the source informer has synced, and all
|
||||
// corresponding events have been delivered.
|
||||
func (p *processorListener) HasSynced() bool {
|
||||
return p.syncTracker.HasSynced()
|
||||
}
|
||||
|
||||
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
|
||||
ret := &processorListener{
|
||||
nextCh: make(chan interface{}),
|
||||
addCh: make(chan interface{}),
|
||||
handler: handler,
|
||||
syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
|
||||
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
|
||||
requestedResyncPeriod: requestedResyncPeriod,
|
||||
resyncPeriod: resyncPeriod,
|
||||
@@ -863,6 +922,9 @@ func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, res
|
||||
}
|
||||
|
||||
func (p *processorListener) add(notification interface{}) {
|
||||
if a, ok := notification.(addNotification); ok && a.isInInitialList {
|
||||
p.syncTracker.Start()
|
||||
}
|
||||
p.addCh <- notification
|
||||
}
|
||||
|
||||
@@ -908,7 +970,10 @@ func (p *processorListener) run() {
|
||||
case updateNotification:
|
||||
p.handler.OnUpdate(notification.oldObj, notification.newObj)
|
||||
case addNotification:
|
||||
p.handler.OnAdd(notification.newObj)
|
||||
p.handler.OnAdd(notification.newObj, notification.isInInitialList)
|
||||
if notification.isInInitialList {
|
||||
p.syncTracker.Finished()
|
||||
}
|
||||
case deleteNotification:
|
||||
p.handler.OnDelete(notification.oldObj)
|
||||
default:
|
||||
|
Reference in New Issue
Block a user