Update dependencies for k8s v1.29.0
This commit is contained in:
74
client/vendor/k8s.io/client-go/discovery/discovery_client.go
generated
vendored
74
client/vendor/k8s.io/client-go/discovery/discovery_client.go
generated
vendored
@@ -19,6 +19,7 @@ package discovery
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
goerrors "errors"
|
||||
"fmt"
|
||||
"mime"
|
||||
"net/http"
|
||||
@@ -67,6 +68,9 @@ const (
|
||||
acceptDiscoveryFormats = AcceptV2Beta1 + "," + AcceptV1
|
||||
)
|
||||
|
||||
// Aggregated discovery content-type GVK.
|
||||
var v2Beta1GVK = schema.GroupVersionKind{Group: "apidiscovery.k8s.io", Version: "v2beta1", Kind: "APIGroupDiscoveryList"}
|
||||
|
||||
// DiscoveryInterface holds the methods that discover server-supported API groups,
|
||||
// versions and resources.
|
||||
type DiscoveryInterface interface {
|
||||
@@ -260,16 +264,15 @@ func (d *DiscoveryClient) downloadLegacy() (
|
||||
}
|
||||
|
||||
var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
|
||||
// Switch on content-type server responded with: aggregated or unaggregated.
|
||||
switch {
|
||||
case isV2Beta1ContentType(responseContentType):
|
||||
// Based on the content-type server responded with: aggregated or unaggregated.
|
||||
if isGVK, _ := ContentTypeIsGVK(responseContentType, v2Beta1GVK); isGVK {
|
||||
var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
|
||||
err = json.Unmarshal(body, &aggregatedDiscovery)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
|
||||
default:
|
||||
} else {
|
||||
// Default is unaggregated discovery v1.
|
||||
var v metav1.APIVersions
|
||||
err = json.Unmarshal(body, &v)
|
||||
@@ -313,16 +316,15 @@ func (d *DiscoveryClient) downloadAPIs() (
|
||||
apiGroupList := &metav1.APIGroupList{}
|
||||
failedGVs := map[schema.GroupVersion]error{}
|
||||
var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
|
||||
// Switch on content-type server responded with: aggregated or unaggregated.
|
||||
switch {
|
||||
case isV2Beta1ContentType(responseContentType):
|
||||
// Based on the content-type server responded with: aggregated or unaggregated.
|
||||
if isGVK, _ := ContentTypeIsGVK(responseContentType, v2Beta1GVK); isGVK {
|
||||
var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
|
||||
err = json.Unmarshal(body, &aggregatedDiscovery)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
|
||||
default:
|
||||
} else {
|
||||
// Default is unaggregated discovery v1.
|
||||
err = json.Unmarshal(body, apiGroupList)
|
||||
if err != nil {
|
||||
@@ -333,26 +335,29 @@ func (d *DiscoveryClient) downloadAPIs() (
|
||||
return apiGroupList, resourcesByGV, failedGVs, nil
|
||||
}
|
||||
|
||||
// isV2Beta1ContentType checks of the content-type string is both
|
||||
// "application/json" and contains the v2beta1 content-type params.
|
||||
// ContentTypeIsGVK checks of the content-type string is both
|
||||
// "application/json" and matches the provided GVK. An error
|
||||
// is returned if the content type string is malformed.
|
||||
// NOTE: This function is resilient to the ordering of the
|
||||
// content-type parameters, as well as parameters added by
|
||||
// intermediaries such as proxies or gateways. Examples:
|
||||
//
|
||||
// "application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList" = true
|
||||
// "application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io" = true
|
||||
// "application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io;charset=utf-8" = true
|
||||
// "application/json" = false
|
||||
// "application/json; charset=UTF-8" = false
|
||||
func isV2Beta1ContentType(contentType string) bool {
|
||||
// ("application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList", {apidiscovery.k8s.io, v2beta1, APIGroupDiscoveryList}) = (true, nil)
|
||||
// ("application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io", {apidiscovery.k8s.io, v2beta1, APIGroupDiscoveryList}) = (true, nil)
|
||||
// ("application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io;charset=utf-8", {apidiscovery.k8s.io, v2beta1, APIGroupDiscoveryList}) = (true, nil)
|
||||
// ("application/json", any GVK) = (false, nil)
|
||||
// ("application/json; charset=UTF-8", any GVK) = (false, nil)
|
||||
// ("malformed content type string", any GVK) = (false, error)
|
||||
func ContentTypeIsGVK(contentType string, gvk schema.GroupVersionKind) (bool, error) {
|
||||
base, params, err := mime.ParseMediaType(contentType)
|
||||
if err != nil {
|
||||
return false
|
||||
return false, err
|
||||
}
|
||||
return runtime.ContentTypeJSON == base &&
|
||||
params["g"] == "apidiscovery.k8s.io" &&
|
||||
params["v"] == "v2beta1" &&
|
||||
params["as"] == "APIGroupDiscoveryList"
|
||||
gvkMatch := runtime.ContentTypeJSON == base &&
|
||||
params["g"] == gvk.Group &&
|
||||
params["v"] == gvk.Version &&
|
||||
params["as"] == gvk.Kind
|
||||
return gvkMatch, nil
|
||||
}
|
||||
|
||||
// ServerGroups returns the supported groups, with information like supported versions and the
|
||||
@@ -415,6 +420,12 @@ func (e *ErrGroupDiscoveryFailed) Error() string {
|
||||
return fmt.Sprintf("unable to retrieve the complete list of server APIs: %s", strings.Join(groups, ", "))
|
||||
}
|
||||
|
||||
// Is makes it possible for the callers to use `errors.Is(` helper on errors wrapped with ErrGroupDiscoveryFailed error.
|
||||
func (e *ErrGroupDiscoveryFailed) Is(target error) bool {
|
||||
_, ok := target.(*ErrGroupDiscoveryFailed)
|
||||
return ok
|
||||
}
|
||||
|
||||
// IsGroupDiscoveryFailedError returns true if the provided error indicates the server was unable to discover
|
||||
// a complete list of APIs for the client to use.
|
||||
func IsGroupDiscoveryFailedError(err error) bool {
|
||||
@@ -422,6 +433,16 @@ func IsGroupDiscoveryFailedError(err error) bool {
|
||||
return err != nil && ok
|
||||
}
|
||||
|
||||
// GroupDiscoveryFailedErrorGroups returns true if the error is an ErrGroupDiscoveryFailed error,
|
||||
// along with the map of group versions that failed discovery.
|
||||
func GroupDiscoveryFailedErrorGroups(err error) (map[schema.GroupVersion]error, bool) {
|
||||
var groupDiscoveryError *ErrGroupDiscoveryFailed
|
||||
if err != nil && goerrors.As(err, &groupDiscoveryError) {
|
||||
return groupDiscoveryError.Groups, true
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
|
||||
var sgs *metav1.APIGroupList
|
||||
var resources []*metav1.APIResourceList
|
||||
@@ -633,16 +654,7 @@ func (d *DiscoveryClient) ServerVersion() (*version.Info, error) {
|
||||
func (d *DiscoveryClient) OpenAPISchema() (*openapi_v2.Document, error) {
|
||||
data, err := d.restClient.Get().AbsPath("/openapi/v2").SetHeader("Accept", openAPIV2mimePb).Do(context.TODO()).Raw()
|
||||
if err != nil {
|
||||
if errors.IsForbidden(err) || errors.IsNotFound(err) || errors.IsNotAcceptable(err) {
|
||||
// single endpoint not found/registered in old server, try to fetch old endpoint
|
||||
// TODO: remove this when kubectl/client-go don't work with 1.9 server
|
||||
data, err = d.restClient.Get().AbsPath("/swagger-2.0.0.pb-v1").Do(context.TODO()).Raw()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
document := &openapi_v2.Document{}
|
||||
err = proto.Unmarshal(data, document)
|
||||
|
4
client/vendor/k8s.io/client-go/kubernetes/scheme/register.go
generated
vendored
4
client/vendor/k8s.io/client-go/kubernetes/scheme/register.go
generated
vendored
@@ -48,7 +48,7 @@ import (
|
||||
eventsv1 "k8s.io/api/events/v1"
|
||||
eventsv1beta1 "k8s.io/api/events/v1beta1"
|
||||
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
|
||||
flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1"
|
||||
flowcontrolv1 "k8s.io/api/flowcontrol/v1"
|
||||
flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1"
|
||||
flowcontrolv1beta2 "k8s.io/api/flowcontrol/v1beta2"
|
||||
flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3"
|
||||
@@ -110,7 +110,7 @@ var localSchemeBuilder = runtime.SchemeBuilder{
|
||||
eventsv1.AddToScheme,
|
||||
eventsv1beta1.AddToScheme,
|
||||
extensionsv1beta1.AddToScheme,
|
||||
flowcontrolv1alpha1.AddToScheme,
|
||||
flowcontrolv1.AddToScheme,
|
||||
flowcontrolv1beta1.AddToScheme,
|
||||
flowcontrolv1beta2.AddToScheme,
|
||||
flowcontrolv1beta3.AddToScheme,
|
||||
|
22
client/vendor/k8s.io/client-go/tools/cache/reflector.go
generated
vendored
22
client/vendor/k8s.io/client-go/tools/cache/reflector.go
generated
vendored
@@ -334,12 +334,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
if !apierrors.IsInvalid(err) {
|
||||
return err
|
||||
}
|
||||
klog.Warning("the watch-list feature is not supported by the server, falling back to the previous LIST/WATCH semantic")
|
||||
klog.Warningf("The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking, err = %v", err)
|
||||
fallbackToList = true
|
||||
// Ensure that we won't accidentally pass some garbage down the watch.
|
||||
// ensure that we won't accidentally pass some garbage down the watch.
|
||||
w = nil
|
||||
}
|
||||
}
|
||||
@@ -351,6 +348,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
}
|
||||
}
|
||||
|
||||
klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name)
|
||||
|
||||
resyncerrc := make(chan error, 1)
|
||||
cancelCh := make(chan struct{})
|
||||
defer close(cancelCh)
|
||||
@@ -395,6 +394,11 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
|
||||
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
|
||||
select {
|
||||
case <-stopCh:
|
||||
// we can only end up here when the stopCh
|
||||
// was closed after a successful watchlist or list request
|
||||
if w != nil {
|
||||
w.Stop()
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
@@ -670,6 +674,12 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
|
||||
// "k8s.io/initial-events-end" bookmark.
|
||||
initTrace.Step("Objects streamed", trace.Field{Key: "count", Value: len(temporaryStore.List())})
|
||||
r.setIsLastSyncResourceVersionUnavailable(false)
|
||||
|
||||
// we utilize the temporaryStore to ensure independence from the current store implementation.
|
||||
// as of today, the store is implemented as a queue and will be drained by the higher-level
|
||||
// component as soon as it finishes replacing the content.
|
||||
checkWatchListConsistencyIfRequested(stopCh, r.name, resourceVersion, r.listerWatcher, temporaryStore)
|
||||
|
||||
if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
|
||||
return nil, fmt.Errorf("unable to sync watch-list result: %v", err)
|
||||
}
|
||||
@@ -762,7 +772,7 @@ loop:
|
||||
}
|
||||
case watch.Bookmark:
|
||||
// A `Bookmark` means watch has synced here, just update the resourceVersion
|
||||
if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok {
|
||||
if meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" {
|
||||
if exitOnInitialEventsEndBookmark != nil {
|
||||
*exitOnInitialEventsEndBookmark = true
|
||||
}
|
||||
|
119
client/vendor/k8s.io/client-go/tools/cache/reflector_data_consistency_detector.go
generated
vendored
Normal file
119
client/vendor/k8s.io/client-go/tools/cache/reflector_data_consistency_detector.go
generated
vendored
Normal file
@@ -0,0 +1,119 @@
|
||||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
var dataConsistencyDetectionEnabled = false
|
||||
|
||||
func init() {
|
||||
dataConsistencyDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR"))
|
||||
}
|
||||
|
||||
// checkWatchListConsistencyIfRequested performs a data consistency check only when
|
||||
// the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup.
|
||||
//
|
||||
// The consistency check is meant to be enforced only in the CI, not in production.
|
||||
// The check ensures that data retrieved by the watch-list api call
|
||||
// is exactly the same as data received by the standard list api call.
|
||||
//
|
||||
// Note that this function will panic when data inconsistency is detected.
|
||||
// This is intentional because we want to catch it in the CI.
|
||||
func checkWatchListConsistencyIfRequested(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) {
|
||||
if !dataConsistencyDetectionEnabled {
|
||||
return
|
||||
}
|
||||
checkWatchListConsistency(stopCh, identity, lastSyncedResourceVersion, listerWatcher, store)
|
||||
}
|
||||
|
||||
// checkWatchListConsistency exists solely for testing purposes.
|
||||
// we cannot use checkWatchListConsistencyIfRequested because
|
||||
// it is guarded by an environmental variable.
|
||||
// we cannot manipulate the environmental variable because
|
||||
// it will affect other tests in this package.
|
||||
func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) {
|
||||
klog.Warningf("%s: data consistency check for the watch-list feature is enabled, this will result in an additional call to the API server.", identity)
|
||||
opts := metav1.ListOptions{
|
||||
ResourceVersion: lastSyncedResourceVersion,
|
||||
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
|
||||
}
|
||||
var list runtime.Object
|
||||
err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), time.Second, true, func(_ context.Context) (done bool, err error) {
|
||||
list, err = listerWatcher.List(opts)
|
||||
if err != nil {
|
||||
// the consistency check will only be enabled in the CI
|
||||
// and LIST calls in general will be retired by the client-go library
|
||||
// if we fail simply log and retry
|
||||
klog.Errorf("failed to list data from the server, retrying until stopCh is closed, err: %v", err)
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("failed to list data from the server, the watch-list consistency check won't be performed, stopCh was closed, err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
rawListItems, err := meta.ExtractListWithAlloc(list)
|
||||
if err != nil {
|
||||
panic(err) // this should never happen
|
||||
}
|
||||
|
||||
listItems := toMetaObjectSliceOrDie(rawListItems)
|
||||
storeItems := toMetaObjectSliceOrDie(store.List())
|
||||
|
||||
sort.Sort(byUID(listItems))
|
||||
sort.Sort(byUID(storeItems))
|
||||
|
||||
if !cmp.Equal(listItems, storeItems) {
|
||||
klog.Infof("%s: data received by the new watch-list api call is different than received by the standard list api call, diff: %v", identity, cmp.Diff(listItems, storeItems))
|
||||
msg := "data inconsistency detected for the watch-list feature, panicking!"
|
||||
panic(msg)
|
||||
}
|
||||
}
|
||||
|
||||
type byUID []metav1.Object
|
||||
|
||||
func (a byUID) Len() int { return len(a) }
|
||||
func (a byUID) Less(i, j int) bool { return a[i].GetUID() < a[j].GetUID() }
|
||||
func (a byUID) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
||||
func toMetaObjectSliceOrDie[T any](s []T) []metav1.Object {
|
||||
result := make([]metav1.Object, len(s))
|
||||
for i, v := range s {
|
||||
m, err := meta.Accessor(v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
result[i] = m
|
||||
}
|
||||
return result
|
||||
}
|
2
client/vendor/k8s.io/client-go/tools/cache/shared_informer.go
generated
vendored
2
client/vendor/k8s.io/client-go/tools/cache/shared_informer.go
generated
vendored
@@ -334,11 +334,9 @@ func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
|
||||
},
|
||||
stopCh)
|
||||
if err != nil {
|
||||
klog.V(2).Infof("stop requested")
|
||||
return false
|
||||
}
|
||||
|
||||
klog.V(4).Infof("caches populated")
|
||||
return true
|
||||
}
|
||||
|
||||
|
55
client/vendor/k8s.io/client-go/transport/transport.go
generated
vendored
55
client/vendor/k8s.io/client-go/transport/transport.go
generated
vendored
@@ -96,6 +96,32 @@ func TLSConfigFor(c *Config) (*tls.Config, error) {
|
||||
}
|
||||
|
||||
if c.HasCA() {
|
||||
/*
|
||||
kubernetes mutual (2-way) x509 between client and apiserver:
|
||||
|
||||
1. apiserver sending its apiserver certificate along with its publickey to client
|
||||
>2. client verifies the apiserver certificate sent against its cluster certificate authority data
|
||||
3. client sending its client certificate along with its public key to the apiserver
|
||||
4. apiserver verifies the client certificate sent against its cluster certificate authority data
|
||||
|
||||
description:
|
||||
here, with this block,
|
||||
cluster certificate authority data gets loaded into TLS before the handshake process
|
||||
for client to later during the handshake verify the apiserver certificate
|
||||
|
||||
normal args related to this stage:
|
||||
--certificate-authority='':
|
||||
Path to a cert file for the certificate authority
|
||||
|
||||
(retrievable from "kubectl options" command)
|
||||
(suggested by @deads2k)
|
||||
|
||||
see also:
|
||||
- for the step 1, see: staging/src/k8s.io/apiserver/pkg/server/options/serving.go
|
||||
- for the step 3, see: a few lines below in this file
|
||||
- for the step 4, see: staging/src/k8s.io/apiserver/pkg/authentication/request/x509/x509.go
|
||||
*/
|
||||
|
||||
rootCAs, err := rootCertPool(c.TLS.CAData)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to load root certificates: %w", err)
|
||||
@@ -121,6 +147,35 @@ func TLSConfigFor(c *Config) (*tls.Config, error) {
|
||||
}
|
||||
|
||||
if c.HasCertAuth() || c.HasCertCallback() {
|
||||
|
||||
/*
|
||||
kubernetes mutual (2-way) x509 between client and apiserver:
|
||||
|
||||
1. apiserver sending its apiserver certificate along with its publickey to client
|
||||
2. client verifies the apiserver certificate sent against its cluster certificate authority data
|
||||
>3. client sending its client certificate along with its public key to the apiserver
|
||||
4. apiserver verifies the client certificate sent against its cluster certificate authority data
|
||||
|
||||
description:
|
||||
here, with this callback function,
|
||||
client certificate and pub key get loaded into TLS during the handshake process
|
||||
for apiserver to later in the step 4 verify the client certificate
|
||||
|
||||
normal args related to this stage:
|
||||
--client-certificate='':
|
||||
Path to a client certificate file for TLS
|
||||
--client-key='':
|
||||
Path to a client key file for TLS
|
||||
|
||||
(retrievable from "kubectl options" command)
|
||||
(suggested by @deads2k)
|
||||
|
||||
see also:
|
||||
- for the step 1, see: staging/src/k8s.io/apiserver/pkg/server/options/serving.go
|
||||
- for the step 2, see: a few lines above in this file
|
||||
- for the step 4, see: staging/src/k8s.io/apiserver/pkg/authentication/request/x509/x509.go
|
||||
*/
|
||||
|
||||
tlsConfig.GetClientCertificate = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
|
||||
// Note: static key/cert data always take precedence over cert
|
||||
// callback.
|
||||
|
55
client/vendor/k8s.io/client-go/util/workqueue/queue.go
generated
vendored
55
client/vendor/k8s.io/client-go/util/workqueue/queue.go
generated
vendored
@@ -238,8 +238,12 @@ func (q *Type) Done(item interface{}) {
|
||||
// ShutDown will cause q to ignore all new items added to it and
|
||||
// immediately instruct the worker goroutines to exit.
|
||||
func (q *Type) ShutDown() {
|
||||
q.setDrain(false)
|
||||
q.shutdown()
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
|
||||
q.drain = false
|
||||
q.shuttingDown = true
|
||||
q.cond.Broadcast()
|
||||
}
|
||||
|
||||
// ShutDownWithDrain will cause q to ignore all new items added to it. As soon
|
||||
@@ -252,53 +256,16 @@ func (q *Type) ShutDown() {
|
||||
// ShutDownWithDrain, as to force the queue shut down to terminate immediately
|
||||
// without waiting for the drainage.
|
||||
func (q *Type) ShutDownWithDrain() {
|
||||
q.setDrain(true)
|
||||
q.shutdown()
|
||||
for q.isProcessing() && q.shouldDrain() {
|
||||
q.waitForProcessing()
|
||||
}
|
||||
}
|
||||
|
||||
// isProcessing indicates if there are still items on the work queue being
|
||||
// processed. It's used to drain the work queue on an eventual shutdown.
|
||||
func (q *Type) isProcessing() bool {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
return q.processing.len() != 0
|
||||
}
|
||||
|
||||
// waitForProcessing waits for the worker goroutines to finish processing items
|
||||
// and call Done on them.
|
||||
func (q *Type) waitForProcessing() {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
// Ensure that we do not wait on a queue which is already empty, as that
|
||||
// could result in waiting for Done to be called on items in an empty queue
|
||||
// which has already been shut down, which will result in waiting
|
||||
// indefinitely.
|
||||
if q.processing.len() == 0 {
|
||||
return
|
||||
}
|
||||
q.cond.Wait()
|
||||
}
|
||||
|
||||
func (q *Type) setDrain(shouldDrain bool) {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
q.drain = shouldDrain
|
||||
}
|
||||
|
||||
func (q *Type) shouldDrain() bool {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
return q.drain
|
||||
}
|
||||
|
||||
func (q *Type) shutdown() {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
q.drain = true
|
||||
q.shuttingDown = true
|
||||
q.cond.Broadcast()
|
||||
|
||||
for q.processing.len() != 0 && q.drain {
|
||||
q.cond.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
func (q *Type) ShuttingDown() bool {
|
||||
|
Reference in New Issue
Block a user