Add generated file

This PR adds generated files under pkg/client and vendor folder.
This commit is contained in:
xing-yang
2018-07-12 10:55:15 -07:00
parent 36b1de0341
commit e213d1890d
17729 changed files with 5090889 additions and 0 deletions

View File

@@ -0,0 +1,62 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = [
"configsync.go",
"controller.go",
"watch.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig",
deps = [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/validation:go_default_library",
"//pkg/kubelet/kubeletconfig/checkpoint:go_default_library",
"//pkg/kubelet/kubeletconfig/checkpoint/store:go_default_library",
"//pkg/kubelet/kubeletconfig/status:go_default_library",
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
"//pkg/kubelet/kubeletconfig/util/panic:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/kubeletconfig/checkpoint:all-srcs",
"//pkg/kubelet/kubeletconfig/configfiles:all-srcs",
"//pkg/kubelet/kubeletconfig/status:all-srcs",
"//pkg/kubelet/kubeletconfig/util/codec:all-srcs",
"//pkg/kubelet/kubeletconfig/util/equal:all-srcs",
"//pkg/kubelet/kubeletconfig/util/files:all-srcs",
"//pkg/kubelet/kubeletconfig/util/log:all-srcs",
"//pkg/kubelet/kubeletconfig/util/panic:all-srcs",
"//pkg/kubelet/kubeletconfig/util/test:all-srcs",
],
tags = ["automanaged"],
)

View File

@@ -0,0 +1,5 @@
approvers:
- mtaufen
- dchen1107
reviewers:
- sig-node-reviewers

View File

@@ -0,0 +1,67 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = [
"configmap_test.go",
"download_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"configmap.go",
"download.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint",
deps = [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/v1beta1:go_default_library",
"//pkg/kubelet/kubeletconfig/status:go_default_library",
"//pkg/kubelet/kubeletconfig/util/codec:go_default_library",
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/kubeletconfig/checkpoint/store:all-srcs",
],
tags = ["automanaged"],
)

View File

@@ -0,0 +1,61 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
)
const configMapConfigKey = "kubelet"
// configMapPayload implements Payload, backed by a v1/ConfigMap config source object
type configMapPayload struct {
cm *apiv1.ConfigMap
}
var _ Payload = (*configMapPayload)(nil)
// NewConfigMapPayload constructs a Payload backed by a ConfigMap, which must have a non-empty UID
func NewConfigMapPayload(cm *apiv1.ConfigMap) (Payload, error) {
if cm == nil {
return nil, fmt.Errorf("ConfigMap must be non-nil")
} else if cm.ObjectMeta.UID == "" {
return nil, fmt.Errorf("ConfigMap must have a non-empty UID")
} else if cm.ObjectMeta.ResourceVersion == "" {
return nil, fmt.Errorf("ConfigMap must have a non-empty ResourceVersion")
}
return &configMapPayload{cm}, nil
}
func (p *configMapPayload) UID() string {
return string(p.cm.UID)
}
func (p *configMapPayload) ResourceVersion() string {
return p.cm.ResourceVersion
}
func (p *configMapPayload) Files() map[string]string {
return p.cm.Data
}
func (p *configMapPayload) object() interface{} {
return p.cm
}

View File

@@ -0,0 +1,148 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"reflect"
"testing"
"github.com/davecgh/go-spew/spew"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)
func TestNewConfigMapPayload(t *testing.T) {
cases := []struct {
desc string
cm *apiv1.ConfigMap
err string
}{
{
desc: "nil",
cm: nil,
err: "ConfigMap must be non-nil",
},
{
desc: "missing uid",
cm: &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
ResourceVersion: "rv",
},
},
err: "ConfigMap must have a non-empty UID",
},
{
desc: "missing resourceVersion",
cm: &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
UID: "uid",
},
},
err: "ConfigMap must have a non-empty ResourceVersion",
},
{
desc: "populated v1/ConfigMap",
cm: &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
UID: "uid",
ResourceVersion: "rv",
},
Data: map[string]string{
"key1": "value1",
"key2": "value2",
},
},
err: "",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
payload, err := NewConfigMapPayload(c.cm)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// underlying object should match the object passed in
if !apiequality.Semantic.DeepEqual(c.cm, payload.object()) {
t.Errorf("expect %s but got %s", spew.Sdump(c.cm), spew.Sdump(payload))
}
})
}
}
func TestConfigMapPayloadUID(t *testing.T) {
const expect = "uid"
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: expect, ResourceVersion: "rv"}})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
uid := payload.UID()
if expect != uid {
t.Errorf("expect %q, but got %q", expect, uid)
}
}
func TestConfigMapPayloadResourceVersion(t *testing.T) {
const expect = "rv"
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid", ResourceVersion: expect}})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
resourceVersion := payload.ResourceVersion()
if expect != resourceVersion {
t.Errorf("expect %q, but got %q", expect, resourceVersion)
}
}
func TestConfigMapPayloadFiles(t *testing.T) {
cases := []struct {
desc string
data map[string]string
expect map[string]string
}{
{"nil", nil, nil},
{"empty", map[string]string{}, map[string]string{}},
{"populated",
map[string]string{
"foo": "1",
"bar": "2",
},
map[string]string{
"foo": "1",
"bar": "2",
}},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid", ResourceVersion: "rv"}, Data: c.data})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
files := payload.Files()
if !reflect.DeepEqual(c.expect, files) {
t.Errorf("expected %v, but got %v", c.expect, files)
}
})
}
}

View File

@@ -0,0 +1,273 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"fmt"
"math/rand"
"time"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
)
// Payload represents a local copy of a config source (payload) object
type Payload interface {
// UID returns a globally unique (space and time) identifier for the payload.
// The return value is guaranteed non-empty.
UID() string
// ResourceVersion returns a resource version for the payload.
// The return value is guaranteed non-empty.
ResourceVersion() string
// Files returns a map of filenames to file contents.
Files() map[string]string
// object returns the underlying checkpointed object.
object() interface{}
}
// RemoteConfigSource represents a remote config source object that can be downloaded as a Checkpoint
type RemoteConfigSource interface {
// KubeletFilename returns the name of the Kubelet config file as it should appear in the keys of Payload.Files()
KubeletFilename() string
// APIPath returns the API path to the remote resource, e.g. its SelfLink
APIPath() string
// UID returns the globally unique identifier for the most recently downloaded payload targeted by the source.
UID() string
// ResourceVersion returns the resource version of the most recently downloaded payload targeted by the source.
ResourceVersion() string
// Download downloads the remote config source's target object and returns a Payload backed by the object,
// or a sanitized failure reason and error if the download fails.
// Download takes an optional store as an argument. If provided, Download will check this store for the
// target object prior to contacting the API server.
// Download updates the local UID and ResourceVersion tracked by this source, based on the downloaded payload.
Download(client clientset.Interface, store cache.Store) (Payload, string, error)
// Informer returns an informer that can be used to detect changes to the remote config source
Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer
// Encode returns a []byte representation of the object behind the RemoteConfigSource
Encode() ([]byte, error)
// NodeConfigSource returns a copy of the underlying apiv1.NodeConfigSource object.
// All RemoteConfigSources are expected to be backed by a NodeConfigSource,
// though the convenience methods on the interface will target the source
// type that was detected in a call to NewRemoteConfigSource.
NodeConfigSource() *apiv1.NodeConfigSource
}
// NewRemoteConfigSource constructs a RemoteConfigSource from a v1/NodeConfigSource object
// You should only call this with a non-nil config source.
// Note that the API server validates Node.Spec.ConfigSource.
func NewRemoteConfigSource(source *apiv1.NodeConfigSource) (RemoteConfigSource, string, error) {
// NOTE: Even though the API server validates the config, we check whether all *known* fields are
// nil here, so that if a new API server allows a new config source type, old clients can send
// an error message rather than crashing due to a nil pointer dereference.
// Exactly one reference subfield of the config source must be non-nil.
// Currently ConfigMap is the only reference subfield.
if source.ConfigMap == nil {
return nil, status.AllNilSubfieldsError, fmt.Errorf("%s, NodeConfigSource was: %#v", status.AllNilSubfieldsError, source)
}
return &remoteConfigMap{source}, "", nil
}
// DecodeRemoteConfigSource is a helper for using the apimachinery to decode serialized RemoteConfigSources;
// e.g. the metadata stored by checkpoint/store/fsstore.go
func DecodeRemoteConfigSource(data []byte) (RemoteConfigSource, error) {
// decode the remote config source
_, codecs, err := scheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
obj, err := runtime.Decode(codecs.UniversalDecoder(), data)
if err != nil {
return nil, fmt.Errorf("failed to decode, error: %v", err)
}
// for now we assume we are trying to load an kubeletconfigv1beta1.SerializedNodeConfigSource,
// this may need to be extended if e.g. a new version of the api is born
cs, ok := obj.(*kubeletconfiginternal.SerializedNodeConfigSource)
if !ok {
return nil, fmt.Errorf("failed to cast decoded remote config source to *k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig.SerializedNodeConfigSource")
}
// we use the v1.NodeConfigSource type on internal and external, so no need to convert to external here
source, _, err := NewRemoteConfigSource(&cs.Source)
if err != nil {
return nil, err
}
return source, nil
}
// EqualRemoteConfigSources is a helper for comparing remote config sources by
// comparing the underlying API objects for semantic equality.
func EqualRemoteConfigSources(a, b RemoteConfigSource) bool {
if a != nil && b != nil {
return apiequality.Semantic.DeepEqual(a.NodeConfigSource(), b.NodeConfigSource())
}
return a == b
}
// remoteConfigMap implements RemoteConfigSource for v1/ConfigMap config sources
type remoteConfigMap struct {
source *apiv1.NodeConfigSource
}
var _ RemoteConfigSource = (*remoteConfigMap)(nil)
func (r *remoteConfigMap) KubeletFilename() string {
return r.source.ConfigMap.KubeletConfigKey
}
const configMapAPIPathFmt = "/api/v1/namespaces/%s/configmaps/%s"
func (r *remoteConfigMap) APIPath() string {
ref := r.source.ConfigMap
return fmt.Sprintf(configMapAPIPathFmt, ref.Namespace, ref.Name)
}
func (r *remoteConfigMap) UID() string {
return string(r.source.ConfigMap.UID)
}
func (r *remoteConfigMap) ResourceVersion() string {
return r.source.ConfigMap.ResourceVersion
}
func (r *remoteConfigMap) Download(client clientset.Interface, store cache.Store) (Payload, string, error) {
var (
cm *apiv1.ConfigMap
err error
)
// check the in-memory store for the ConfigMap, so we can skip unnecessary downloads
if store != nil {
utillog.Infof("checking in-memory store for %s", r.APIPath())
cm, err = getConfigMapFromStore(store, r.source.ConfigMap.Namespace, r.source.ConfigMap.Name)
if err != nil {
// just log the error, we'll attempt a direct download instead
utillog.Errorf("failed to check in-memory store for %s, error: %v", r.APIPath(), err)
} else if cm != nil {
utillog.Infof("found %s in in-memory store, UID: %s, ResourceVersion: %s", r.APIPath(), cm.UID, cm.ResourceVersion)
} else {
utillog.Infof("did not find %s in in-memory store", r.APIPath())
}
}
// if we didn't find the ConfigMap in the in-memory store, download it from the API server
if cm == nil {
utillog.Infof("attempting to download %s", r.APIPath())
cm, err = client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Get(r.source.ConfigMap.Name, metav1.GetOptions{})
if err != nil {
return nil, status.DownloadError, fmt.Errorf("%s, error: %v", status.DownloadError, err)
}
utillog.Infof("successfully downloaded %s, UID: %s, ResourceVersion: %s", r.APIPath(), cm.UID, cm.ResourceVersion)
} // Assert: Now we have a non-nil ConfigMap
// construct Payload from the ConfigMap
payload, err := NewConfigMapPayload(cm)
if err != nil {
// We only expect an error here if ObjectMeta is lacking UID or ResourceVersion. This should
// never happen on objects in the informer's store, or objects downloaded from the API server
// directly, so we report InternalError.
return nil, status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err)
}
// update internal UID and ResourceVersion based on latest ConfigMap
r.source.ConfigMap.UID = cm.UID
r.source.ConfigMap.ResourceVersion = cm.ResourceVersion
return payload, "", nil
}
func (r *remoteConfigMap) Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer {
// select ConfigMap by name
fieldselector := fields.OneTermEqualSelector("metadata.name", r.source.ConfigMap.Name)
// add some randomness to resync period, which can help avoid controllers falling into lock-step
minResyncPeriod := 15 * time.Minute
factor := rand.Float64() + 1
resyncPeriod := time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (kuberuntime.Object, error) {
return client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).List(metav1.ListOptions{
FieldSelector: fieldselector.String(),
})
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Watch(metav1.ListOptions{
FieldSelector: fieldselector.String(),
ResourceVersion: options.ResourceVersion,
})
},
}
informer := cache.NewSharedInformer(lw, &apiv1.ConfigMap{}, resyncPeriod)
informer.AddEventHandler(handler)
return informer
}
func (r *remoteConfigMap) Encode() ([]byte, error) {
encoder, err := utilcodec.NewKubeletconfigYAMLEncoder(kubeletconfigv1beta1.SchemeGroupVersion)
if err != nil {
return nil, err
}
data, err := runtime.Encode(encoder, &kubeletconfigv1beta1.SerializedNodeConfigSource{Source: *r.source})
if err != nil {
return nil, err
}
return data, nil
}
func (r *remoteConfigMap) NodeConfigSource() *apiv1.NodeConfigSource {
return r.source.DeepCopy()
}
func getConfigMapFromStore(store cache.Store, namespace, name string) (*apiv1.ConfigMap, error) {
key := fmt.Sprintf("%s/%s", namespace, name)
obj, ok, err := store.GetByKey(key)
if err != nil || !ok {
return nil, err
}
cm, ok := obj.(*apiv1.ConfigMap)
if !ok {
err := fmt.Errorf("failed to cast object %s from informer's store to ConfigMap", key)
utillog.Errorf(err.Error())
return nil, err
}
return cm, nil
}

View File

@@ -0,0 +1,242 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"fmt"
"testing"
"github.com/davecgh/go-spew/spew"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
fakeclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)
func TestNewRemoteConfigSource(t *testing.T) {
cases := []struct {
desc string
source *apiv1.NodeConfigSource
expect RemoteConfigSource
err string
}{
{
desc: "all NodeConfigSource subfields nil",
source: &apiv1.NodeConfigSource{},
expect: nil,
err: "exactly one subfield must be non-nil",
},
{
desc: "ConfigMap: valid reference",
source: &apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}},
expect: &remoteConfigMap{&apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}}},
err: "",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
source, _, err := NewRemoteConfigSource(c.source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// underlying object should match the object passed in
if !apiequality.Semantic.DeepEqual(c.expect.NodeConfigSource(), source.NodeConfigSource()) {
t.Errorf("case %q, expect RemoteConfigSource %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(source))
}
})
}
}
func TestRemoteConfigMapUID(t *testing.T) {
const expect = "uid"
source, _, err := NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: expect,
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("error constructing remote config source: %v", err)
}
uid := source.UID()
if expect != uid {
t.Errorf("expect %q, but got %q", expect, uid)
}
}
func TestRemoteConfigMapAPIPath(t *testing.T) {
const (
name = "name"
namespace = "namespace"
)
source, _, err := NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: name,
Namespace: namespace,
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("error constructing remote config source: %v", err)
}
expect := fmt.Sprintf(configMapAPIPathFmt, namespace, name)
path := source.APIPath()
if expect != path {
t.Errorf("expect %q, but got %q", expect, path)
}
}
func TestRemoteConfigMapDownload(t *testing.T) {
cm := &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "namespace",
UID: "uid",
ResourceVersion: "1",
}}
source := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
KubeletConfigKey: "kubelet",
}}
expectPayload, err := NewConfigMapPayload(cm)
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
missingStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
hasStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
if err := hasStore.Add(cm); err != nil {
t.Fatalf("unexpected error constructing hasStore")
}
missingClient := fakeclient.NewSimpleClientset()
hasClient := fakeclient.NewSimpleClientset(cm)
cases := []struct {
desc string
client clientset.Interface
store cache.Store
err string
}{
{
desc: "nil store, object does not exist in API server",
client: missingClient,
err: "not found",
},
{
desc: "nil store, object exists in API server",
client: hasClient,
},
{
desc: "object exists in store and API server",
store: hasStore,
client: hasClient,
},
{
desc: "object exists in store, but does not exist in API server",
store: hasStore,
client: missingClient,
},
{
desc: "object does not exist in store, but exists in API server",
store: missingStore,
client: hasClient,
},
{
desc: "object does not exist in store or API server",
client: missingClient,
store: missingStore,
err: "not found",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// deep copy so we can always check the UID/ResourceVersion are set after Download
s, _, err := NewRemoteConfigSource(source.DeepCopy())
if err != nil {
t.Fatalf("error constructing remote config source %v", err)
}
// attempt download
p, _, err := s.Download(c.client, c.store)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// downloaded object should match the expected
if !apiequality.Semantic.DeepEqual(expectPayload.object(), p.object()) {
t.Errorf("expect Checkpoint %s but got %s", spew.Sdump(expectPayload), spew.Sdump(p))
}
// source UID and ResourceVersion should be updated by Download
if p.UID() != s.UID() {
t.Errorf("expect UID to be updated by Download to match payload: %s, but got source UID: %s", p.UID(), s.UID())
}
if p.ResourceVersion() != s.ResourceVersion() {
t.Errorf("expect ResourceVersion to be updated by Download to match payload: %s, but got source ResourceVersion: %s", p.ResourceVersion(), s.ResourceVersion())
}
})
}
}
func TestEqualRemoteConfigSources(t *testing.T) {
cases := []struct {
desc string
a RemoteConfigSource
b RemoteConfigSource
expect bool
}{
{"both nil", nil, nil, true},
{"a nil", nil, &remoteConfigMap{}, false},
{"b nil", &remoteConfigMap{}, nil, false},
{"neither nil, equal", &remoteConfigMap{}, &remoteConfigMap{}, true},
{
desc: "neither nil, not equal",
a: &remoteConfigMap{&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{Name: "a"}}},
b: &remoteConfigMap{&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{KubeletConfigKey: "kubelet"}}},
expect: false,
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
if EqualRemoteConfigSources(c.a, c.b) != c.expect {
t.Errorf("expected EqualRemoteConfigSources to return %t, but got %t", c.expect, !c.expect)
}
})
}
}

View File

@@ -0,0 +1,61 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = [
"fsstore_test.go",
"store_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/v1beta1:go_default_library",
"//pkg/kubelet/kubeletconfig/checkpoint:go_default_library",
"//pkg/kubelet/kubeletconfig/util/codec:go_default_library",
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"fakestore.go",
"fsstore.go",
"store.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint/store",
deps = [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/kubeletconfig/checkpoint:go_default_library",
"//pkg/kubelet/kubeletconfig/configfiles:go_default_library",
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
"//pkg/util/filesystem:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@@ -0,0 +1,75 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
)
// so far only implements Assigned(), LastKnownGood(), SetAssigned(), and SetLastKnownGood()
type fakeStore struct {
assigned checkpoint.RemoteConfigSource
lastKnownGood checkpoint.RemoteConfigSource
}
var _ Store = (*fakeStore)(nil)
func (s *fakeStore) Initialize() error {
return fmt.Errorf("Initialize method not supported")
}
func (s *fakeStore) Exists(source checkpoint.RemoteConfigSource) (bool, error) {
return false, fmt.Errorf("Exists method not supported")
}
func (s *fakeStore) Save(c checkpoint.Payload) error {
return fmt.Errorf("Save method not supported")
}
func (s *fakeStore) Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error) {
return nil, fmt.Errorf("Load method not supported")
}
func (s *fakeStore) AssignedModified() (time.Time, error) {
return time.Time{}, fmt.Errorf("AssignedModified method not supported")
}
func (s *fakeStore) Assigned() (checkpoint.RemoteConfigSource, error) {
return s.assigned, nil
}
func (s *fakeStore) LastKnownGood() (checkpoint.RemoteConfigSource, error) {
return s.lastKnownGood, nil
}
func (s *fakeStore) SetAssigned(source checkpoint.RemoteConfigSource) error {
s.assigned = source
return nil
}
func (s *fakeStore) SetLastKnownGood(source checkpoint.RemoteConfigSource) error {
s.lastKnownGood = source
return nil
}
func (s *fakeStore) Reset() (bool, error) {
return false, fmt.Errorf("Reset method not supported")
}

View File

@@ -0,0 +1,194 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
"path/filepath"
"time"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const (
metaDir = "meta"
assignedFile = "assigned"
lastKnownGoodFile = "last-known-good"
checkpointsDir = "checkpoints"
)
// fsStore is for tracking checkpoints in the local filesystem, implements Store
type fsStore struct {
// fs is the filesystem to use for storage operations; can be mocked for testing
fs utilfs.Filesystem
// dir is the absolute path to the storage directory for fsStore
dir string
}
var _ Store = (*fsStore)(nil)
// NewFsStore returns a Store that saves its data in dir
func NewFsStore(fs utilfs.Filesystem, dir string) Store {
return &fsStore{
fs: fs,
dir: dir,
}
}
func (s *fsStore) Initialize() error {
utillog.Infof("initializing config checkpoints directory %q", s.dir)
// ensure top-level dir for store
if err := utilfiles.EnsureDir(s.fs, s.dir); err != nil {
return err
}
// ensure metadata directory and reference files (tracks assigned and lkg configs)
if err := utilfiles.EnsureDir(s.fs, filepath.Join(s.dir, metaDir)); err != nil {
return err
}
if err := utilfiles.EnsureFile(s.fs, s.metaPath(assignedFile)); err != nil {
return err
}
if err := utilfiles.EnsureFile(s.fs, s.metaPath(lastKnownGoodFile)); err != nil {
return err
}
// ensure checkpoints directory (saves unpacked payloads in subdirectories named after payload UID)
return utilfiles.EnsureDir(s.fs, filepath.Join(s.dir, checkpointsDir))
}
func (s *fsStore) Exists(source checkpoint.RemoteConfigSource) (bool, error) {
const errfmt = "failed to determine whether checkpoint exists for source %s, UID: %s, ResourceVersion: %s exists, error: %v"
if len(source.UID()) == 0 {
return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), "empty UID is ambiguous")
}
if len(source.ResourceVersion()) == 0 {
return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), "empty ResourceVersion is ambiguous")
}
// we check whether the directory was created for the resource
ok, err := utilfiles.DirExists(s.fs, s.checkpointPath(source.UID(), source.ResourceVersion()))
if err != nil {
return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), err)
}
return ok, nil
}
func (s *fsStore) Save(payload checkpoint.Payload) error {
// Note: Payload interface guarantees UID() and ResourceVersion() to be non-empty
path := s.checkpointPath(payload.UID(), payload.ResourceVersion())
// ensure the parent dir (checkpoints/uid) exists, since ReplaceDir requires the parent of the replacee
// to exist, and we checkpoint as checkpoints/uid/resourceVersion/files-from-configmap
if err := utilfiles.EnsureDir(s.fs, filepath.Dir(path)); err != nil {
return err
}
// save the checkpoint's files in the appropriate checkpoint dir
return utilfiles.ReplaceDir(s.fs, path, payload.Files())
}
func (s *fsStore) Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error) {
sourceFmt := fmt.Sprintf("%s, UID: %s, ResourceVersion: %s", source.APIPath(), source.UID(), source.ResourceVersion())
// check if a checkpoint exists for the source
if ok, err := s.Exists(source); err != nil {
return nil, err
} else if !ok {
return nil, fmt.Errorf("no checkpoint for source %s", sourceFmt)
}
// load the kubelet config file
utillog.Infof("loading Kubelet configuration checkpoint for source %s", sourceFmt)
loader, err := configfiles.NewFsLoader(s.fs, filepath.Join(s.checkpointPath(source.UID(), source.ResourceVersion()), source.KubeletFilename()))
if err != nil {
return nil, err
}
kc, err := loader.Load()
if err != nil {
return nil, err
}
return kc, nil
}
func (s *fsStore) AssignedModified() (time.Time, error) {
path := s.metaPath(assignedFile)
info, err := s.fs.Stat(path)
if err != nil {
return time.Time{}, fmt.Errorf("failed to stat %q while checking modification time, error: %v", path, err)
}
return info.ModTime(), nil
}
func (s *fsStore) Assigned() (checkpoint.RemoteConfigSource, error) {
return readRemoteConfigSource(s.fs, s.metaPath(assignedFile))
}
func (s *fsStore) LastKnownGood() (checkpoint.RemoteConfigSource, error) {
return readRemoteConfigSource(s.fs, s.metaPath(lastKnownGoodFile))
}
func (s *fsStore) SetAssigned(source checkpoint.RemoteConfigSource) error {
return writeRemoteConfigSource(s.fs, s.metaPath(assignedFile), source)
}
func (s *fsStore) SetLastKnownGood(source checkpoint.RemoteConfigSource) error {
return writeRemoteConfigSource(s.fs, s.metaPath(lastKnownGoodFile), source)
}
func (s *fsStore) Reset() (bool, error) {
return reset(s)
}
func (s *fsStore) checkpointPath(uid, resourceVersion string) string {
return filepath.Join(s.dir, checkpointsDir, uid, resourceVersion)
}
func (s *fsStore) metaPath(name string) string {
return filepath.Join(s.dir, metaDir, name)
}
func readRemoteConfigSource(fs utilfs.Filesystem, path string) (checkpoint.RemoteConfigSource, error) {
data, err := fs.ReadFile(path)
if err != nil {
return nil, err
} else if len(data) == 0 {
return nil, nil
}
return checkpoint.DecodeRemoteConfigSource(data)
}
func writeRemoteConfigSource(fs utilfs.Filesystem, path string, source checkpoint.RemoteConfigSource) error {
// if nil, reset the file
if source == nil {
return utilfiles.ReplaceFile(fs, path, []byte{})
}
// check that UID and ResourceVersion are non-empty,
// error to save reference if the checkpoint can't be fully resolved
if source.UID() == "" {
return fmt.Errorf("failed to write RemoteConfigSource, empty UID is ambiguous")
}
if source.ResourceVersion() == "" {
return fmt.Errorf("failed to write RemoteConfigSource, empty ResourceVersion is ambiguous")
}
// encode the source and save it to the file
data, err := source.Encode()
if err != nil {
return err
}
return utilfiles.ReplaceFile(fs, path, data)
}

View File

@@ -0,0 +1,717 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
"io/ioutil"
"path/filepath"
"reflect"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
var testdir string
func init() {
tmp, err := ioutil.TempDir("", "fsstore-test")
if err != nil {
panic(err)
}
testdir = tmp
}
func newInitializedFakeFsStore() (*fsStore, error) {
// Test with the default filesystem, the fake filesystem has an issue caused by afero: https://github.com/spf13/afero/issues/141
// The default filesystem also behaves more like production, so we should probably not mock the filesystem for unit tests.
fs := utilfs.DefaultFs{}
tmpdir, err := fs.TempDir(testdir, "store-")
if err != nil {
return nil, err
}
store := NewFsStore(fs, tmpdir)
if err := store.Initialize(); err != nil {
return nil, err
}
return store.(*fsStore), nil
}
func TestFsStoreInitialize(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("fsStore.Initialize() failed with error: %v", err)
}
// check that store.dir exists
if _, err := store.fs.Stat(store.dir); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", store.dir, err)
}
// check that meta dir exists
if _, err := store.fs.Stat(store.metaPath("")); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", store.metaPath(""), err)
}
// check that checkpoints dir exists
if _, err := store.fs.Stat(filepath.Join(store.dir, checkpointsDir)); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", filepath.Join(store.dir, checkpointsDir), err)
}
// check that assignedFile exists
if _, err := store.fs.Stat(store.metaPath(assignedFile)); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", store.metaPath(assignedFile), err)
}
// check that lastKnownGoodFile exists
if _, err := store.fs.Stat(store.metaPath(lastKnownGoodFile)); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", store.metaPath(lastKnownGoodFile), err)
}
}
func TestFsStoreExists(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
// checkpoint a payload
const (
uid = "uid"
resourceVersion = "1"
)
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: uid, ResourceVersion: resourceVersion}})
if err != nil {
t.Fatalf("could not construct Payload, error: %v", err)
}
if err := store.Save(p); err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
uid types.UID
resourceVersion string
expect bool
err string
}{
{"exists", uid, resourceVersion, true, ""},
{"does not exist", "bogus-uid", "bogus-resourceVersion", false, ""},
{"ambiguous UID", "", "bogus-resourceVersion", false, "empty UID is ambiguous"},
{"ambiguous ResourceVersion", "bogus-uid", "", false, "empty ResourceVersion is ambiguous"},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: c.uid,
ResourceVersion: c.resourceVersion,
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
ok, err := store.Exists(source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
if c.expect != ok {
t.Errorf("expect %t but got %t", c.expect, ok)
}
})
}
}
func TestFsStoreSave(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
nameTooLong := func() string {
s := ""
for i := 0; i < 256; i++ {
s += "a"
}
return s
}()
const (
uid = "uid"
resourceVersion = "1"
)
cases := []struct {
desc string
uid types.UID
resourceVersion string
files map[string]string
err string
}{
{"valid payload", uid, resourceVersion, map[string]string{"foo": "foocontent", "bar": "barcontent"}, ""},
{"empty key name", uid, resourceVersion, map[string]string{"": "foocontent"}, "must not be empty"},
{"key name is not a base file name (foo/bar)", uid, resourceVersion, map[string]string{"foo/bar": "foocontent"}, "only base names are allowed"},
{"key name is not a base file name (/foo)", uid, resourceVersion, map[string]string{"/bar": "foocontent"}, "only base names are allowed"},
{"used .", uid, resourceVersion, map[string]string{".": "foocontent"}, "may not be '.' or '..'"},
{"used ..", uid, resourceVersion, map[string]string{"..": "foocontent"}, "may not be '.' or '..'"},
{"length violation", uid, resourceVersion, map[string]string{nameTooLong: "foocontent"}, "must be less than 255 characters"},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// construct the payload
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{UID: c.uid, ResourceVersion: c.resourceVersion},
Data: c.files,
})
// if no error, save the payload, otherwise skip straight to error handler
if err == nil {
err = store.Save(p)
}
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// read the saved checkpoint
m, err := mapFromCheckpoint(store, p.UID(), p.ResourceVersion())
if err != nil {
t.Fatalf("error loading checkpoint to map: %v", err)
}
// compare our expectation to what got saved
expect := p.Files()
if !reflect.DeepEqual(expect, m) {
t.Errorf("expect %v, but got %v", expect, m)
}
})
}
}
func TestFsStoreLoad(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
// encode a kubelet configuration that has all defaults set
expect, err := newKubeletConfiguration()
if err != nil {
t.Fatalf("error constructing KubeletConfiguration: %v", err)
}
data, err := utilcodec.EncodeKubeletConfig(expect, v1beta1.SchemeGroupVersion)
if err != nil {
t.Fatalf("error encoding KubeletConfiguration: %v", err)
}
// construct a payload that contains the kubeletconfig
const (
uid = "uid"
resourceVersion = "1"
kubeletKey = "kubelet"
)
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid), ResourceVersion: resourceVersion},
Data: map[string]string{
kubeletKey: string(data),
},
})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
// save the payload
err = store.Save(p)
if err != nil {
t.Fatalf("error saving payload: %v", err)
}
cases := []struct {
desc string
uid types.UID
resourceVersion string
err string
}{
{"checkpoint exists", uid, resourceVersion, ""},
{"checkpoint does not exist", "bogus-uid", "bogus-resourceVersion", "no checkpoint for source"},
{"ambiguous UID", "", "bogus-resourceVersion", "empty UID is ambiguous"},
{"ambiguous ResourceVersion", "bogus-uid", "", "empty ResourceVersion is ambiguous"},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: c.uid,
ResourceVersion: c.resourceVersion,
KubeletConfigKey: kubeletKey,
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
loaded, err := store.Load(source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
if !reflect.DeepEqual(expect, loaded) {
t.Errorf("expect %#v, but got %#v", expect, loaded)
}
})
}
}
func TestFsStoreAssignedModified(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
// create an empty assigned file, this is good enough for testing
saveTestSourceFile(t, store, assignedFile, nil)
// set the timestamps to the current time, so we can compare to result of store.AssignedModified
now := time.Now()
err = store.fs.Chtimes(store.metaPath(assignedFile), now, now)
if err != nil {
t.Fatalf("could not change timestamps, error: %v", err)
}
// for now we hope that the system won't truncate the time to a less precise unit,
// if this test fails on certain systems that may be the reason.
modTime, err := store.AssignedModified()
if err != nil {
t.Fatalf("unable to determine modification time of assigned config source, error: %v", err)
}
if !now.Equal(modTime) {
t.Errorf("expect %q but got %q", now.Format(time.RFC3339), modTime.Format(time.RFC3339))
}
}
func TestFsStoreAssigned(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
expect checkpoint.RemoteConfigSource
err string
}{
{"default source", nil, ""},
{"non-default source", source, ""},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// save the last known good source
saveTestSourceFile(t, store, assignedFile, c.expect)
// load last-known-good and compare to expected result
source, err := store.Assigned()
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
if !checkpoint.EqualRemoteConfigSources(c.expect, source) {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source))
}
})
}
}
func TestFsStoreLastKnownGood(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
expect checkpoint.RemoteConfigSource
err string
}{
{"default source", nil, ""},
{"non-default source", source, ""},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// save the last known good source
saveTestSourceFile(t, store, lastKnownGoodFile, c.expect)
// load last-known-good and compare to expected result
source, err := store.LastKnownGood()
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
if !checkpoint.EqualRemoteConfigSources(c.expect, source) {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source))
}
})
}
}
func TestFsStoreSetAssigned(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
cases := []struct {
desc string
source *apiv1.NodeConfigSource
expect string
err string
}{
{
desc: "nil source",
expect: "", // empty file
},
{
desc: "non-nil source",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}},
expect: `apiVersion: kubelet.config.k8s.io/v1beta1
kind: SerializedNodeConfigSource
source:
configMap:
kubeletConfigKey: kubelet
name: name
namespace: namespace
resourceVersion: "1"
uid: uid
`,
},
{
desc: "missing UID",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}},
err: "failed to write RemoteConfigSource, empty UID is ambiguous",
},
{
desc: "missing ResourceVersion",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}},
err: "failed to write RemoteConfigSource, empty ResourceVersion is ambiguous",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
var source checkpoint.RemoteConfigSource
if c.source != nil {
s, _, err := checkpoint.NewRemoteConfigSource(c.source)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
source = s
}
// save the assigned source
err = store.SetAssigned(source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// check that the source saved as we would expect
data := readTestSourceFile(t, store, assignedFile)
if c.expect != string(data) {
t.Errorf("expect assigned source file to contain %q, but got %q", c.expect, string(data))
}
})
}
}
func TestFsStoreSetLastKnownGood(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
cases := []struct {
desc string
source *apiv1.NodeConfigSource
expect string
err string
}{
{
desc: "nil source",
expect: "", // empty file
},
{
desc: "non-nil source",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}},
expect: `apiVersion: kubelet.config.k8s.io/v1beta1
kind: SerializedNodeConfigSource
source:
configMap:
kubeletConfigKey: kubelet
name: name
namespace: namespace
resourceVersion: "1"
uid: uid
`,
},
{
desc: "missing UID",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}},
err: "failed to write RemoteConfigSource, empty UID is ambiguous",
},
{
desc: "missing ResourceVersion",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}},
err: "failed to write RemoteConfigSource, empty ResourceVersion is ambiguous",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
var source checkpoint.RemoteConfigSource
if c.source != nil {
s, _, err := checkpoint.NewRemoteConfigSource(c.source)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
source = s
}
// save the assigned source
err = store.SetLastKnownGood(source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// check that the source saved as we would expect
data := readTestSourceFile(t, store, lastKnownGoodFile)
if c.expect != string(data) {
t.Errorf("expect assigned source file to contain %q, but got %q", c.expect, string(data))
}
})
}
}
func TestFsStoreReset(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
otherSource, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "other-name",
Namespace: "namespace",
UID: "other-uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
assigned checkpoint.RemoteConfigSource
lastKnownGood checkpoint.RemoteConfigSource
updated bool
}{
{"nil -> nil", nil, nil, false},
{"source -> nil", source, nil, true},
{"nil -> source", nil, source, false},
{"source -> source", source, source, true},
{"source -> otherSource", source, otherSource, true},
{"otherSource -> source", otherSource, source, true},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// manually save the sources to their respective files
saveTestSourceFile(t, store, assignedFile, c.assigned)
saveTestSourceFile(t, store, lastKnownGoodFile, c.lastKnownGood)
// reset
updated, err := store.Reset()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// make sure the files were emptied
if size := testSourceFileSize(t, store, assignedFile); size > 0 {
t.Errorf("case %q, expect source file %q to be empty but got %d bytes", c.desc, assignedFile, size)
}
if size := testSourceFileSize(t, store, lastKnownGoodFile); size > 0 {
t.Errorf("case %q, expect source file %q to be empty but got %d bytes", c.desc, lastKnownGoodFile, size)
}
// make sure Assigned() and LastKnownGood() both return nil
assigned, err := store.Assigned()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
lastKnownGood, err := store.LastKnownGood()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if assigned != nil || lastKnownGood != nil {
t.Errorf("case %q, expect nil for assigned and last-known-good checkpoints, but still have %q and %q, respectively",
c.desc, assigned, lastKnownGood)
}
if c.updated != updated {
t.Errorf("case %q, expect reset to return %t, but got %t", c.desc, c.updated, updated)
}
})
}
}
func mapFromCheckpoint(store *fsStore, uid, resourceVersion string) (map[string]string, error) {
files, err := store.fs.ReadDir(store.checkpointPath(uid, resourceVersion))
if err != nil {
return nil, err
}
m := map[string]string{}
for _, f := range files {
// expect no subdirs, only regular files
if !f.Mode().IsRegular() {
return nil, fmt.Errorf("expect only regular files in checkpoint dir %q", uid)
}
// read the file contents and build the map
data, err := store.fs.ReadFile(filepath.Join(store.checkpointPath(uid, resourceVersion), f.Name()))
if err != nil {
return nil, err
}
m[f.Name()] = string(data)
}
return m, nil
}
func readTestSourceFile(t *testing.T, store *fsStore, relPath string) []byte {
data, err := store.fs.ReadFile(store.metaPath(relPath))
if err != nil {
t.Fatalf("unable to read test source file, error: %v", err)
}
return data
}
func saveTestSourceFile(t *testing.T, store *fsStore, relPath string, source checkpoint.RemoteConfigSource) {
if source != nil {
data, err := source.Encode()
if err != nil {
t.Fatalf("unable to save test source file, error: %v", err)
}
err = utilfiles.ReplaceFile(store.fs, store.metaPath(relPath), data)
if err != nil {
t.Fatalf("unable to save test source file, error: %v", err)
}
} else {
err := utilfiles.ReplaceFile(store.fs, store.metaPath(relPath), []byte{})
if err != nil {
t.Fatalf("unable to save test source file, error: %v", err)
}
}
}
func testSourceFileSize(t *testing.T, store *fsStore, relPath string) int64 {
info, err := store.fs.Stat(store.metaPath(relPath))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
return info.Size()
}
// newKubeletConfiguration will create a new KubeletConfiguration with default values set
func newKubeletConfiguration() (*kubeletconfig.KubeletConfiguration, error) {
s, _, err := scheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
versioned := &v1beta1.KubeletConfiguration{}
s.Default(versioned)
config := &kubeletconfig.KubeletConfiguration{}
if err := s.Convert(versioned, config, nil); err != nil {
return nil, err
}
return config, nil
}

View File

@@ -0,0 +1,70 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
)
// Store saves checkpoints and information about which is the assigned and last-known-good checkpoint to a storage layer
type Store interface {
// Initialize sets up the storage layer
Initialize() error
// Exists returns true if the object referenced by `source` has been checkpointed.
// The source must be unambiguous - e.g. if referencing an API object it must specify both uid and resourceVersion.
Exists(source checkpoint.RemoteConfigSource) (bool, error)
// Save Kubelet config payloads to the storage layer. It must be possible to unmarshal the payload to a KubeletConfiguration.
// The following payload types are supported:
// - k8s.io/api/core/v1.ConfigMap
Save(c checkpoint.Payload) error
// Load loads the KubeletConfiguration from the checkpoint referenced by `source`.
Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error)
// AssignedModified returns the last time that the assigned checkpoint was set
AssignedModified() (time.Time, error)
// Assigned returns the source that points to the checkpoint currently assigned to the Kubelet, or nil if no assigned checkpoint is set
Assigned() (checkpoint.RemoteConfigSource, error)
// LastKnownGood returns the source that points to the last-known-good checkpoint, or nil if no last-known-good checkpoint is set
LastKnownGood() (checkpoint.RemoteConfigSource, error)
// SetAssigned saves the source that points to the assigned checkpoint, set to nil to unset
SetAssigned(source checkpoint.RemoteConfigSource) error
// SetLastKnownGood saves the source that points to the last-known-good checkpoint, set to nil to unset
SetLastKnownGood(source checkpoint.RemoteConfigSource) error
// Reset unsets the assigned and last-known-good checkpoints and returns whether the assigned checkpoint was unset as a result of the reset
Reset() (bool, error)
}
// reset is a helper for implementing Reset, which can be implemented in terms of Store methods
func reset(s Store) (bool, error) {
assigned, err := s.Assigned()
if err != nil {
return false, err
}
if err := s.SetLastKnownGood(nil); err != nil {
return false, fmt.Errorf("failed to reset last-known-good UID in checkpoint store, error: %v", err)
}
if err := s.SetAssigned(nil); err != nil {
return false, fmt.Errorf("failed to reset assigned UID in checkpoint store, error: %v", err)
}
return assigned != nil, nil
}

View File

@@ -0,0 +1,71 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"testing"
"github.com/davecgh/go-spew/spew"
apiv1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
)
func TestReset(t *testing.T) {
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
otherSource, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "other-name",
Namespace: "namespace",
UID: "other-uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
s *fakeStore
updated bool
}{
{&fakeStore{assigned: nil, lastKnownGood: nil}, false},
{&fakeStore{assigned: source, lastKnownGood: nil}, true},
{&fakeStore{assigned: nil, lastKnownGood: source}, false},
{&fakeStore{assigned: source, lastKnownGood: source}, true},
{&fakeStore{assigned: source, lastKnownGood: otherSource}, true},
{&fakeStore{assigned: otherSource, lastKnownGood: source}, true},
}
for _, c := range cases {
updated, err := reset(c.s)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if c.s.assigned != nil || c.s.lastKnownGood != nil {
t.Errorf("case %q, expect nil for assigned and last-known-good checkpoints, but still have %q and %q, respectively",
spew.Sdump(c.s), c.s.assigned, c.s.lastKnownGood)
}
if c.updated != updated {
t.Errorf("case %q, expect reset to return %t, but got %t", spew.Sdump(c.s), c.updated, updated)
}
}
}

View File

@@ -0,0 +1,48 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["configfiles.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles",
deps = [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//pkg/kubelet/kubeletconfig/util/codec:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["configfiles_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/v1beta1:go_default_library",
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
],
)

View File

@@ -0,0 +1,90 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package configfiles
import (
"fmt"
"path/filepath"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
// Loader loads configuration from a storage layer
type Loader interface {
// Load loads and returns the KubeletConfiguration from the storage layer, or an error if a configuration could not be loaded
Load() (*kubeletconfig.KubeletConfiguration, error)
}
// fsLoader loads configuration from `configDir`
type fsLoader struct {
// fs is the filesystem where the config files exist; can be mocked for testing
fs utilfs.Filesystem
// kubeletCodecs is the scheme used to decode config files
kubeletCodecs *serializer.CodecFactory
// kubeletFile is an absolute path to the file containing a serialized KubeletConfiguration
kubeletFile string
}
// NewFsLoader returns a Loader that loads a KubeletConfiguration from the `kubeletFile`
func NewFsLoader(fs utilfs.Filesystem, kubeletFile string) (Loader, error) {
_, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
return &fsLoader{
fs: fs,
kubeletCodecs: kubeletCodecs,
kubeletFile: kubeletFile,
}, nil
}
func (loader *fsLoader) Load() (*kubeletconfig.KubeletConfiguration, error) {
data, err := loader.fs.ReadFile(loader.kubeletFile)
if err != nil {
return nil, fmt.Errorf("failed to read kubelet config file %q, error: %v", loader.kubeletFile, err)
}
// no configuration is an error, some parameters are required
if len(data) == 0 {
return nil, fmt.Errorf("kubelet config file %q was empty", loader.kubeletFile)
}
kc, err := utilcodec.DecodeKubeletConfiguration(loader.kubeletCodecs, data)
if err != nil {
return nil, err
}
// make all paths absolute
resolveRelativePaths(kubeletconfig.KubeletConfigurationPathRefs(kc), filepath.Dir(loader.kubeletFile))
return kc, nil
}
// resolveRelativePaths makes relative paths absolute by resolving them against `root`
func resolveRelativePaths(paths []*string, root string) {
for _, path := range paths {
// leave empty paths alone, "no path" is a valid input
// do not attempt to resolve paths that are already absolute
if len(*path) > 0 && !filepath.IsAbs(*path) {
*path = filepath.Join(root, *path)
}
}
}

View File

@@ -0,0 +1,214 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package configfiles
import (
"fmt"
"path/filepath"
"testing"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1"
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const configDir = "/test-config-dir"
const relativePath = "relative/path/test"
const kubeletFile = "kubelet"
func TestLoad(t *testing.T) {
cases := []struct {
desc string
file *string
expect *kubeletconfig.KubeletConfiguration
err string
}{
// missing file
{
"missing file",
nil,
nil,
"failed to read",
},
// empty file
{
"empty file",
newString(``),
nil,
"was empty",
},
// invalid format
{
"invalid yaml",
newString(`*`),
nil,
"failed to decode",
},
{
"invalid json",
newString(`{*`),
nil,
"failed to decode",
},
// invalid object
{
"missing kind",
newString(`{"apiVersion":"kubelet.config.k8s.io/v1beta1"}`),
nil,
"failed to decode",
},
{
"missing version",
newString(`{"kind":"KubeletConfiguration"}`),
nil,
"failed to decode",
},
{
"unregistered kind",
newString(`{"kind":"BogusKind","apiVersion":"kubelet.config.k8s.io/v1beta1"}`),
nil,
"failed to decode",
},
{
"unregistered version",
newString(`{"kind":"KubeletConfiguration","apiVersion":"bogusversion"}`),
nil,
"failed to decode",
},
// empty object with correct kind and version should result in the defaults for that kind and version
{
"default from yaml",
newString(`kind: KubeletConfiguration
apiVersion: kubelet.config.k8s.io/v1beta1`),
newConfig(t),
"",
},
{
"default from json",
newString(`{"kind":"KubeletConfiguration","apiVersion":"kubelet.config.k8s.io/v1beta1"}`),
newConfig(t),
"",
},
// relative path
{
"yaml, relative path is resolved",
newString(fmt.Sprintf(`kind: KubeletConfiguration
apiVersion: kubelet.config.k8s.io/v1beta1
staticPodPath: %s`, relativePath)),
func() *kubeletconfig.KubeletConfiguration {
kc := newConfig(t)
kc.StaticPodPath = filepath.Join(configDir, relativePath)
return kc
}(),
"",
},
{
"json, relative path is resolved",
newString(fmt.Sprintf(`{"kind":"KubeletConfiguration","apiVersion":"kubelet.config.k8s.io/v1beta1","staticPodPath":"%s"}`, relativePath)),
func() *kubeletconfig.KubeletConfiguration {
kc := newConfig(t)
kc.StaticPodPath = filepath.Join(configDir, relativePath)
return kc
}(),
"",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
fs := utilfs.NewFakeFs()
path := filepath.Join(configDir, kubeletFile)
if c.file != nil {
if err := addFile(fs, path, *c.file); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
loader, err := NewFsLoader(fs, path)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
kc, err := loader.Load()
if utiltest.SkipRest(t, c.desc, err, c.err) {
return
}
if !apiequality.Semantic.DeepEqual(c.expect, kc) {
t.Fatalf("expect %#v but got %#v", *c.expect, *kc)
}
})
}
}
func TestResolveRelativePaths(t *testing.T) {
absolutePath := filepath.Join(configDir, "absolute")
cases := []struct {
desc string
path string
expect string
}{
{"empty path", "", ""},
{"absolute path", absolutePath, absolutePath},
{"relative path", relativePath, filepath.Join(configDir, relativePath)},
}
paths := kubeletconfig.KubeletConfigurationPathRefs(newConfig(t))
if len(paths) == 0 {
t.Fatalf("requires at least one path field to exist in the KubeletConfiguration type")
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// set the path, resolve it, and check if it resolved as we would expect
*(paths[0]) = c.path
resolveRelativePaths(paths, configDir)
if *(paths[0]) != c.expect {
t.Fatalf("expect %s but got %s", c.expect, *(paths[0]))
}
})
}
}
func newString(s string) *string {
return &s
}
func addFile(fs utilfs.Filesystem, path string, file string) error {
if err := utilfiles.EnsureDir(fs, filepath.Dir(path)); err != nil {
return err
}
return utilfiles.ReplaceFile(fs, path, []byte(file))
}
func newConfig(t *testing.T) *kubeletconfig.KubeletConfiguration {
kubeletScheme, _, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// get the built-in default configuration
external := &kubeletconfigv1beta1.KubeletConfiguration{}
kubeletScheme.Default(external)
kc := &kubeletconfig.KubeletConfiguration{}
err = kubeletScheme.Convert(external, kc, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
return kc
}

View File

@@ -0,0 +1,265 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeletconfig
import (
"fmt"
"os"
"time"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
)
const (
// KubeletConfigChangedEventReason identifies an event as a change of Kubelet configuration
KubeletConfigChangedEventReason = "KubeletConfigChanged"
// LocalEventMessage is sent when the Kubelet restarts to use local config
LocalEventMessage = "Kubelet restarting to use local config"
// RemoteEventMessageFmt is sent when the Kubelet restarts to use a remote config
RemoteEventMessageFmt = "Kubelet restarting to use %s, UID: %s, ResourceVersion: %s, KubeletConfigKey: %s"
)
// pokeConfiSourceWorker tells the worker thread that syncs config sources that work needs to be done
func (cc *Controller) pokeConfigSourceWorker() {
select {
case cc.pendingConfigSource <- true:
default:
}
}
// syncConfigSource checks if work needs to be done to use a new configuration, and does that work if necessary
func (cc *Controller) syncConfigSource(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) {
select {
case <-cc.pendingConfigSource:
default:
// no work to be done, return
return
}
// if the sync fails, we want to retry
var syncerr error
defer func() {
if syncerr != nil {
utillog.Errorf(syncerr.Error())
cc.pokeConfigSourceWorker()
}
}()
// get the latest Node.Spec.ConfigSource from the informer
source, err := latestNodeConfigSource(cc.nodeInformer.GetStore(), nodeName)
if err != nil {
cc.configStatus.SetErrorOverride(fmt.Sprintf(status.SyncErrorFmt, status.InternalError))
syncerr = fmt.Errorf("%s, error: %v", status.InternalError, err)
return
}
// a nil source simply means we reset to local defaults
if source == nil {
utillog.Infof("Node.Spec.ConfigSource is empty, will reset assigned and last-known-good to defaults")
if updated, reason, err := cc.resetConfig(); err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
} else if updated {
restartForNewConfig(eventClient, nodeName, nil)
}
return
}
// a non-nil source means we should attempt to download the config, and checkpoint it if necessary
utillog.Infof("Node.Spec.ConfigSource is non-empty, will checkpoint source and update config if necessary")
// TODO(mtaufen): It would be nice if we could check the payload's metadata before (re)downloading the whole payload
// we at least try pulling the latest configmap out of the local informer store.
// construct the interface that can dynamically dispatch the correct Download, etc. methods for the given source type
remote, reason, err := checkpoint.NewRemoteConfigSource(source)
if err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
}
// "download" source, either from informer's in-memory store or directly from the API server, if the informer doesn't have a copy
payload, reason, err := cc.downloadConfigPayload(client, remote)
if err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
}
// save a checkpoint for the payload, if one does not already exist
if reason, err := cc.saveConfigCheckpoint(remote, payload); err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
}
// update the local, persistent record of assigned config
if updated, reason, err := cc.setAssignedConfig(remote); err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
} else if updated {
restartForNewConfig(eventClient, nodeName, remote)
}
// If we get here:
// - there is no need to restart to use new config
// - there was no error trying to sync configuration
// - if, previously, there was an error trying to sync configuration, we need to clear that error from the status
cc.configStatus.SetErrorOverride("")
}
// Note: source has up-to-date uid and resourceVersion after calling downloadConfigPayload.
func (cc *Controller) downloadConfigPayload(client clientset.Interface, source checkpoint.RemoteConfigSource) (checkpoint.Payload, string, error) {
var store cache.Store
if cc.remoteConfigSourceInformer != nil {
store = cc.remoteConfigSourceInformer.GetStore()
}
return source.Download(client, store)
}
func (cc *Controller) saveConfigCheckpoint(source checkpoint.RemoteConfigSource, payload checkpoint.Payload) (string, error) {
ok, err := cc.checkpointStore.Exists(source)
if err != nil {
return status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err)
}
if ok {
utillog.Infof("checkpoint already exists for %s, UID: %s, ResourceVersion: %s", source.APIPath(), payload.UID(), payload.ResourceVersion())
return "", nil
}
if err := cc.checkpointStore.Save(payload); err != nil {
return status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err)
}
return "", nil
}
// setAssignedConfig updates the assigned checkpoint config in the store.
// Returns whether the assigned config changed as a result, or a sanitized failure reason and an error.
func (cc *Controller) setAssignedConfig(source checkpoint.RemoteConfigSource) (bool, string, error) {
assigned, err := cc.checkpointStore.Assigned()
if err != nil {
return false, status.InternalError, err
}
if err := cc.checkpointStore.SetAssigned(source); err != nil {
return false, status.InternalError, err
}
return !checkpoint.EqualRemoteConfigSources(assigned, source), "", nil
}
// resetConfig resets the assigned and last-known-good checkpoints in the checkpoint store to their default values and
// returns whether the assigned checkpoint changed as a result, or a sanitized failure reason and an error.
func (cc *Controller) resetConfig() (bool, string, error) {
updated, err := cc.checkpointStore.Reset()
if err != nil {
return false, status.InternalError, err
}
return updated, "", nil
}
// restartForNewConfig presumes the Kubelet is managed by a babysitter, e.g. systemd
// It will send an event before exiting.
func restartForNewConfig(eventClient v1core.EventsGetter, nodeName string, source checkpoint.RemoteConfigSource) {
message := LocalEventMessage
if source != nil {
message = fmt.Sprintf(RemoteEventMessageFmt, source.APIPath(), source.UID(), source.ResourceVersion(), source.KubeletFilename())
}
// we directly log and send the event, instead of using the event recorder,
// because the event recorder won't flush its queue before we exit (we'd lose the event)
event := makeEvent(nodeName, apiv1.EventTypeNormal, KubeletConfigChangedEventReason, message)
glog.V(3).Infof("Event(%#v): type: '%v' reason: '%v' %v", event.InvolvedObject, event.Type, event.Reason, event.Message)
if _, err := eventClient.Events(apiv1.NamespaceDefault).Create(event); err != nil {
utillog.Errorf("failed to send event, error: %v", err)
}
utillog.Infof(message)
os.Exit(0)
}
// latestNodeConfigSource returns a copy of the most recent NodeConfigSource from the Node with `nodeName` in `store`
func latestNodeConfigSource(store cache.Store, nodeName string) (*apiv1.NodeConfigSource, error) {
obj, ok, err := store.GetByKey(nodeName)
if err != nil {
err := fmt.Errorf("failed to retrieve Node %q from informer's store, error: %v", nodeName, err)
utillog.Errorf(err.Error())
return nil, err
} else if !ok {
err := fmt.Errorf("Node %q does not exist in the informer's store, can't sync config source", nodeName)
utillog.Errorf(err.Error())
return nil, err
}
node, ok := obj.(*apiv1.Node)
if !ok {
err := fmt.Errorf("failed to cast object from informer's store to Node, can't sync config source for Node %q", nodeName)
utillog.Errorf(err.Error())
return nil, err
}
// Copy the source, so anyone who modifies it after here doesn't mess up the informer's store!
// This was previously the cause of a bug that made the Kubelet frequently resync config; Download updated
// the UID and ResourceVersion on the NodeConfigSource, but the pointer was still drilling all the way
// into the informer's copy!
return node.Spec.ConfigSource.DeepCopy(), nil
}
// makeEvent constructs an event
// similar to makeEvent in k8s.io/client-go/tools/record/event.go
func makeEvent(nodeName, eventtype, reason, message string) *apiv1.Event {
const componentKubelet = "kubelet"
// NOTE(mtaufen): This is consistent with pkg/kubelet/kubelet.go. Even though setting the node
// name as the UID looks strange, it appears to be conventional for events sent by the Kubelet.
ref := apiv1.ObjectReference{
Kind: "Node",
Name: nodeName,
UID: types.UID(nodeName),
Namespace: "",
}
t := metav1.Time{Time: time.Now()}
namespace := ref.Namespace
if namespace == "" {
namespace = metav1.NamespaceDefault
}
return &apiv1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
Namespace: namespace,
},
InvolvedObject: ref,
Reason: reason,
Message: message,
FirstTimestamp: t,
LastTimestamp: t,
Count: 1,
Type: eventtype,
Source: apiv1.EventSource{Component: componentKubelet, Host: string(nodeName)},
}
}

View File

@@ -0,0 +1,324 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeletconfig
import (
"fmt"
"path/filepath"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/validation"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint/store"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
utilpanic "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/panic"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const (
storeDir = "store"
// TODO(mtaufen): We may expose this in a future API, but for the time being we use an internal default,
// because it is not especially clear where this should live in the API.
configTrialDuration = 10 * time.Minute
)
// TransformFunc edits the KubeletConfiguration in-place, and returns an
// error if any of the transformations failed.
type TransformFunc func(kc *kubeletconfig.KubeletConfiguration) error
// Controller manages syncing dynamic Kubelet configurations
// For more information, see the proposal: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/dynamic-kubelet-configuration.md
type Controller struct {
// transform applies an arbitrary transformation to config after loading, and before validation.
// This can be used, for example, to include config from flags before the controller's validation step.
// If transform returns an error, loadConfig will fail, and an InternalError will be reported.
// Be wary if using this function as an extension point, in most cases the controller should
// probably just be natively extended to do what you need. Injecting flag precedence transformations
// is something of an exception because the caller of this controller (cmd/) is aware of flags, but this
// controller's tree (pkg/) is not.
transform TransformFunc
// pendingConfigSource; write to this channel to indicate that the config source needs to be synced from the API server
pendingConfigSource chan bool
// configStatus manages the status we report on the Node object
configStatus status.NodeConfigStatus
// nodeInformer is the informer that watches the Node object
nodeInformer cache.SharedInformer
// remoteConfigSourceInformer is the informer that watches the assigned config source
remoteConfigSourceInformer cache.SharedInformer
// checkpointStore persists config source checkpoints to a storage layer
checkpointStore store.Store
}
// NewController constructs a new Controller object and returns it. The dynamicConfigDir
// path must be absolute. transform applies an arbitrary transformation to config after loading, and before validation.
// This can be used, for example, to include config from flags before the controller's validation step.
// If transform returns an error, loadConfig will fail, and an InternalError will be reported.
// Be wary if using this function as an extension point, in most cases the controller should
// probably just be natively extended to do what you need. Injecting flag precedence transformations
// is something of an exception because the caller of this controller (cmd/) is aware of flags, but this
// controller's tree (pkg/) is not.
func NewController(dynamicConfigDir string, transform TransformFunc) *Controller {
return &Controller{
transform: transform,
// channels must have capacity at least 1, since we signal with non-blocking writes
pendingConfigSource: make(chan bool, 1),
configStatus: status.NewNodeConfigStatus(),
checkpointStore: store.NewFsStore(utilfs.DefaultFs{}, filepath.Join(dynamicConfigDir, storeDir)),
}
}
// Bootstrap attempts to return a valid KubeletConfiguration based on the configuration of the Controller,
// or returns an error if no valid configuration could be produced. Bootstrap should be called synchronously before StartSync.
// If the pre-existing local configuration should be used, Bootstrap returns a nil config.
func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
utillog.Infof("starting controller")
// ensure the filesystem is initialized
if err := cc.initializeDynamicConfigDir(); err != nil {
return nil, err
}
// determine assigned source and set status
assignedSource, err := cc.checkpointStore.Assigned()
if err != nil {
return nil, err
}
if assignedSource != nil {
cc.configStatus.SetAssigned(assignedSource.NodeConfigSource())
}
// determine last-known-good source and set status
lastKnownGoodSource, err := cc.checkpointStore.LastKnownGood()
if err != nil {
return nil, err
}
if lastKnownGoodSource != nil {
cc.configStatus.SetLastKnownGood(lastKnownGoodSource.NodeConfigSource())
}
// if the assigned source is nil, return nil to indicate local config
if assignedSource == nil {
return nil, nil
}
// attempt to load assigned config
assignedConfig, reason, err := cc.loadConfig(assignedSource)
if err == nil {
// update the active source to the non-nil assigned source
cc.configStatus.SetActive(assignedSource.NodeConfigSource())
// update the last-known-good config if necessary, and start a timer that
// periodically checks whether the last-known good needs to be updated
// we only do this when the assigned config loads and passes validation
// wait.Forever will call the func once before starting the timer
go wait.Forever(func() { cc.checkTrial(configTrialDuration) }, 10*time.Second)
return assignedConfig, nil
} // Assert: the assigned config failed to load or validate
// TODO(mtaufen): consider re-attempting download when a load/verify/parse/validate
// error happens outside trial period, we already made it past the trial so it's probably filesystem corruption
// or something else scary
// log the reason and error details for the failure to load the assigned config
utillog.Errorf(fmt.Sprintf("%s, error: %v", reason, err))
// set status to indicate the failure with the assigned config
cc.configStatus.SetError(reason)
// if the last-known-good source is nil, return nil to indicate local config
if lastKnownGoodSource == nil {
return nil, nil
}
// attempt to load the last-known-good config
lastKnownGoodConfig, _, err := cc.loadConfig(lastKnownGoodSource)
if err != nil {
// we failed to load the last-known-good, so something is really messed up and we just return the error
return nil, err
}
// set status to indicate the active source is the non-nil last-known-good source
cc.configStatus.SetActive(lastKnownGoodSource.NodeConfigSource())
return lastKnownGoodConfig, nil
}
// StartSync tells the controller to start the goroutines that sync status/config to/from the API server.
// The clients must be non-nil, and the nodeName must be non-empty.
func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) error {
const errFmt = "cannot start Kubelet config sync: %s"
if client == nil {
return fmt.Errorf(errFmt, "nil client")
}
if eventClient == nil {
return fmt.Errorf(errFmt, "nil event client")
}
if nodeName == "" {
return fmt.Errorf(errFmt, "empty nodeName")
}
// Rather than use utilruntime.HandleCrash, which doesn't actually crash in the Kubelet,
// we use HandlePanic to manually call the panic handlers and then crash.
// We have a better chance of recovering normal operation if we just restart the Kubelet in the event
// of a Go runtime error.
// NOTE(mtaufen): utilpanic.HandlePanic returns a function and you have to call it for your thing to run!
// This was EVIL to debug (difficult to see missing `()`).
// The code now uses `go name()` instead of `go utilpanic.HandlePanic(func(){...})()` to avoid confusion.
// status sync worker
statusSyncLoopFunc := utilpanic.HandlePanic(func() {
utillog.Infof("starting status sync loop")
wait.JitterUntil(func() {
cc.configStatus.Sync(client, nodeName)
}, 10*time.Second, 0.2, true, wait.NeverStop)
})
// remote config source informer, if we have a remote source to watch
assignedSource, err := cc.checkpointStore.Assigned()
if err != nil {
return fmt.Errorf(errFmt, err)
} else if assignedSource == nil {
utillog.Infof("local source is assigned, will not start remote config source informer")
} else {
cc.remoteConfigSourceInformer = assignedSource.Informer(client, cache.ResourceEventHandlerFuncs{
AddFunc: cc.onAddRemoteConfigSourceEvent,
UpdateFunc: cc.onUpdateRemoteConfigSourceEvent,
DeleteFunc: cc.onDeleteRemoteConfigSourceEvent,
},
)
}
remoteConfigSourceInformerFunc := utilpanic.HandlePanic(func() {
if cc.remoteConfigSourceInformer != nil {
utillog.Infof("starting remote config source informer")
cc.remoteConfigSourceInformer.Run(wait.NeverStop)
}
})
// node informer
cc.nodeInformer = newSharedNodeInformer(client, nodeName,
cc.onAddNodeEvent, cc.onUpdateNodeEvent, cc.onDeleteNodeEvent)
nodeInformerFunc := utilpanic.HandlePanic(func() {
utillog.Infof("starting Node informer")
cc.nodeInformer.Run(wait.NeverStop)
})
// config sync worker
configSyncLoopFunc := utilpanic.HandlePanic(func() {
utillog.Infof("starting Kubelet config sync loop")
wait.JitterUntil(func() {
cc.syncConfigSource(client, eventClient, nodeName)
}, 10*time.Second, 0.2, true, wait.NeverStop)
})
go statusSyncLoopFunc()
go remoteConfigSourceInformerFunc()
go nodeInformerFunc()
go configSyncLoopFunc()
return nil
}
// loadConfig loads Kubelet config from a checkpoint
// It returns the loaded configuration or a clean failure reason (for status reporting) and an error.
func (cc *Controller) loadConfig(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, string, error) {
// load KubeletConfiguration from checkpoint
kc, err := cc.checkpointStore.Load(source)
if err != nil {
return nil, status.LoadError, err
}
// apply any required transformations to the KubeletConfiguration
if cc.transform != nil {
if err := cc.transform(kc); err != nil {
return nil, status.InternalError, err
}
}
// validate the result
if err := validation.ValidateKubeletConfiguration(kc); err != nil {
return nil, status.ValidateError, err
}
return kc, "", nil
}
// initializeDynamicConfigDir makes sure that the storage layers for various controller components are set up correctly
func (cc *Controller) initializeDynamicConfigDir() error {
utillog.Infof("ensuring filesystem is set up correctly")
// initializeDynamicConfigDir local checkpoint storage location
return cc.checkpointStore.Initialize()
}
// checkTrial checks whether the trial duration has passed, and updates the last-known-good config if necessary
func (cc *Controller) checkTrial(duration time.Duration) {
// when the trial period is over, the assigned config becomes the last-known-good
if trial, err := cc.inTrial(duration); err != nil {
utillog.Errorf("failed to check trial period for assigned config, error: %v", err)
} else if !trial {
if err := cc.graduateAssignedToLastKnownGood(); err != nil {
utillog.Errorf("failed to set last-known-good to assigned config, error: %v", err)
}
}
}
// inTrial returns true if the time elapsed since the last modification of the assigned config does not exceed `trialDur`, false otherwise
func (cc *Controller) inTrial(trialDur time.Duration) (bool, error) {
now := time.Now()
t, err := cc.checkpointStore.AssignedModified()
if err != nil {
return false, err
}
if now.Sub(t) <= trialDur {
return true, nil
}
return false, nil
}
// graduateAssignedToLastKnownGood sets the last-known-good in the checkpointStore
// to the same value as the assigned config maintained by the checkpointStore
func (cc *Controller) graduateAssignedToLastKnownGood() error {
// get assigned
assigned, err := cc.checkpointStore.Assigned()
if err != nil {
return err
}
// get last-known-good
lastKnownGood, err := cc.checkpointStore.LastKnownGood()
if err != nil {
return err
}
// if the sources are equal, no need to change
if assigned == lastKnownGood ||
assigned != nil && lastKnownGood != nil && apiequality.Semantic.DeepEqual(assigned, lastKnownGood) {
return nil
}
// update last-known-good
err = cc.checkpointStore.SetLastKnownGood(assigned)
if err != nil {
return err
}
// update the status to reflect the new last-known-good config
cc.configStatus.SetLastKnownGood(assigned.NodeConfigSource())
utillog.Infof("updated last-known-good config to %s, UID: %s, ResourceVersion: %s", assigned.APIPath(), assigned.UID(), assigned.ResourceVersion())
return nil
}

View File

@@ -0,0 +1,34 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["status.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status",
deps = [
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//pkg/util/node:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@@ -0,0 +1,202 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package status
import (
"fmt"
"sync"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
"k8s.io/kubernetes/pkg/kubelet/metrics"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)
const (
// LoadError indicates that the Kubelet failed to load the config checkpoint
LoadError = "failed to load config, see Kubelet log for details"
// ValidateError indicates that the Kubelet failed to validate the config checkpoint
ValidateError = "failed to validate config, see Kubelet log for details"
// AllNilSubfieldsError is used when no subfields are set
// This could happen in the case that an old client tries to read an object from a newer API server with a set subfield it does not know about
AllNilSubfieldsError = "invalid NodeConfigSource, exactly one subfield must be non-nil, but all were nil"
// DownloadError is used when the download fails, e.g. due to network issues
DownloadError = "failed to download config, see Kubelet log for details"
// InternalError indicates that some internal error happened while trying to sync config, e.g. filesystem issues
InternalError = "internal failure, see Kubelet log for details"
// SyncErrorFmt is used when the system couldn't sync the config, due to a malformed Node.Spec.ConfigSource, a download failure, etc.
SyncErrorFmt = "failed to sync: %s"
)
// NodeConfigStatus represents Node.Status.Config
type NodeConfigStatus interface {
// SetActive sets the active source in the status
SetActive(source *apiv1.NodeConfigSource)
// SetAssigned sets the assigned source in the status
SetAssigned(source *apiv1.NodeConfigSource)
// SetLastKnownGood sets the last-known-good source in the status
SetLastKnownGood(source *apiv1.NodeConfigSource)
// SetError sets the error associated with the status
SetError(err string)
// SetErrorOverride sets an error that overrides the base error set by SetError.
// If the override is set to the empty string, the base error is reported in
// the status, otherwise the override is reported.
SetErrorOverride(err string)
// Sync patches the current status into the Node identified by `nodeName` if an update is pending
Sync(client clientset.Interface, nodeName string)
}
type nodeConfigStatus struct {
// status is the core NodeConfigStatus that we report
status apiv1.NodeConfigStatus
// mux is a mutex on the nodeConfigStatus, alternate between setting and syncing the status
mux sync.Mutex
// errorOverride is sent in place of the usual error if it is non-empty
errorOverride string
// syncCh; write to this channel to indicate that the status needs to be synced to the API server
syncCh chan bool
}
// NewNodeConfigStatus returns a new NodeConfigStatus interface
func NewNodeConfigStatus() NodeConfigStatus {
// channels must have capacity at least 1, since we signal with non-blocking writes
syncCh := make(chan bool, 1)
// prime new status managers to sync with the API server on the first call to Sync
syncCh <- true
return &nodeConfigStatus{
syncCh: syncCh,
}
}
// transact grabs the lock, performs the fn, records the need to sync, and releases the lock
func (s *nodeConfigStatus) transact(fn func()) {
s.mux.Lock()
defer s.mux.Unlock()
fn()
s.sync()
}
func (s *nodeConfigStatus) SetAssigned(source *apiv1.NodeConfigSource) {
s.transact(func() {
s.status.Assigned = source
})
}
func (s *nodeConfigStatus) SetActive(source *apiv1.NodeConfigSource) {
s.transact(func() {
s.status.Active = source
})
}
func (s *nodeConfigStatus) SetLastKnownGood(source *apiv1.NodeConfigSource) {
s.transact(func() {
s.status.LastKnownGood = source
})
}
func (s *nodeConfigStatus) SetError(err string) {
s.transact(func() {
s.status.Error = err
})
}
func (s *nodeConfigStatus) SetErrorOverride(err string) {
s.transact(func() {
s.errorOverride = err
})
}
// sync notes that the status needs to be synced to the API server
func (s *nodeConfigStatus) sync() {
select {
case s.syncCh <- true:
default:
}
}
// Sync attempts to sync the status with the Node object for this Kubelet,
// if syncing fails, an error is logged, and work is queued for retry.
func (s *nodeConfigStatus) Sync(client clientset.Interface, nodeName string) {
select {
case <-s.syncCh:
default:
// no work to be done, return
return
}
utillog.Infof("updating Node.Status.Config")
// grab the lock
s.mux.Lock()
defer s.mux.Unlock()
// if the sync fails, we want to retry
var err error
defer func() {
if err != nil {
utillog.Errorf(err.Error())
s.sync()
}
}()
// get the Node so we can check the current status
oldNode, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err != nil {
err = fmt.Errorf("could not get Node %q, will not sync status, error: %v", nodeName, err)
return
}
status := &s.status
// override error, if necessary
if len(s.errorOverride) > 0 {
// copy the status, so we don't overwrite the prior error
// with the override
status = status.DeepCopy()
status.Error = s.errorOverride
}
// update metrics based on the status we will sync
metrics.SetConfigError(len(status.Error) > 0)
err = metrics.SetAssignedConfig(status.Assigned)
if err != nil {
err = fmt.Errorf("failed to update Assigned config metric, error: %v", err)
return
}
err = metrics.SetActiveConfig(status.Active)
if err != nil {
err = fmt.Errorf("failed to update Active config metric, error: %v", err)
return
}
err = metrics.SetLastKnownGoodConfig(status.LastKnownGood)
if err != nil {
err = fmt.Errorf("failed to update LastKnownGood config metric, error: %v", err)
return
}
// apply the status to a copy of the node so we don't modify the object in the informer's store
newNode := oldNode.DeepCopy()
newNode.Status.Config = status
// patch the node with the new status
if _, _, err := nodeutil.PatchNodeStatus(client.CoreV1(), types.NodeName(nodeName), oldNode, newNode); err != nil {
utillog.Errorf("failed to patch node status, error: %v", err)
}
}

View File

@@ -0,0 +1,34 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["codec.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec",
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core/install:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@@ -0,0 +1,93 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package codec
import (
"fmt"
// ensure the core apis are installed
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
)
// EncodeKubeletConfig encodes an internal KubeletConfiguration to an external YAML representation
func EncodeKubeletConfig(internal *kubeletconfig.KubeletConfiguration, targetVersion schema.GroupVersion) ([]byte, error) {
encoder, err := NewKubeletconfigYAMLEncoder(targetVersion)
if err != nil {
return nil, err
}
// encoder will convert to external version
data, err := runtime.Encode(encoder, internal)
if err != nil {
return nil, err
}
return data, nil
}
// NewKubeletconfigYAMLEncoder returns an encoder that can write objects in the kubeletconfig API group to YAML
func NewKubeletconfigYAMLEncoder(targetVersion schema.GroupVersion) (runtime.Encoder, error) {
_, codecs, err := scheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
mediaType := "application/yaml"
info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), mediaType)
if !ok {
return nil, fmt.Errorf("unsupported media type %q", mediaType)
}
return codecs.EncoderForVersion(info.Serializer, targetVersion), nil
}
// NewYAMLEncoder generates a new runtime.Encoder that encodes objects to YAML
func NewYAMLEncoder(groupName string) (runtime.Encoder, error) {
// encode to YAML
mediaType := "application/yaml"
info, ok := runtime.SerializerInfoForMediaType(legacyscheme.Codecs.SupportedMediaTypes(), mediaType)
if !ok {
return nil, fmt.Errorf("unsupported media type %q", mediaType)
}
versions := legacyscheme.Scheme.PrioritizedVersionsForGroup(groupName)
if len(versions) == 0 {
return nil, fmt.Errorf("no enabled versions for group %q", groupName)
}
// the "best" version supposedly comes first in the list returned from legacyscheme.Registry.EnabledVersionsForGroup
return legacyscheme.Codecs.EncoderForVersion(info.Serializer, versions[0]), nil
}
// DecodeKubeletConfiguration decodes a serialized KubeletConfiguration to the internal type
func DecodeKubeletConfiguration(kubeletCodecs *serializer.CodecFactory, data []byte) (*kubeletconfig.KubeletConfiguration, error) {
// the UniversalDecoder runs defaulting and returns the internal type by default
obj, gvk, err := kubeletCodecs.UniversalDecoder().Decode(data, nil, nil)
if err != nil {
return nil, fmt.Errorf("failed to decode, error: %v", err)
}
internalKC, ok := obj.(*kubeletconfig.KubeletConfiguration)
if !ok {
return nil, fmt.Errorf("failed to cast object to KubeletConfiguration, unexpected type: %v", gvk)
}
return internalKC, nil
}

View File

@@ -0,0 +1,26 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["equal.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/equal",
deps = ["//vendor/k8s.io/api/core/v1:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@@ -0,0 +1,24 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package equal
import apiv1 "k8s.io/api/core/v1"
// KubeletConfigOkEq returns true if the two conditions are semantically equivalent in the context of dynamic config
func KubeletConfigOkEq(a, b *apiv1.NodeCondition) bool {
return a.Message == b.Message && a.Reason == b.Reason && a.Status == b.Status
}

View File

@@ -0,0 +1,37 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["files.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files",
deps = ["//pkg/util/filesystem:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["files_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
"//pkg/util/filesystem:go_default_library",
],
)

View File

@@ -0,0 +1,229 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package files
import (
"fmt"
"os"
"path/filepath"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const (
defaultPerm = 0755
tmptag = "tmp_" // additional prefix to prevent accidental collisions
)
// FileExists returns true if a regular file exists at `path`, false if `path` does not exist, otherwise an error
func FileExists(fs utilfs.Filesystem, path string) (bool, error) {
if info, err := fs.Stat(path); err == nil {
if info.Mode().IsRegular() {
return true, nil
}
return false, fmt.Errorf("expected regular file at %q, but mode is %q", path, info.Mode().String())
} else if os.IsNotExist(err) {
return false, nil
} else {
return false, err
}
}
// EnsureFile ensures that a regular file exists at `path`, and if it must create the file any
// necessary parent directories will also be created and the new file will be empty.
func EnsureFile(fs utilfs.Filesystem, path string) error {
// if file exists, don't change it, but do report any unexpected errors
if ok, err := FileExists(fs, path); ok || err != nil {
return err
} // Assert: file does not exist
// create any necessary parents
err := fs.MkdirAll(filepath.Dir(path), defaultPerm)
if err != nil {
return err
}
// create the file
file, err := fs.Create(path)
if err != nil {
return err
}
// close the file, since we don't intend to use it yet
return file.Close()
}
// WriteTmpFile creates a temporary file at `path`, writes `data` into it, and fsyncs the file
// Expects the parent directory to exist.
func WriteTmpFile(fs utilfs.Filesystem, path string, data []byte) (tmpPath string, retErr error) {
dir := filepath.Dir(path)
prefix := tmptag + filepath.Base(path)
// create the tmp file
tmpFile, err := fs.TempFile(dir, prefix)
if err != nil {
return "", err
}
defer func() {
// close the file, return the close error only if there haven't been any other errors
if err := tmpFile.Close(); retErr == nil {
retErr = err
}
// if there was an error writing, syncing, or closing, delete the temporary file and return the error
if retErr != nil {
if err := fs.Remove(tmpPath); err != nil {
retErr = fmt.Errorf("attempted to remove temporary file %q after error %v, but failed due to error: %v", tmpPath, retErr, err)
}
tmpPath = ""
}
}()
// Name() will be an absolute path when using utilfs.DefaultFS, because ioutil.TempFile passes
// an absolute path to os.Open, and we ensure similar behavior in utilfs.FakeFS for testing.
tmpPath = tmpFile.Name()
// write data
if _, err := tmpFile.Write(data); err != nil {
return tmpPath, err
}
// sync file, to ensure it's written in case a hard reset happens
return tmpPath, tmpFile.Sync()
}
// ReplaceFile replaces the contents of the file at `path` with `data` by writing to a tmp file in the same
// dir as `path` and renaming the tmp file over `path`. The file does not have to exist to use ReplaceFile,
// but the parent directory must exist.
// Note ReplaceFile calls fsync.
func ReplaceFile(fs utilfs.Filesystem, path string, data []byte) error {
// write data to a temporary file
tmpPath, err := WriteTmpFile(fs, path, data)
if err != nil {
return err
}
// rename over existing file
return fs.Rename(tmpPath, path)
}
// DirExists returns true if a directory exists at `path`, false if `path` does not exist, otherwise an error
func DirExists(fs utilfs.Filesystem, path string) (bool, error) {
if info, err := fs.Stat(path); err == nil {
if info.IsDir() {
return true, nil
}
return false, fmt.Errorf("expected dir at %q, but mode is %q", path, info.Mode().String())
} else if os.IsNotExist(err) {
return false, nil
} else {
return false, err
}
}
// EnsureDir ensures that a directory exists at `path`, and if it must create the directory any
// necessary parent directories will also be created and the new directory will be empty.
func EnsureDir(fs utilfs.Filesystem, path string) error {
// if dir exists, don't change it, but do report any unexpected errors
if ok, err := DirExists(fs, path); ok || err != nil {
return err
} // Assert: dir does not exist
// create the dir
return fs.MkdirAll(path, defaultPerm)
}
// WriteTempDir creates a temporary dir at `path`, writes `files` into it, and fsyncs all the files
// The keys of `files` represent file names. These names must not:
// - be empty
// - be a path that contains more than the base name of a file (e.g. foo/bar is invalid, as is /bar)
// - match `.` or `..` exactly
// - be longer than 255 characters
// The above validation rules are based on atomic_writer.go, though in this case are more restrictive
// because we only allow a flat hierarchy.
func WriteTempDir(fs utilfs.Filesystem, path string, files map[string]string) (tmpPath string, retErr error) {
// validate the filename keys; for now we only allow a flat keyset
for name := range files {
// invalidate empty names
if name == "" {
return "", fmt.Errorf("invalid file key: must not be empty: %q", name)
}
// invalidate: foo/bar and /bar
if name != filepath.Base(name) {
return "", fmt.Errorf("invalid file key %q, only base names are allowed", name)
}
// invalidate `.` and `..`
if name == "." || name == ".." {
return "", fmt.Errorf("invalid file key, may not be '.' or '..'")
}
// invalidate length > 255 characters
if len(name) > 255 {
return "", fmt.Errorf("invalid file key %q, must be less than 255 characters", name)
}
}
// write the temp directory in parent dir and return path to the tmp directory
dir := filepath.Dir(path)
prefix := tmptag + filepath.Base(path)
// create the tmp dir
var err error
tmpPath, err = fs.TempDir(dir, prefix)
if err != nil {
return "", err
}
// be sure to clean up if there was an error
defer func() {
if retErr != nil {
if err := fs.RemoveAll(tmpPath); err != nil {
retErr = fmt.Errorf("attempted to remove temporary directory %q after error %v, but failed due to error: %v", tmpPath, retErr, err)
}
}
}()
// write data
for name, data := range files {
// create the file
file, err := fs.Create(filepath.Join(tmpPath, name))
if err != nil {
return tmpPath, err
}
// be sure to close the file when we're done
defer func() {
// close the file when we're done, don't overwrite primary retErr if close fails
if err := file.Close(); retErr == nil {
retErr = err
}
}()
// write the file
if _, err := file.Write([]byte(data)); err != nil {
return tmpPath, err
}
// sync the file, to ensure it's written in case a hard reset happens
if err := file.Sync(); err != nil {
return tmpPath, err
}
}
return tmpPath, nil
}
// ReplaceDir replaces the contents of the dir at `path` with `files` by writing to a tmp dir in the same
// dir as `path` and renaming the tmp dir over `path`. The dir does not have to exist to use ReplaceDir.
func ReplaceDir(fs utilfs.Filesystem, path string, files map[string]string) error {
// write data to a temporary directory
tmpPath, err := WriteTempDir(fs, path, files)
if err != nil {
return err
}
// rename over target directory
return fs.Rename(tmpPath, path)
}

View File

@@ -0,0 +1,476 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package files
import (
"fmt"
"os"
"path/filepath"
"testing"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const (
prefix = "test-util-files"
)
type file struct {
name string
// mode distinguishes file type,
// we only check for regular vs. directory in these tests,
// specify regular as 0, directory as os.ModeDir
mode os.FileMode
data string // ignored if mode == os.ModeDir
}
func (f *file) write(fs utilfs.Filesystem, dir string) error {
path := filepath.Join(dir, f.name)
if f.mode.IsDir() {
if err := fs.MkdirAll(path, defaultPerm); err != nil {
return err
}
} else if f.mode.IsRegular() {
// create parent directories, if necessary
parents := filepath.Dir(path)
if err := fs.MkdirAll(parents, defaultPerm); err != nil {
return err
}
// create the file
handle, err := fs.Create(path)
if err != nil {
return err
}
_, err = handle.Write([]byte(f.data))
if err != nil {
if cerr := handle.Close(); cerr != nil {
return fmt.Errorf("error %v closing file after error: %v", cerr, err)
}
return err
}
} else {
return fmt.Errorf("mode not implemented for testing %s", f.mode.String())
}
return nil
}
func (f *file) expect(fs utilfs.Filesystem, dir string) error {
path := filepath.Join(dir, f.name)
if f.mode.IsDir() {
info, err := fs.Stat(path)
if err != nil {
return err
}
if !info.IsDir() {
return fmt.Errorf("expected directory, got mode %s", info.Mode().String())
}
} else if f.mode.IsRegular() {
info, err := fs.Stat(path)
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return fmt.Errorf("expected regular file, got mode %s", info.Mode().String())
}
data, err := fs.ReadFile(path)
if err != nil {
return err
}
if f.data != string(data) {
return fmt.Errorf("expected file data %q, got %q", f.data, string(data))
}
} else {
return fmt.Errorf("mode not implemented for testing %s", f.mode.String())
}
return nil
}
// write files, perform some function, then attempt to read files back
// if err is non-empty, expects an error from the function performed in the test
// and skips reading back the expected files
type test struct {
desc string
writes []file
expects []file
fn func(fs utilfs.Filesystem, dir string, c *test) []error
err string
}
func (c *test) write(t *testing.T, fs utilfs.Filesystem, dir string) {
for _, f := range c.writes {
if err := f.write(fs, dir); err != nil {
t.Fatalf("error pre-writing file: %v", err)
}
}
}
// you can optionally skip calling t.Errorf by passing a nil t, and process the
// returned errors instead
func (c *test) expect(t *testing.T, fs utilfs.Filesystem, dir string) []error {
errs := []error{}
for _, f := range c.expects {
if err := f.expect(fs, dir); err != nil {
msg := fmt.Errorf("expect %#v, got error: %v", f, err)
errs = append(errs, msg)
if t != nil {
t.Errorf("%s", msg)
}
}
}
return errs
}
// run a test case, with an arbitrary function to execute between write and expect
// if c.fn is nil, errors from c.expect are checked against c.err, instead of errors
// from fn being checked against c.err
func (c *test) run(t *testing.T, fs utilfs.Filesystem) {
// isolate each test case in a new temporary directory
dir, err := fs.TempDir("", prefix)
if err != nil {
t.Fatalf("error creating temporary directory for test: %v", err)
}
c.write(t, fs, dir)
// if fn exists, check errors from fn, then check expected files
if c.fn != nil {
errs := c.fn(fs, dir, c)
if len(errs) > 0 {
for _, err := range errs {
utiltest.ExpectError(t, err, c.err)
}
// skip checking expected files if we expected errors
// (usually means we didn't create file)
return
}
c.expect(t, fs, dir)
return
}
// just check expected files, and compare errors from c.expect to c.err
// (this lets us test the helper functions above)
errs := c.expect(nil, fs, dir)
for _, err := range errs {
utiltest.ExpectError(t, err, c.err)
}
}
// simple test of the above helper functions
func TestHelpers(t *testing.T) {
// omitting the test.fn means test.err is compared to errors from test.expect
cases := []test{
{
desc: "regular file",
writes: []file{{name: "foo", data: "bar"}},
expects: []file{{name: "foo", data: "bar"}},
},
{
desc: "directory",
writes: []file{{name: "foo", mode: os.ModeDir}},
expects: []file{{name: "foo", mode: os.ModeDir}},
},
{
desc: "deep regular file",
writes: []file{{name: "foo/bar", data: "baz"}},
expects: []file{{name: "foo/bar", data: "baz"}},
},
{
desc: "deep directory",
writes: []file{{name: "foo/bar", mode: os.ModeDir}},
expects: []file{{name: "foo/bar", mode: os.ModeDir}},
},
{
desc: "missing file",
expects: []file{{name: "foo", data: "bar"}},
err: "no such file or directory",
},
{
desc: "missing directory",
expects: []file{{name: "foo/bar", mode: os.ModeDir}},
err: "no such file or directory",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}
func TestFileExists(t *testing.T) {
fn := func(fs utilfs.Filesystem, dir string, c *test) []error {
ok, err := FileExists(fs, filepath.Join(dir, "foo"))
if err != nil {
return []error{err}
}
if !ok {
return []error{fmt.Errorf("does not exist (test)")}
}
return nil
}
cases := []test{
{
fn: fn,
desc: "file exists",
writes: []file{{name: "foo"}},
},
{
fn: fn,
desc: "file does not exist",
err: "does not exist (test)",
},
{
fn: fn,
desc: "object has non-file mode",
writes: []file{{name: "foo", mode: os.ModeDir}},
err: "expected regular file",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}
func TestEnsureFile(t *testing.T) {
fn := func(fs utilfs.Filesystem, dir string, c *test) []error {
var errs []error
for _, f := range c.expects {
if err := EnsureFile(fs, filepath.Join(dir, f.name)); err != nil {
errs = append(errs, err)
}
}
return errs
}
cases := []test{
{
fn: fn,
desc: "file exists",
writes: []file{{name: "foo"}},
expects: []file{{name: "foo"}},
},
{
fn: fn,
desc: "file does not exist",
expects: []file{{name: "bar"}},
},
{
fn: fn,
desc: "neither parent nor file exists",
expects: []file{{name: "baz/quux"}},
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}
// Note: This transitively tests WriteTmpFile
func TestReplaceFile(t *testing.T) {
fn := func(fs utilfs.Filesystem, dir string, c *test) []error {
var errs []error
for _, f := range c.expects {
if err := ReplaceFile(fs, filepath.Join(dir, f.name), []byte(f.data)); err != nil {
errs = append(errs, err)
}
}
return errs
}
cases := []test{
{
fn: fn,
desc: "file exists",
writes: []file{{name: "foo"}},
expects: []file{{name: "foo", data: "bar"}},
},
{
fn: fn,
desc: "file does not exist",
expects: []file{{name: "foo", data: "bar"}},
},
{
fn: func(fs utilfs.Filesystem, dir string, c *test) []error {
if err := ReplaceFile(fs, filepath.Join(dir, "foo/bar"), []byte("")); err != nil {
return []error{err}
}
return nil
},
desc: "neither parent nor file exists",
err: "no such file or directory",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}
func TestDirExists(t *testing.T) {
fn := func(fs utilfs.Filesystem, dir string, c *test) []error {
ok, err := DirExists(fs, filepath.Join(dir, "foo"))
if err != nil {
return []error{err}
}
if !ok {
return []error{fmt.Errorf("does not exist (test)")}
}
return nil
}
cases := []test{
{
fn: fn,
desc: "dir exists",
writes: []file{{name: "foo", mode: os.ModeDir}},
},
{
fn: fn,
desc: "dir does not exist",
err: "does not exist (test)",
},
{
fn: fn,
desc: "object has non-dir mode",
writes: []file{{name: "foo"}},
err: "expected dir",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}
func TestEnsureDir(t *testing.T) {
fn := func(fs utilfs.Filesystem, dir string, c *test) []error {
var errs []error
for _, f := range c.expects {
if err := EnsureDir(fs, filepath.Join(dir, f.name)); err != nil {
errs = append(errs, err)
}
}
return errs
}
cases := []test{
{
fn: fn,
desc: "dir exists",
writes: []file{{name: "foo", mode: os.ModeDir}},
expects: []file{{name: "foo", mode: os.ModeDir}},
},
{
fn: fn,
desc: "dir does not exist",
expects: []file{{name: "bar", mode: os.ModeDir}},
},
{
fn: fn,
desc: "neither parent nor dir exists",
expects: []file{{name: "baz/quux", mode: os.ModeDir}},
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}
func TestWriteTempDir(t *testing.T) {
// writing a tmp dir is covered by TestReplaceDir, but we additionally test filename validation here
c := test{
desc: "invalid file key",
err: "invalid file key",
fn: func(fs utilfs.Filesystem, dir string, c *test) []error {
if _, err := WriteTempDir(fs, filepath.Join(dir, "tmpdir"), map[string]string{"foo/bar": ""}); err != nil {
return []error{err}
}
return nil
},
}
c.run(t, utilfs.DefaultFs{})
}
func TestReplaceDir(t *testing.T) {
fn := func(fs utilfs.Filesystem, dir string, c *test) []error {
errs := []error{}
// compute filesets from expected files and call ReplaceDir for each
// we don't nest dirs in test cases, order of ReplaceDir call is not guaranteed
dirs := map[string]map[string]string{}
// allocate dirs
for _, f := range c.expects {
if f.mode.IsDir() {
path := filepath.Join(dir, f.name)
if _, ok := dirs[path]; !ok {
dirs[path] = map[string]string{}
}
} else if f.mode.IsRegular() {
path := filepath.Join(dir, filepath.Dir(f.name))
if _, ok := dirs[path]; !ok {
// require an expectation for the parent directory if there is an expectation for the file
errs = append(errs, fmt.Errorf("no prior parent directory in c.expects for file %s", f.name))
continue
}
dirs[path][filepath.Base(f.name)] = f.data
}
}
// short-circuit test case validation errors
if len(errs) > 0 {
return errs
}
// call ReplaceDir for each desired dir
for path, files := range dirs {
if err := ReplaceDir(fs, path, files); err != nil {
errs = append(errs, err)
}
}
return errs
}
cases := []test{
{
fn: fn,
desc: "fn catches invalid test case",
expects: []file{{name: "foo/bar"}},
err: "no prior parent directory",
},
{
fn: fn,
desc: "empty dir",
expects: []file{{name: "foo", mode: os.ModeDir}},
},
{
fn: fn,
desc: "dir with files",
expects: []file{
{name: "foo", mode: os.ModeDir},
{name: "foo/bar", data: "baz"},
{name: "foo/baz", data: "bar"},
},
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}

View File

@@ -0,0 +1,26 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["log.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log",
deps = ["//vendor/github.com/golang/glog:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@@ -0,0 +1,49 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package log
import (
"fmt"
"github.com/golang/glog"
)
const logFmt = "kubelet config controller: %s"
// Errorf shim that inserts "kubelet config controller" at the beginning of the log message,
// while still reporting the call site of the logging function.
func Errorf(format string, args ...interface{}) {
var s string
if len(args) > 0 {
s = fmt.Sprintf(format, args...)
} else {
s = format
}
glog.ErrorDepth(1, fmt.Sprintf(logFmt, s))
}
// Infof shim that inserts "kubelet config controller" at the beginning of the log message,
// while still reporting the call site of the logging function.
func Infof(format string, args ...interface{}) {
var s string
if len(args) > 0 {
s = fmt.Sprintf(format, args...)
} else {
s = format
}
glog.InfoDepth(1, fmt.Sprintf(logFmt, s))
}

View File

@@ -0,0 +1,26 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["panic.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/panic",
deps = ["//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@@ -0,0 +1,36 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package panic
import utilruntime "k8s.io/apimachinery/pkg/util/runtime"
// HandlePanic returns a function that wraps `fn` with the utilruntime.PanicHandlers, and continues
// to bubble the panic after the PanicHandlers are called
func HandlePanic(fn func()) func() {
return func() {
defer func() {
if r := recover(); r != nil {
for _, fn := range utilruntime.PanicHandlers {
fn(r)
}
panic(r)
}
}()
// call the function
fn()
}
}

View File

@@ -0,0 +1,25 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["test.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test",
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@@ -0,0 +1,55 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package test
import (
"strings"
"testing"
)
// ExpectError calls t.Fatalf if the error does not contain a substr match.
// If substr is empty, a nil error is expected.
// It is useful to call ExpectError from subtests.
func ExpectError(t *testing.T, err error, substr string) {
if err != nil {
if len(substr) == 0 {
t.Fatalf("expect nil error but got %q", err.Error())
} else if !strings.Contains(err.Error(), substr) {
t.Fatalf("expect error to contain %q but got %q", substr, err.Error())
}
} else if len(substr) > 0 {
t.Fatalf("expect error to contain %q but got nil error", substr)
}
}
// SkipRest returns true if there was a non-nil error or if we expected an error that didn't happen,
// and logs the appropriate error on the test object.
// The return value indicates whether we should skip the rest of the test case due to the error result.
func SkipRest(t *testing.T, desc string, err error, contains string) bool {
if err != nil {
if len(contains) == 0 {
t.Errorf("case %q, expect nil error but got %q", desc, err.Error())
} else if !strings.Contains(err.Error(), contains) {
t.Errorf("case %q, expect error to contain %q but got %q", desc, contains, err.Error())
}
return true
} else if len(contains) > 0 {
t.Errorf("case %q, expect error to contain %q but got nil error", desc, contains)
return true
}
return false
}

View File

@@ -0,0 +1,154 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeletconfig
import (
"math/rand"
"time"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
)
// newSharedNodeInformer returns a shared informer that uses `client` to watch the Node with
// `nodeName` for changes and respond with `addFunc`, `updateFunc`, and `deleteFunc`.
func newSharedNodeInformer(client clientset.Interface, nodeName string,
addFunc func(newObj interface{}),
updateFunc func(oldObj interface{}, newObj interface{}),
deleteFunc func(deletedObj interface{})) cache.SharedInformer {
// select nodes by name
fieldselector := fields.OneTermEqualSelector("metadata.name", nodeName)
// add some randomness to resync period, which can help avoid controllers falling into lock-step
minResyncPeriod := 15 * time.Minute
factor := rand.Float64() + 1
resyncPeriod := time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (kuberuntime.Object, error) {
return client.CoreV1().Nodes().List(metav1.ListOptions{
FieldSelector: fieldselector.String(),
})
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().Nodes().Watch(metav1.ListOptions{
FieldSelector: fieldselector.String(),
ResourceVersion: options.ResourceVersion,
})
},
}
handler := cache.ResourceEventHandlerFuncs{
AddFunc: addFunc,
UpdateFunc: updateFunc,
DeleteFunc: deleteFunc,
}
informer := cache.NewSharedInformer(lw, &apiv1.Node{}, resyncPeriod)
informer.AddEventHandler(handler)
return informer
}
// onAddNodeEvent calls onUpdateNodeEvent with the new object and a nil old object
func (cc *Controller) onAddNodeEvent(newObj interface{}) {
cc.onUpdateNodeEvent(nil, newObj)
}
// onUpdateNodeEvent checks whether the configSource changed between oldObj and newObj, and pokes the
// configuration sync worker if there was a change
func (cc *Controller) onUpdateNodeEvent(oldObj interface{}, newObj interface{}) {
newNode, ok := newObj.(*apiv1.Node)
if !ok {
utillog.Errorf("failed to cast new object to Node, couldn't handle event")
return
}
if oldObj == nil {
// Node was just added, need to sync
utillog.Infof("initial Node watch event")
cc.pokeConfigSourceWorker()
return
}
oldNode, ok := oldObj.(*apiv1.Node)
if !ok {
utillog.Errorf("failed to cast old object to Node, couldn't handle event")
return
}
if !apiequality.Semantic.DeepEqual(oldNode.Spec.ConfigSource, newNode.Spec.ConfigSource) {
utillog.Infof("Node.Spec.ConfigSource was updated")
cc.pokeConfigSourceWorker()
}
}
// onDeleteNodeEvent logs a message if the Node was deleted
// We allow the sync-loop to continue, because it is possible that the Kubelet detected
// a Node with unexpected externalID and is attempting to delete and re-create the Node
// (see pkg/kubelet/kubelet_node_status.go), or that someone accidentally deleted the Node
// (the Kubelet will re-create it).
func (cc *Controller) onDeleteNodeEvent(deletedObj interface{}) {
// For this case, we just log the event.
// We don't want to poke the worker, because a temporary deletion isn't worth reporting an error for.
// If the Node is deleted because the VM is being deleted, then the Kubelet has nothing to do.
utillog.Infof("Node was deleted")
}
// onAddRemoteConfigSourceEvent calls onUpdateConfigMapEvent with the new object and a nil old object
func (cc *Controller) onAddRemoteConfigSourceEvent(newObj interface{}) {
cc.onUpdateRemoteConfigSourceEvent(nil, newObj)
}
// onUpdateRemoteConfigSourceEvent checks whether the configSource changed between oldObj and newObj,
// and pokes the sync worker if there was a change
func (cc *Controller) onUpdateRemoteConfigSourceEvent(oldObj interface{}, newObj interface{}) {
// since ConfigMap is currently the only source type, we handle that here
newConfigMap, ok := newObj.(*apiv1.ConfigMap)
if !ok {
utillog.Errorf("failed to cast new object to ConfigMap, couldn't handle event")
return
}
if oldObj == nil {
// ConfigMap was just added, need to sync
utillog.Infof("initial ConfigMap watch event")
cc.pokeConfigSourceWorker()
return
}
oldConfigMap, ok := oldObj.(*apiv1.ConfigMap)
if !ok {
utillog.Errorf("failed to cast old object to ConfigMap, couldn't handle event")
return
}
if !apiequality.Semantic.DeepEqual(oldConfigMap, newConfigMap) {
utillog.Infof("assigned ConfigMap was updated")
cc.pokeConfigSourceWorker()
}
}
// onDeleteRemoteConfigSourceEvent logs a message if the ConfigMap was deleted and pokes the sync worker
func (cc *Controller) onDeleteRemoteConfigSourceEvent(deletedObj interface{}) {
// If the ConfigMap we're watching is deleted, we log the event and poke the sync worker.
// This requires a sync, because if the Node is still configured to use the deleted ConfigMap,
// the Kubelet should report a DownloadError.
utillog.Infof("assigned ConfigMap was deleted")
cc.pokeConfigSourceWorker()
}