Update dependency go modules for k8s v1.29.0-rc.1

This commit is contained in:
Sunny Song
2023-11-29 21:37:58 +00:00
parent cdbbb78ff8
commit 7603bf1c4e
246 changed files with 10181 additions and 13197 deletions

View File

@@ -29,6 +29,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
internalevents "k8s.io/client-go/tools/internal/events"
"k8s.io/client-go/tools/record/util"
ref "k8s.io/client-go/tools/reference"
"k8s.io/klog/v2"
@@ -110,6 +111,21 @@ type EventRecorder interface {
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
// EventRecorderLogger extends EventRecorder such that a logger can
// be set for methods in EventRecorder. Normally, those methods
// uses the global default logger to record errors and debug messages.
// If that is not desired, use WithLogger to provide a logger instance.
type EventRecorderLogger interface {
EventRecorder
// WithLogger replaces the context used for logging. This is a cheap call
// and meant to be used for contextual logging:
// recorder := ...
// logger := klog.FromContext(ctx)
// recorder.WithLogger(logger).Eventf(...)
WithLogger(logger klog.Logger) EventRecorderLogger
}
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
// StartEventWatcher starts sending events received from this EventBroadcaster to the given
@@ -131,7 +147,7 @@ type EventBroadcaster interface {
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
// with the event source set to the given event source.
NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger
// Shutdown shuts down the broadcaster. Once the broadcaster is shut
// down, it will only try to record an event in a sink once before
@@ -142,12 +158,14 @@ type EventBroadcaster interface {
// EventRecorderAdapter is a wrapper around a "k8s.io/client-go/tools/record".EventRecorder
// implementing the new "k8s.io/client-go/tools/events".EventRecorder interface.
type EventRecorderAdapter struct {
recorder EventRecorder
recorder EventRecorderLogger
}
var _ internalevents.EventRecorder = &EventRecorderAdapter{}
// NewEventRecorderAdapter returns an adapter implementing the new
// "k8s.io/client-go/tools/events".EventRecorder interface.
func NewEventRecorderAdapter(recorder EventRecorder) *EventRecorderAdapter {
func NewEventRecorderAdapter(recorder EventRecorderLogger) *EventRecorderAdapter {
return &EventRecorderAdapter{
recorder: recorder,
}
@@ -158,28 +176,76 @@ func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, re
a.recorder.Eventf(regarding, eventtype, reason, note, args...)
}
func (a *EventRecorderAdapter) WithLogger(logger klog.Logger) internalevents.EventRecorderLogger {
return &EventRecorderAdapter{
recorder: a.recorder.WithLogger(logger),
}
}
// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
return newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration)
func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster {
c := config{
sleepDuration: defaultSleepDuration,
}
for _, opt := range opts {
opt(&c)
}
eventBroadcaster := &eventBroadcasterImpl{
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: c.sleepDuration,
options: c.CorrelatorOptions,
}
ctx := c.Context
if ctx == nil {
ctx = context.Background()
} else {
// Calling Shutdown is not required when a context was provided:
// when the context is canceled, this goroutine will shut down
// the broadcaster.
go func() {
<-ctx.Done()
eventBroadcaster.Broadcaster.Shutdown()
}()
}
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)
return eventBroadcaster
}
func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
return newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration)
return NewBroadcaster(WithSleepDuration(sleepDuration))
}
func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster {
eventBroadcaster := newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration)
eventBroadcaster.options = options
return eventBroadcaster
return NewBroadcaster(WithCorrelatorOptions(options))
}
func newEventBroadcaster(broadcaster *watch.Broadcaster, sleepDuration time.Duration) *eventBroadcasterImpl {
eventBroadcaster := &eventBroadcasterImpl{
Broadcaster: broadcaster,
sleepDuration: sleepDuration,
func WithCorrelatorOptions(options CorrelatorOptions) BroadcasterOption {
return func(c *config) {
c.CorrelatorOptions = options
}
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(context.Background())
return eventBroadcaster
}
// WithContext sets a context for the broadcaster. Canceling the context will
// shut down the broadcaster, Shutdown doesn't need to be called. The context
// can also be used to provide a logger.
func WithContext(ctx context.Context) BroadcasterOption {
return func(c *config) {
c.Context = ctx
}
}
func WithSleepDuration(sleepDuration time.Duration) BroadcasterOption {
return func(c *config) {
c.sleepDuration = sleepDuration
}
}
type BroadcasterOption func(*config)
type config struct {
CorrelatorOptions
context.Context
sleepDuration time.Duration
}
type eventBroadcasterImpl struct {
@@ -220,12 +286,12 @@ func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eve
}
tries := 0
for {
if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
if recordEvent(e.cancelationCtx, sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
break
}
tries++
if tries >= maxTriesPerEvent {
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
klog.FromContext(e.cancelationCtx).Error(nil, "Unable to write event (retry limit exceeded!)", "event", event)
break
}
@@ -237,7 +303,7 @@ func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eve
}
select {
case <-e.cancelationCtx.Done():
klog.Errorf("Unable to write event '%#v' (broadcaster is shut down)", event)
klog.FromContext(e.cancelationCtx).Error(nil, "Unable to write event (broadcaster is shut down)", "event", event)
return
case <-time.After(delay):
}
@@ -248,7 +314,7 @@ func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eve
// was successfully recorded or discarded, false if it should be retried.
// If updateExistingEvent is false, it creates a new event, otherwise it updates
// existing event.
func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
func recordEvent(ctx context.Context, sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
var newEvent *v1.Event
var err error
if updateExistingEvent {
@@ -271,13 +337,13 @@ func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEv
switch err.(type) {
case *restclient.RequestConstructionError:
// We will construct the request the same next time, so don't keep trying.
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).Error(err, "Unable to construct event (will not retry!)", "event", event)
return true
case *errors.StatusError:
if errors.IsAlreadyExists(err) || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).V(5).Info("Server rejected event (will not retry!)", "event", event, "err", err)
} else {
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).Error(err, "Server rejected event (will not retry!)", "event", event)
}
return true
case *errors.UnexpectedObjectError:
@@ -286,7 +352,7 @@ func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEv
default:
// This case includes actual http transport errors. Go ahead and retry.
}
klog.Errorf("Unable to write event: '%#v': '%v'(may retry after sleeping)", event, err)
klog.FromContext(ctx).Error(err, "Unable to write event (may retry after sleeping)", "event", event)
return false
}
@@ -299,12 +365,15 @@ func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...int
})
}
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
// StartStructuredLogging starts sending events received from this EventBroadcaster to a structured logger.
// The logger is retrieved from a context if the broadcaster was constructed with a context, otherwise
// the global default is used.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watch.Interface {
loggerV := klog.FromContext(e.cancelationCtx).V(int(verbosity))
return e.StartEventWatcher(
func(e *v1.Event) {
klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "fieldPath", e.InvolvedObject.FieldPath, "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message)
loggerV.Info("Event occurred", "object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "fieldPath", e.InvolvedObject.FieldPath, "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message)
})
}
@@ -313,26 +382,32 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watc
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
watcher, err := e.Watch()
if err != nil {
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
klog.FromContext(e.cancelationCtx).Error(err, "Unable start event watcher (will not retry!)")
}
go func() {
defer utilruntime.HandleCrash()
for watchEvent := range watcher.ResultChan() {
event, ok := watchEvent.Object.(*v1.Event)
if !ok {
// This is all local, so there's no reason this should
// ever happen.
continue
for {
select {
case <-e.cancelationCtx.Done():
watcher.Stop()
return
case watchEvent := <-watcher.ResultChan():
event, ok := watchEvent.Object.(*v1.Event)
if !ok {
// This is all local, so there's no reason this should
// ever happen.
continue
}
eventHandler(event)
}
eventHandler(event)
}
}()
return watcher
}
// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
return &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger {
return &recorderImplLogger{recorderImpl: &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()}
}
type recorderImpl struct {
@@ -342,15 +417,17 @@ type recorderImpl struct {
clock clock.PassiveClock
}
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
var _ EventRecorder = &recorderImpl{}
func (recorder *recorderImpl) generateEvent(logger klog.Logger, object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
ref, err := ref.GetReference(recorder.scheme, object)
if err != nil {
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
logger.Error(err, "Could not construct reference, will not report event", "object", object, "eventType", eventtype, "reason", reason, "message", message)
return
}
if !util.ValidateEventType(eventtype) {
klog.Errorf("Unsupported event type: '%v'", eventtype)
logger.Error(nil, "Unsupported event type", "eventType", eventtype)
return
}
@@ -367,16 +444,16 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m
// outgoing events anyway).
sent, err := recorder.ActionOrDrop(watch.Added, event)
if err != nil {
klog.Errorf("unable to record event: %v (will not retry!)", err)
logger.Error(err, "Unable to record event (will not retry!)")
return
}
if !sent {
klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
logger.Error(nil, "Unable to record event: too many queued events, dropped event", "event", event)
}
}
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
recorder.generateEvent(object, nil, eventtype, reason, message)
recorder.generateEvent(klog.Background(), object, nil, eventtype, reason, message)
}
func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
@@ -384,7 +461,7 @@ func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, m
}
func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.generateEvent(object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...))
recorder.generateEvent(klog.Background(), object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
@@ -408,3 +485,26 @@ func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map
Type: eventtype,
}
}
type recorderImplLogger struct {
*recorderImpl
logger klog.Logger
}
var _ EventRecorderLogger = &recorderImplLogger{}
func (recorder recorderImplLogger) Event(object runtime.Object, eventtype, reason, message string) {
recorder.recorderImpl.generateEvent(recorder.logger, object, nil, eventtype, reason, message)
}
func (recorder recorderImplLogger) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder recorderImplLogger) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.generateEvent(recorder.logger, object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder recorderImplLogger) WithLogger(logger klog.Logger) EventRecorderLogger {
return recorderImplLogger{recorderImpl: recorder.recorderImpl, logger: logger}
}

View File

@@ -20,6 +20,7 @@ import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
)
// FakeRecorder is used as a fake during tests. It is thread safe. It is usable
@@ -31,6 +32,8 @@ type FakeRecorder struct {
IncludeObject bool
}
var _ EventRecorderLogger = &FakeRecorder{}
func objectString(object runtime.Object, includeObject bool) string {
if !includeObject {
return ""
@@ -68,6 +71,10 @@ func (f *FakeRecorder) AnnotatedEventf(object runtime.Object, annotations map[st
f.writeEvent(object, annotations, eventtype, reason, messageFmt, args...)
}
func (f *FakeRecorder) WithLogger(logger klog.Logger) EventRecorderLogger {
return f
}
// NewFakeRecorder creates new fake event recorder with event channel with
// buffer of given size.
func NewFakeRecorder(bufferSize int) *FakeRecorder {