Update dependency go modules for k8s v1.26.0-rc.0
This commit is contained in:
163
vendor/k8s.io/client-go/tools/cache/shared_informer.go
generated
vendored
163
vendor/k8s.io/client-go/tools/cache/shared_informer.go
generated
vendored
@@ -135,7 +135,9 @@ type SharedInformer interface {
|
||||
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
|
||||
// period. Events to a single handler are delivered sequentially, but there is no coordination
|
||||
// between different handlers.
|
||||
AddEventHandler(handler ResourceEventHandler)
|
||||
// It returns a registration handle for the handler that can be used to remove
|
||||
// the handler again.
|
||||
AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
|
||||
// AddEventHandlerWithResyncPeriod adds an event handler to the
|
||||
// shared informer with the requested resync period; zero means
|
||||
// this handler does not care about resyncs. The resync operation
|
||||
@@ -150,7 +152,13 @@ type SharedInformer interface {
|
||||
// between any two resyncs may be longer than the nominal period
|
||||
// because the implementation takes time to do work and there may
|
||||
// be competing load and scheduling noise.
|
||||
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
|
||||
// It returns a registration handle for the handler that can be used to remove
|
||||
// the handler again and an error if the handler cannot be added.
|
||||
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error)
|
||||
// RemoveEventHandler removes a formerly added event handler given by
|
||||
// its registration handle.
|
||||
// This function is guaranteed to be idempotent, and thread-safe.
|
||||
RemoveEventHandler(handle ResourceEventHandlerRegistration) error
|
||||
// GetStore returns the informer's local cache as a Store.
|
||||
GetStore() Store
|
||||
// GetController is deprecated, it does nothing useful
|
||||
@@ -195,8 +203,18 @@ type SharedInformer interface {
|
||||
// transform before mutating it at all and returning the copy to prevent
|
||||
// data races.
|
||||
SetTransform(handler TransformFunc) error
|
||||
|
||||
// IsStopped reports whether the informer has already been stopped.
|
||||
// Adding event handlers to already stopped informers is not possible.
|
||||
// An informer already stopped will never be started again.
|
||||
IsStopped() bool
|
||||
}
|
||||
|
||||
// Opaque interface representing the registration of ResourceEventHandler for
|
||||
// a SharedInformer. Must be supplied back to the same SharedInformer's
|
||||
// `RemoveEventHandler` to unregister the handlers.
|
||||
type ResourceEventHandlerRegistration interface{}
|
||||
|
||||
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
|
||||
type SharedIndexInformer interface {
|
||||
SharedInformer
|
||||
@@ -492,8 +510,8 @@ func (s *sharedIndexInformer) GetController() Controller {
|
||||
return &dummyController{informer: s}
|
||||
}
|
||||
|
||||
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
|
||||
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
|
||||
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) {
|
||||
return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
|
||||
}
|
||||
|
||||
func determineResyncPeriod(desired, check time.Duration) time.Duration {
|
||||
@@ -513,13 +531,12 @@ func determineResyncPeriod(desired, check time.Duration) time.Duration {
|
||||
|
||||
const minimumResyncPeriod = 1 * time.Second
|
||||
|
||||
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
|
||||
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
|
||||
if s.stopped {
|
||||
klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
|
||||
return
|
||||
return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler)
|
||||
}
|
||||
|
||||
if resyncPeriod > 0 {
|
||||
@@ -545,8 +562,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
||||
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
|
||||
|
||||
if !s.started {
|
||||
s.processor.addListener(listener)
|
||||
return
|
||||
return s.processor.addListener(listener), nil
|
||||
}
|
||||
|
||||
// in order to safely join, we have to
|
||||
@@ -557,10 +573,11 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
||||
s.blockDeltas.Lock()
|
||||
defer s.blockDeltas.Unlock()
|
||||
|
||||
s.processor.addListener(listener)
|
||||
handle := s.processor.addListener(listener)
|
||||
for _, item := range s.indexer.List() {
|
||||
listener.add(addNotification{newObj: item})
|
||||
}
|
||||
return handle, nil
|
||||
}
|
||||
|
||||
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
|
||||
@@ -610,6 +627,26 @@ func (s *sharedIndexInformer) OnDelete(old interface{}) {
|
||||
s.processor.distribute(deleteNotification{oldObj: old}, false)
|
||||
}
|
||||
|
||||
// IsStopped reports whether the informer has already been stopped
|
||||
func (s *sharedIndexInformer) IsStopped() bool {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
return s.stopped
|
||||
}
|
||||
|
||||
func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegistration) error {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
|
||||
// in order to safely remove, we have to
|
||||
// 1. stop sending add/update/delete notifications
|
||||
// 2. remove and stop listener
|
||||
// 3. unblock
|
||||
s.blockDeltas.Lock()
|
||||
defer s.blockDeltas.Unlock()
|
||||
return s.processor.removeListener(handle)
|
||||
}
|
||||
|
||||
// sharedProcessor has a collection of processorListener and can
|
||||
// distribute a notification object to its listeners. There are two
|
||||
// kinds of distribute operations. The sync distributions go to a
|
||||
@@ -619,39 +656,85 @@ func (s *sharedIndexInformer) OnDelete(old interface{}) {
|
||||
type sharedProcessor struct {
|
||||
listenersStarted bool
|
||||
listenersLock sync.RWMutex
|
||||
listeners []*processorListener
|
||||
syncingListeners []*processorListener
|
||||
clock clock.Clock
|
||||
wg wait.Group
|
||||
// Map from listeners to whether or not they are currently syncing
|
||||
listeners map[*processorListener]bool
|
||||
clock clock.Clock
|
||||
wg wait.Group
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) addListener(listener *processorListener) {
|
||||
func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistration) *processorListener {
|
||||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
|
||||
if p.listeners == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if result, ok := registration.(*processorListener); ok {
|
||||
if _, exists := p.listeners[result]; exists {
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
|
||||
p.addListenerLocked(listener)
|
||||
if p.listeners == nil {
|
||||
p.listeners = make(map[*processorListener]bool)
|
||||
}
|
||||
|
||||
p.listeners[listener] = true
|
||||
|
||||
if p.listenersStarted {
|
||||
p.wg.Start(listener.run)
|
||||
p.wg.Start(listener.pop)
|
||||
}
|
||||
|
||||
return listener
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
|
||||
p.listeners = append(p.listeners, listener)
|
||||
p.syncingListeners = append(p.syncingListeners, listener)
|
||||
func (p *sharedProcessor) removeListener(handle ResourceEventHandlerRegistration) error {
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
|
||||
listener, ok := handle.(*processorListener)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid key type %t", handle)
|
||||
} else if p.listeners == nil {
|
||||
// No listeners are registered, do nothing
|
||||
return nil
|
||||
} else if _, exists := p.listeners[listener]; !exists {
|
||||
// Listener is not registered, just do nothing
|
||||
return nil
|
||||
}
|
||||
|
||||
delete(p.listeners, listener)
|
||||
|
||||
if p.listenersStarted {
|
||||
close(listener.addCh)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
|
||||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
|
||||
if sync {
|
||||
for _, listener := range p.syncingListeners {
|
||||
for listener, isSyncing := range p.listeners {
|
||||
switch {
|
||||
case !sync:
|
||||
// non-sync messages are delivered to every listener
|
||||
listener.add(obj)
|
||||
}
|
||||
} else {
|
||||
for _, listener := range p.listeners {
|
||||
case isSyncing:
|
||||
// sync messages are delivered to every syncing listener
|
||||
listener.add(obj)
|
||||
default:
|
||||
// skipping a sync obj for a non-syncing listener
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -660,18 +743,27 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) {
|
||||
func() {
|
||||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
for _, listener := range p.listeners {
|
||||
for listener := range p.listeners {
|
||||
p.wg.Start(listener.run)
|
||||
p.wg.Start(listener.pop)
|
||||
}
|
||||
p.listenersStarted = true
|
||||
}()
|
||||
<-stopCh
|
||||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
for _, listener := range p.listeners {
|
||||
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
for listener := range p.listeners {
|
||||
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
|
||||
}
|
||||
|
||||
// Wipe out list of listeners since they are now closed
|
||||
// (processorListener cannot be re-used)
|
||||
p.listeners = nil
|
||||
|
||||
// Reset to false since no listeners are running
|
||||
p.listenersStarted = false
|
||||
|
||||
p.wg.Wait() // Wait for all .pop() and .run() to stop
|
||||
}
|
||||
|
||||
@@ -681,16 +773,16 @@ func (p *sharedProcessor) shouldResync() bool {
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
|
||||
p.syncingListeners = []*processorListener{}
|
||||
|
||||
resyncNeeded := false
|
||||
now := p.clock.Now()
|
||||
for _, listener := range p.listeners {
|
||||
for listener := range p.listeners {
|
||||
// need to loop through all the listeners to see if they need to resync so we can prepare any
|
||||
// listeners that are going to be resyncing.
|
||||
if listener.shouldResync(now) {
|
||||
shouldResync := listener.shouldResync(now)
|
||||
p.listeners[listener] = shouldResync
|
||||
|
||||
if shouldResync {
|
||||
resyncNeeded = true
|
||||
p.syncingListeners = append(p.syncingListeners, listener)
|
||||
listener.determineNextResync(now)
|
||||
}
|
||||
}
|
||||
@@ -701,8 +793,9 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
|
||||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
|
||||
for _, listener := range p.listeners {
|
||||
resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
|
||||
for listener := range p.listeners {
|
||||
resyncPeriod := determineResyncPeriod(
|
||||
listener.requestedResyncPeriod, resyncCheckPeriod)
|
||||
listener.setResyncPeriod(resyncPeriod)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user