update kube dependencies to v1.24.0 release
Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
This commit is contained in:
254
vendor/k8s.io/client-go/rest/request.go
generated
vendored
254
vendor/k8s.io/client-go/rest/request.go
generated
vendored
@@ -82,6 +82,12 @@ func (r *RequestConstructionError) Error() string {
|
||||
|
||||
var noBackoff = &NoBackoff{}
|
||||
|
||||
type requestRetryFunc func(maxRetries int) WithRetry
|
||||
|
||||
func defaultRequestRetryFn(maxRetries int) WithRetry {
|
||||
return &withRetry{maxRetries: maxRetries}
|
||||
}
|
||||
|
||||
// Request allows for building up a request to a server in a chained fashion.
|
||||
// Any errors are stored until the end of your call, so you only have to
|
||||
// check once.
|
||||
@@ -93,6 +99,7 @@ type Request struct {
|
||||
rateLimiter flowcontrol.RateLimiter
|
||||
backoff BackoffManager
|
||||
timeout time.Duration
|
||||
maxRetries int
|
||||
|
||||
// generic components accessible via method setters
|
||||
verb string
|
||||
@@ -109,9 +116,10 @@ type Request struct {
|
||||
subresource string
|
||||
|
||||
// output
|
||||
err error
|
||||
body io.Reader
|
||||
retry WithRetry
|
||||
err error
|
||||
body io.Reader
|
||||
|
||||
retryFn requestRetryFunc
|
||||
}
|
||||
|
||||
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
|
||||
@@ -142,7 +150,8 @@ func NewRequest(c *RESTClient) *Request {
|
||||
backoff: backoff,
|
||||
timeout: timeout,
|
||||
pathPrefix: pathPrefix,
|
||||
retry: &withRetry{maxRetries: 10},
|
||||
maxRetries: 10,
|
||||
retryFn: defaultRequestRetryFn,
|
||||
warningHandler: c.warningHandler,
|
||||
}
|
||||
|
||||
@@ -408,7 +417,10 @@ func (r *Request) Timeout(d time.Duration) *Request {
|
||||
// function is specifically called with a different value.
|
||||
// A zero maxRetries prevent it from doing retires and return an error immediately.
|
||||
func (r *Request) MaxRetries(maxRetries int) *Request {
|
||||
r.retry.SetMaxRetries(maxRetries)
|
||||
if maxRetries < 0 {
|
||||
maxRetries = 0
|
||||
}
|
||||
r.maxRetries = maxRetries
|
||||
return r
|
||||
}
|
||||
|
||||
@@ -496,84 +508,6 @@ func (r *Request) URL() *url.URL {
|
||||
return finalURL
|
||||
}
|
||||
|
||||
// finalURLTemplate is similar to URL(), but will make all specific parameter values equal
|
||||
// - instead of name or namespace, "{name}" and "{namespace}" will be used, and all query
|
||||
// parameters will be reset. This creates a copy of the url so as not to change the
|
||||
// underlying object.
|
||||
func (r Request) finalURLTemplate() url.URL {
|
||||
newParams := url.Values{}
|
||||
v := []string{"{value}"}
|
||||
for k := range r.params {
|
||||
newParams[k] = v
|
||||
}
|
||||
r.params = newParams
|
||||
url := r.URL()
|
||||
|
||||
segments := strings.Split(url.Path, "/")
|
||||
groupIndex := 0
|
||||
index := 0
|
||||
trimmedBasePath := ""
|
||||
if url != nil && r.c.base != nil && strings.Contains(url.Path, r.c.base.Path) {
|
||||
p := strings.TrimPrefix(url.Path, r.c.base.Path)
|
||||
if !strings.HasPrefix(p, "/") {
|
||||
p = "/" + p
|
||||
}
|
||||
// store the base path that we have trimmed so we can append it
|
||||
// before returning the URL
|
||||
trimmedBasePath = r.c.base.Path
|
||||
segments = strings.Split(p, "/")
|
||||
groupIndex = 1
|
||||
}
|
||||
if len(segments) <= 2 {
|
||||
return *url
|
||||
}
|
||||
|
||||
const CoreGroupPrefix = "api"
|
||||
const NamedGroupPrefix = "apis"
|
||||
isCoreGroup := segments[groupIndex] == CoreGroupPrefix
|
||||
isNamedGroup := segments[groupIndex] == NamedGroupPrefix
|
||||
if isCoreGroup {
|
||||
// checking the case of core group with /api/v1/... format
|
||||
index = groupIndex + 2
|
||||
} else if isNamedGroup {
|
||||
// checking the case of named group with /apis/apps/v1/... format
|
||||
index = groupIndex + 3
|
||||
} else {
|
||||
// this should not happen that the only two possibilities are /api... and /apis..., just want to put an
|
||||
// outlet here in case more API groups are added in future if ever possible:
|
||||
// https://kubernetes.io/docs/concepts/overview/kubernetes-api/#api-groups
|
||||
// if a wrong API groups name is encountered, return the {prefix} for url.Path
|
||||
url.Path = "/{prefix}"
|
||||
url.RawQuery = ""
|
||||
return *url
|
||||
}
|
||||
//switch segLength := len(segments) - index; segLength {
|
||||
switch {
|
||||
// case len(segments) - index == 1:
|
||||
// resource (with no name) do nothing
|
||||
case len(segments)-index == 2:
|
||||
// /$RESOURCE/$NAME: replace $NAME with {name}
|
||||
segments[index+1] = "{name}"
|
||||
case len(segments)-index == 3:
|
||||
if segments[index+2] == "finalize" || segments[index+2] == "status" {
|
||||
// /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
|
||||
segments[index+1] = "{name}"
|
||||
} else {
|
||||
// /namespace/$NAMESPACE/$RESOURCE: replace $NAMESPACE with {namespace}
|
||||
segments[index+1] = "{namespace}"
|
||||
}
|
||||
case len(segments)-index >= 4:
|
||||
segments[index+1] = "{namespace}"
|
||||
// /namespace/$NAMESPACE/$RESOURCE/$NAME: replace $NAMESPACE with {namespace}, $NAME with {name}
|
||||
if segments[index+3] != "finalize" && segments[index+3] != "status" {
|
||||
// /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
|
||||
segments[index+3] = "{name}"
|
||||
}
|
||||
}
|
||||
url.Path = path.Join(trimmedBasePath, path.Join(segments...))
|
||||
return *url
|
||||
}
|
||||
|
||||
func (r *Request) tryThrottleWithInfo(ctx context.Context, retryInfo string) error {
|
||||
if r.rateLimiter == nil {
|
||||
return nil
|
||||
@@ -582,7 +516,9 @@ func (r *Request) tryThrottleWithInfo(ctx context.Context, retryInfo string) err
|
||||
now := time.Now()
|
||||
|
||||
err := r.rateLimiter.Wait(ctx)
|
||||
|
||||
if err != nil {
|
||||
err = fmt.Errorf("client rate limiter Wait returned an error: %w", err)
|
||||
}
|
||||
latency := time.Since(now)
|
||||
|
||||
var message string
|
||||
@@ -601,7 +537,7 @@ func (r *Request) tryThrottleWithInfo(ctx context.Context, retryInfo string) err
|
||||
// but we use a throttled logger to prevent spamming.
|
||||
globalThrottledLogger.Infof("%s", message)
|
||||
}
|
||||
metrics.RateLimiterLatency.Observe(ctx, r.verb, r.finalURLTemplate(), latency)
|
||||
metrics.RateLimiterLatency.Observe(ctx, r.verb, *r.URL(), latency)
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -688,34 +624,21 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
||||
}
|
||||
return false
|
||||
}
|
||||
var retryAfter *RetryAfter
|
||||
retry := r.retryFn(r.maxRetries)
|
||||
url := r.URL().String()
|
||||
for {
|
||||
if err := retry.Before(ctx, r); err != nil {
|
||||
return nil, retry.WrapPreviousError(err)
|
||||
}
|
||||
|
||||
req, err := r.newHTTPRequest(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
|
||||
if retryAfter != nil {
|
||||
// We are retrying the request that we already send to apiserver
|
||||
// at least once before.
|
||||
// This request should also be throttled with the client-internal rate limiter.
|
||||
if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
retryAfter = nil
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
updateURLMetrics(ctx, r, resp, err)
|
||||
if r.c.base != nil {
|
||||
if err != nil {
|
||||
r.backoff.UpdateBackoff(r.c.base, err, 0)
|
||||
} else {
|
||||
r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
|
||||
}
|
||||
}
|
||||
retry.After(ctx, r, resp, err)
|
||||
if err == nil && resp.StatusCode == http.StatusOK {
|
||||
return r.newStreamWatcher(resp)
|
||||
}
|
||||
@@ -723,14 +646,8 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
||||
done, transformErr := func() (bool, error) {
|
||||
defer readAndCloseResponseBody(resp)
|
||||
|
||||
var retry bool
|
||||
retryAfter, retry = r.retry.NextRetry(req, resp, err, isErrRetryableFunc)
|
||||
if retry {
|
||||
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
|
||||
if err == nil {
|
||||
return false, nil
|
||||
}
|
||||
klog.V(4).Infof("Could not retry request - %v", err)
|
||||
if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if resp == nil {
|
||||
@@ -751,7 +668,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
||||
// we need to return the error object from that.
|
||||
err = transformErr
|
||||
}
|
||||
return nil, err
|
||||
return nil, retry.WrapPreviousError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -816,9 +733,13 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
|
||||
client = http.DefaultClient
|
||||
}
|
||||
|
||||
var retryAfter *RetryAfter
|
||||
retry := r.retryFn(r.maxRetries)
|
||||
url := r.URL().String()
|
||||
for {
|
||||
if err := retry.Before(ctx, r); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req, err := r.newHTTPRequest(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -826,27 +747,9 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
|
||||
if r.body != nil {
|
||||
req.Body = ioutil.NopCloser(r.body)
|
||||
}
|
||||
|
||||
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
|
||||
if retryAfter != nil {
|
||||
// We are retrying the request that we already send to apiserver
|
||||
// at least once before.
|
||||
// This request should also be throttled with the client-internal rate limiter.
|
||||
if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
retryAfter = nil
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
updateURLMetrics(ctx, r, resp, err)
|
||||
if r.c.base != nil {
|
||||
if err != nil {
|
||||
r.backoff.UpdateBackoff(r.URL(), err, 0)
|
||||
} else {
|
||||
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
|
||||
}
|
||||
}
|
||||
retry.After(ctx, r, resp, err)
|
||||
if err != nil {
|
||||
// we only retry on an HTTP response with 'Retry-After' header
|
||||
return nil, err
|
||||
@@ -861,14 +764,8 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
|
||||
done, transformErr := func() (bool, error) {
|
||||
defer resp.Body.Close()
|
||||
|
||||
var retry bool
|
||||
retryAfter, retry = r.retry.NextRetry(req, resp, err, neverRetryError)
|
||||
if retry {
|
||||
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
|
||||
if err == nil {
|
||||
return false, nil
|
||||
}
|
||||
klog.V(4).Infof("Could not retry request - %v", err)
|
||||
if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
|
||||
return false, nil
|
||||
}
|
||||
result := r.transformResponse(resp, req)
|
||||
if err := result.Error(); err != nil {
|
||||
@@ -929,7 +826,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
|
||||
//Metrics for total request latency
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
metrics.RequestLatency.Observe(ctx, r.verb, r.finalURLTemplate(), time.Since(start))
|
||||
metrics.RequestLatency.Observe(ctx, r.verb, *r.URL(), time.Since(start))
|
||||
}()
|
||||
|
||||
if r.err != nil {
|
||||
@@ -959,31 +856,38 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
isErrRetryableFunc := func(req *http.Request, err error) bool {
|
||||
// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
|
||||
// Thus in case of "GET" operations, we simply retry it.
|
||||
// We are not automatically retrying "write" operations, as they are not idempotent.
|
||||
if req.Method != "GET" {
|
||||
return false
|
||||
}
|
||||
// For connection errors and apiserver shutdown errors retry.
|
||||
if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Right now we make about ten retry attempts if we get a Retry-After response.
|
||||
var retryAfter *RetryAfter
|
||||
retry := r.retryFn(r.maxRetries)
|
||||
for {
|
||||
if err := retry.Before(ctx, r); err != nil {
|
||||
return retry.WrapPreviousError(err)
|
||||
}
|
||||
req, err := r.newHTTPRequest(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
|
||||
if retryAfter != nil {
|
||||
// We are retrying the request that we already send to apiserver
|
||||
// at least once before.
|
||||
// This request should also be throttled with the client-internal rate limiter.
|
||||
if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
|
||||
return err
|
||||
}
|
||||
retryAfter = nil
|
||||
}
|
||||
resp, err := client.Do(req)
|
||||
updateURLMetrics(ctx, r, resp, err)
|
||||
if err != nil {
|
||||
r.backoff.UpdateBackoff(r.URL(), err, 0)
|
||||
} else {
|
||||
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
|
||||
// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
|
||||
// https://pkg.go.dev/net/http#Request
|
||||
if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
|
||||
metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
|
||||
}
|
||||
retry.After(ctx, r, resp, err)
|
||||
|
||||
done := func() bool {
|
||||
defer readAndCloseResponseBody(resp)
|
||||
@@ -996,33 +900,15 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
|
||||
fn(req, resp)
|
||||
}
|
||||
|
||||
var retry bool
|
||||
retryAfter, retry = r.retry.NextRetry(req, resp, err, func(req *http.Request, err error) bool {
|
||||
// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
|
||||
// Thus in case of "GET" operations, we simply retry it.
|
||||
// We are not automatically retrying "write" operations, as they are not idempotent.
|
||||
if r.verb != "GET" {
|
||||
return false
|
||||
}
|
||||
// For connection errors and apiserver shutdown errors retry.
|
||||
if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
|
||||
return true
|
||||
}
|
||||
if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
|
||||
return false
|
||||
})
|
||||
if retry {
|
||||
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body)
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
klog.V(4).Infof("Could not retry request - %v", err)
|
||||
}
|
||||
|
||||
f(req, resp)
|
||||
return true
|
||||
}()
|
||||
if done {
|
||||
return err
|
||||
return retry.WrapPreviousError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1041,6 +927,9 @@ func (r *Request) Do(ctx context.Context) Result {
|
||||
if err != nil {
|
||||
return Result{err: err}
|
||||
}
|
||||
if result.err == nil || len(result.body) > 0 {
|
||||
metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -1057,6 +946,9 @@ func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if result.err == nil || len(result.body) > 0 {
|
||||
metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
|
||||
}
|
||||
return result.body, result.err
|
||||
}
|
||||
|
||||
@@ -1172,13 +1064,13 @@ func truncateBody(body string) string {
|
||||
// allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
|
||||
// whether the body is printable.
|
||||
func glogBody(prefix string, body []byte) {
|
||||
if klog.V(8).Enabled() {
|
||||
if klogV := klog.V(8); klogV.Enabled() {
|
||||
if bytes.IndexFunc(body, func(r rune) bool {
|
||||
return r < 0x0a
|
||||
}) != -1 {
|
||||
klog.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body)))
|
||||
klogV.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body)))
|
||||
} else {
|
||||
klog.Infof("%s: %s", prefix, truncateBody(string(body)))
|
||||
klogV.Infof("%s: %s", prefix, truncateBody(string(body)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user