Merge pull request #7 from xing-yang/snapshot_controller

Add Snapshot Controller
This commit is contained in:
k8s-ci-robot
2018-08-29 23:41:51 -07:00
committed by GitHub
9 changed files with 2310 additions and 10 deletions

View File

@@ -0,0 +1,122 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"reflect"
"github.com/golang/glog"
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
)
const (
// SnapshotPVCAnnotation is "snapshot.alpha.kubernetes.io/snapshot"
SnapshotPVCAnnotation = "volumesnapshot.csi.k8s.io/snapshot"
)
// NewClient creates a new RESTClient
func NewClient(cfg *rest.Config) (*rest.RESTClient, *runtime.Scheme, error) {
scheme := runtime.NewScheme()
if err := crdv1.AddToScheme(scheme); err != nil {
return nil, nil, err
}
config := *cfg
config.GroupVersion = &crdv1.SchemeGroupVersion
config.APIPath = "/apis"
config.ContentType = runtime.ContentTypeJSON
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: serializer.NewCodecFactory(scheme)}
client, err := rest.RESTClientFor(&config)
if err != nil {
return nil, nil, err
}
return client, scheme, nil
}
// CreateCRD creates CustomResourceDefinition
func CreateCRD(clientset apiextensionsclient.Interface) error {
crd := &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: crdv1.VolumeSnapshotClassResourcePlural + "." + crdv1.GroupName,
},
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
Group: crdv1.GroupName,
Version: crdv1.SchemeGroupVersion.Version,
Scope: apiextensionsv1beta1.ClusterScoped,
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
Plural: crdv1.VolumeSnapshotClassResourcePlural,
Kind: reflect.TypeOf(crdv1.VolumeSnapshotClass{}).Name(),
},
},
}
res, err := clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)
if err != nil && !apierrors.IsAlreadyExists(err) {
glog.Fatalf("failed to create VolumeSnapshotResource: %#v, err: %#v",
res, err)
}
crd = &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: crdv1.VolumeSnapshotContentResourcePlural + "." + crdv1.GroupName,
},
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
Group: crdv1.GroupName,
Version: crdv1.SchemeGroupVersion.Version,
Scope: apiextensionsv1beta1.ClusterScoped,
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
Plural: crdv1.VolumeSnapshotContentResourcePlural,
Kind: reflect.TypeOf(crdv1.VolumeSnapshotContent{}).Name(),
},
},
}
res, err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)
if err != nil && !apierrors.IsAlreadyExists(err) {
glog.Fatalf("failed to create VolumeSnapshotContentResource: %#v, err: %#v",
res, err)
}
crd = &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: crdv1.VolumeSnapshotResourcePlural + "." + crdv1.GroupName,
},
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
Group: crdv1.GroupName,
Version: crdv1.SchemeGroupVersion.Version,
Scope: apiextensionsv1beta1.NamespaceScoped,
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
Plural: crdv1.VolumeSnapshotResourcePlural,
Kind: reflect.TypeOf(crdv1.VolumeSnapshot{}).Name(),
},
},
}
res, err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)
if err != nil && !apierrors.IsAlreadyExists(err) {
glog.Fatalf("failed to create VolumeSnapshotResource: %#v, err: %#v",
res, err)
}
return nil
}

View File

@@ -1,7 +1,200 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main package main
import "fmt" import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"time"
"github.com/golang/glog"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"github.com/kubernetes-csi/external-snapshotter/pkg/connection"
"github.com/kubernetes-csi/external-snapshotter/pkg/controller"
clientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
)
const (
// Number of worker threads
threads = 10
// Default timeout of short CSI calls like GetPluginInfo
csiTimeout = time.Second
)
// Command line flags
var (
snapshotter = flag.String("snapshotter", "", "Name of the snapshotter. The snapshotter will only create snapshot content for snapshot that requests a VolumeSnapshotClass with a snapshotter field set equal to this name.")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
connectionTimeout = flag.Duration("connection-timeout", 1*time.Minute, "Timeout for waiting for CSI driver socket.")
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
createSnapshotContentRetryCount = flag.Int("create-snapshotcontent-retrycount", 5, "Number of retries when we create a snapshot content object for a snapshot.")
createSnapshotContentInterval = flag.Duration("create-snapshotcontent-interval", 10*time.Second, "Interval between retries when we create a snapshot content object for a snapshot.")
resyncPeriod = flag.Duration("resync-period", 60*time.Second, "Resync interval of the controller.")
snapshotNamePrefix = flag.String("snapshot-name-prefix", "snapshot", "Prefix to apply to the name of a created snapshot")
snapshotNameUUIDLength = flag.Int("snapshot-name-uuid-length", -1, "Length in characters for the generated uuid of a created snapshot. Defaults behavior is to NOT truncate.")
)
func main() { func main() {
fmt.Println("vim-go") flag.Set("logtostderr", "true")
flag.Parse()
// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
config, err := buildConfig(*kubeconfig)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}
snapClient, err := clientset.NewForConfig(config)
if err != nil {
glog.Errorf("Error building snapshot clientset: %s", err.Error())
os.Exit(1)
}
factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod)
// Create CRD resource
aeclientset, err := apiextensionsclient.NewForConfig(config)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}
// initialize CRD resource if it does not exist
err = CreateCRD(aeclientset)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}
// Connect to CSI.
csiConn, err := connection.New(*csiAddress, *connectionTimeout)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}
// Pass a context with a timeout
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
// Find driver name
if *snapshotter == "" {
*snapshotter, err = csiConn.GetDriverName(ctx)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}
}
glog.V(2).Infof("CSI driver name: %q", *snapshotter)
// Check it's ready
if err = waitForDriverReady(csiConn, *connectionTimeout); err != nil {
glog.Error(err.Error())
os.Exit(1)
}
// Find out if the driver supports create/delete snapshot.
supportsCreateSnapshot, err := csiConn.SupportsControllerCreateSnapshot(ctx)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}
if !supportsCreateSnapshot {
glog.Errorf("CSI driver %s does not support ControllerCreateSnapshot", *snapshotter)
os.Exit(1)
}
if len(*snapshotNamePrefix) == 0 {
glog.Error("Snapshot name prefix cannot be of length 0")
os.Exit(1)
}
glog.V(2).Infof("Start NewCSISnapshotController with snapshotter [%s] kubeconfig [%s] connectionTimeout [%+v] csiAddress [%s] createSnapshotContentRetryCount [%d] createSnapshotContentInterval [%+v] resyncPeriod [%+v] snapshotNamePrefix [%s] snapshotNameUUIDLength [%d]", *snapshotter, *kubeconfig, *connectionTimeout, *csiAddress, createSnapshotContentRetryCount, *createSnapshotContentInterval, *resyncPeriod, *snapshotNamePrefix, snapshotNameUUIDLength)
ctrl := controller.NewCSISnapshotController(
snapClient,
kubeClient,
*snapshotter,
factory.Volumesnapshot().V1alpha1().VolumeSnapshots(),
factory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(),
factory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(),
*createSnapshotContentRetryCount,
*createSnapshotContentInterval,
csiConn,
*connectionTimeout,
*resyncPeriod,
*snapshotNamePrefix,
*snapshotNameUUIDLength,
)
// run...
stopCh := make(chan struct{})
factory.Start(stopCh)
go ctrl.Run(threads, stopCh)
// ...until SIGINT
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
close(stopCh)
}
func buildConfig(kubeconfig string) (*rest.Config, error) {
if kubeconfig != "" {
return clientcmd.BuildConfigFromFlags("", kubeconfig)
}
return rest.InClusterConfig()
}
func waitForDriverReady(csiConn connection.CSIConnection, timeout time.Duration) error {
now := time.Now()
finish := now.Add(timeout)
var err error
for {
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
err = csiConn.Probe(ctx)
if err == nil {
glog.V(2).Infof("Probe succeeded")
return nil
}
glog.V(2).Infof("Probe failed with %s", err)
now := time.Now()
if now.After(finish) {
return fmt.Errorf("failed to probe the controller: %s", err)
}
time.Sleep(time.Second)
}
} }

View File

@@ -19,6 +19,7 @@ package v1alpha1
import ( import (
core_v1 "k8s.io/api/core/v1" core_v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1" storage "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
@@ -79,7 +80,7 @@ type VolumeSnapshotSpec struct {
// Name of the VolumeSnapshotClass used by the VolumeSnapshot. If not specified, a default snapshot class will // Name of the VolumeSnapshotClass used by the VolumeSnapshot. If not specified, a default snapshot class will
// be used if it is available. // be used if it is available.
// +optional // +optional
VolumeSnapshotClassName string `json:"snapshotClassName" protobuf:"bytes,3,opt,name=snapshotClassName"` VolumeSnapshotClassName *string `json:"snapshotClassName" protobuf:"bytes,3,opt,name=snapshotClassName"`
} }
// VolumeSnapshotStatus is the status of the VolumeSnapshot // VolumeSnapshotStatus is the status of the VolumeSnapshot
@@ -89,18 +90,24 @@ type VolumeSnapshotStatus struct {
// +optional // +optional
CreationTime *metav1.Time `json:"createdAt" protobuf:"bytes,1,opt,name=createdAt"` CreationTime *metav1.Time `json:"createdAt" protobuf:"bytes,1,opt,name=createdAt"`
// When restoring volume from the snapshot, the volume size should be equal to or
// larger than the RestoreSize if it is specified. If RestoreSize is set to nil, it means
// that the storage plugin does not have this information available.
// +optional
RestoreSize *resource.Quantity `json:"restoreSize" protobuf:"bytes,2,opt,name=restoreSize"`
// Ready is set to true only if the snapshot is ready to use (e.g., finish uploading if // Ready is set to true only if the snapshot is ready to use (e.g., finish uploading if
// there is an uploading phase) and also VolumeSnapshot and its VolumeSnapshotContent // there is an uploading phase) and also VolumeSnapshot and its VolumeSnapshotContent
// bind correctly with each other. If any of the above condition is not true, Ready is // bind correctly with each other. If any of the above condition is not true, Ready is
// set to false // set to false
// +optional // +optional
Ready bool `json:"ready" protobuf:"varint,2,opt,name=ready"` Ready bool `json:"ready" protobuf:"varint,3,opt,name=ready"`
// The last error encountered during create snapshot operation, if any. // The last error encountered during create snapshot operation, if any.
// This field must only be set by the entity completing the create snapshot // This field must only be set by the entity completing the create snapshot
// operation, i.e. the external-snapshotter. // operation, i.e. the external-snapshotter.
// +optional // +optional
Error *storage.VolumeError Error *storage.VolumeError `json:"error,omitempty" protobuf:"bytes,4,opt,name=error,casttype=VolumeError"`
} }
// TypedLocalObjectReference contains enough information to let you locate the typed referenced object inside the same namespace. // TypedLocalObjectReference contains enough information to let you locate the typed referenced object inside the same namespace.
@@ -194,6 +201,11 @@ type VolumeSnapshotContentSpec struct {
// taken from. It becomes non-nil when VolumeSnapshot and VolumeSnapshotContent are bound. // taken from. It becomes non-nil when VolumeSnapshot and VolumeSnapshotContent are bound.
// +optional // +optional
PersistentVolumeRef *core_v1.ObjectReference `json:"persistentVolumeRef" protobuf:"bytes,3,opt,name=persistentVolumeRef"` PersistentVolumeRef *core_v1.ObjectReference `json:"persistentVolumeRef" protobuf:"bytes,3,opt,name=persistentVolumeRef"`
// Name of the VolumeSnapshotClass used by the VolumeSnapshot. If not specified, a default snapshot class will
// be used if it is available.
// +optional
VolumeSnapshotClassName *string `json:"snapshotClassName" protobuf:"bytes,4,opt,name=snapshotClassName"`
} }
// VolumeSnapshotSource represents the actual location and type of the snapshot. Only one of its members may be specified. // VolumeSnapshotSource represents the actual location and type of the snapshot. Only one of its members may be specified.
@@ -206,19 +218,28 @@ type VolumeSnapshotSource struct {
// Represents the source from CSI volume snapshot // Represents the source from CSI volume snapshot
type CSIVolumeSnapshotSource struct { type CSIVolumeSnapshotSource struct {
// Driver is the name of the driver to use for this snapshot. // Driver is the name of the driver to use for this snapshot.
// This MUST be the same name returned by the CSI GetPluginName() call for
// that driver.
// Required. // Required.
Driver string `json:"driver"` Driver string `json:"driver" protobuf:"bytes,1,opt,name=driver"`
// SnapshotHandle is the unique snapshot id returned by the CSI volume // SnapshotHandle is the unique snapshot id returned by the CSI volume
// plugins CreateSnapshot to refer to the snapshot on all subsequent calls. // plugins CreateSnapshot to refer to the snapshot on all subsequent calls.
// Required. // Required.
SnapshotHandle string `json:"snapshotHandle"` SnapshotHandle string `json:"snapshotHandle" protobuf:"bytes,2,opt,name=snapshotHandle"`
// Timestamp when the point-in-time snapshot is taken on the storage // Timestamp when the point-in-time snapshot is taken on the storage
// system. This timestamp will be generated by the CSI volume driver after // system. This timestamp will be generated by the CSI volume driver after
// the snapshot is cut. The format of this field should be a Unix nanoseconds // the snapshot is cut. The format of this field should be a Unix nanoseconds
// time encoded as an int64. On Unix, the command `date +%s%N` returns // time encoded as an int64. On Unix, the command `date +%s%N` returns
// the current time in nanoseconds since 1970-01-01 00:00:00 UTC. // the current time in nanoseconds since 1970-01-01 00:00:00 UTC.
// This field is REQUIRED. // This field is required in the CSI spec but optional here to support static binding.
CreatedAt int64 `json:"createdAt,omitempty" protobuf:"varint,3,opt,name=createdAt"` // +optional
CreationTime *int64 `json:"creationTime,omitempty" protobuf:"varint,3,opt,name=creationTime"`
// When restoring volume from the snapshot, the volume size should be equal to or
// larger than the RestoreSize if it is specified. If RestoreSize is set to nil, it means
// that the storage plugin does not have this information available.
// +optional
RestoreSize *resource.Quantity `json:"restoreSize" protobuf:"bytes,4,opt,name=restoreSize"`
} }

View File

@@ -29,6 +29,16 @@ import (
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *CSIVolumeSnapshotSource) DeepCopyInto(out *CSIVolumeSnapshotSource) { func (in *CSIVolumeSnapshotSource) DeepCopyInto(out *CSIVolumeSnapshotSource) {
*out = *in *out = *in
if in.CreationTime != nil {
in, out := &in.CreationTime, &out.CreationTime
*out = new(int64)
**out = **in
}
if in.RestoreSize != nil {
in, out := &in.RestoreSize, &out.RestoreSize
x := (*in).DeepCopy()
*out = &x
}
return return
} }
@@ -226,6 +236,11 @@ func (in *VolumeSnapshotContentSpec) DeepCopyInto(out *VolumeSnapshotContentSpec
*out = new(v1.ObjectReference) *out = new(v1.ObjectReference)
**out = **in **out = **in
} }
if in.VolumeSnapshotClassName != nil {
in, out := &in.VolumeSnapshotClassName, &out.VolumeSnapshotClassName
*out = new(string)
**out = **in
}
return return
} }
@@ -278,7 +293,7 @@ func (in *VolumeSnapshotSource) DeepCopyInto(out *VolumeSnapshotSource) {
if in.CSI != nil { if in.CSI != nil {
in, out := &in.CSI, &out.CSI in, out := &in.CSI, &out.CSI
*out = new(CSIVolumeSnapshotSource) *out = new(CSIVolumeSnapshotSource)
**out = **in (*in).DeepCopyInto(*out)
} }
return return
} }
@@ -301,6 +316,11 @@ func (in *VolumeSnapshotSpec) DeepCopyInto(out *VolumeSnapshotSpec) {
*out = new(TypedLocalObjectReference) *out = new(TypedLocalObjectReference)
**out = **in **out = **in
} }
if in.VolumeSnapshotClassName != nil {
in, out := &in.VolumeSnapshotClassName, &out.VolumeSnapshotClassName
*out = new(string)
**out = **in
}
return return
} }
@@ -321,6 +341,11 @@ func (in *VolumeSnapshotStatus) DeepCopyInto(out *VolumeSnapshotStatus) {
in, out := &in.CreationTime, &out.CreationTime in, out := &in.CreationTime, &out.CreationTime
*out = (*in).DeepCopy() *out = (*in).DeepCopy()
} }
if in.RestoreSize != nil {
in, out := &in.RestoreSize, &out.RestoreSize
x := (*in).DeepCopy()
*out = &x
}
if in.Error != nil { if in.Error != nil {
in, out := &in.Error, &out.Error in, out := &in.Error, &out.Error
*out = new(v1beta1.VolumeError) *out = new(v1beta1.VolumeError)

View File

@@ -0,0 +1,264 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package connection
import (
"context"
"fmt"
"net"
"strings"
"time"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/golang/glog"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"k8s.io/api/core/v1"
)
// CSIConnection is gRPC connection to a remote CSI driver and abstracts all
// CSI calls.
type CSIConnection interface {
// GetDriverName returns driver name as discovered by GetPluginInfo()
// gRPC call.
GetDriverName(ctx context.Context) (string, error)
// SupportsControllerCreateSnapshot returns true if the CSI driver reports
// CREATE_DELETE_SNAPSHOT in ControllerGetCapabilities() gRPC call.
SupportsControllerCreateSnapshot(ctx context.Context) (bool, error)
// SupportsControllerListSnapshots returns true if the CSI driver reports
// LIST_SNAPSHOTS in ControllerGetCapabilities() gRPC call.
SupportsControllerListSnapshots(ctx context.Context) (bool, error)
// CreateSnapshot creates a snapshot for a volume
CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (driverName string, snapshotId string, timestamp int64, size int64, status *csi.SnapshotStatus, err error)
// DeleteSnapshot deletes a snapshot from a volume
DeleteSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) (err error)
// GetSnapshotStatus lists snapshot from a volume
GetSnapshotStatus(ctx context.Context, snapshotID string) (*csi.SnapshotStatus, int64, error)
// Probe checks that the CSI driver is ready to process requests
Probe(ctx context.Context) error
// Close the connection
Close() error
}
type csiConnection struct {
conn *grpc.ClientConn
}
var (
_ CSIConnection = &csiConnection{}
)
func New(address string, timeout time.Duration) (CSIConnection, error) {
conn, err := connect(address, timeout)
if err != nil {
return nil, err
}
return &csiConnection{
conn: conn,
}, nil
}
func connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
glog.V(2).Infof("Connecting to %s", address)
dialOptions := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithUnaryInterceptor(logGRPC),
}
if strings.HasPrefix(address, "/") {
dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}))
}
conn, err := grpc.Dial(address, dialOptions...)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
if !conn.WaitForStateChange(ctx, conn.GetState()) {
glog.V(4).Infof("Connection timed out")
// subsequent GetPluginInfo will show the real connection error
return conn, nil
}
if conn.GetState() == connectivity.Ready {
glog.V(3).Infof("Connected")
return conn, nil
}
glog.V(4).Infof("Still trying, connection is %s", conn.GetState())
}
}
func (c *csiConnection) GetDriverName(ctx context.Context) (string, error) {
client := csi.NewIdentityClient(c.conn)
req := csi.GetPluginInfoRequest{}
rsp, err := client.GetPluginInfo(ctx, &req)
if err != nil {
return "", err
}
name := rsp.GetName()
if name == "" {
return "", fmt.Errorf("name is empty")
}
return name, nil
}
func (c *csiConnection) Probe(ctx context.Context) error {
client := csi.NewIdentityClient(c.conn)
req := csi.ProbeRequest{}
_, err := client.Probe(ctx, &req)
if err != nil {
return err
}
return nil
}
func (c *csiConnection) SupportsControllerCreateSnapshot(ctx context.Context) (bool, error) {
client := csi.NewControllerClient(c.conn)
req := csi.ControllerGetCapabilitiesRequest{}
rsp, err := client.ControllerGetCapabilities(ctx, &req)
if err != nil {
return false, err
}
caps := rsp.GetCapabilities()
for _, cap := range caps {
if cap == nil {
continue
}
rpc := cap.GetRpc()
if rpc == nil {
continue
}
if rpc.GetType() == csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT {
return true, nil
}
}
return false, nil
}
func (c *csiConnection) SupportsControllerListSnapshots(ctx context.Context) (bool, error) {
client := csi.NewControllerClient(c.conn)
req := csi.ControllerGetCapabilitiesRequest{}
rsp, err := client.ControllerGetCapabilities(ctx, &req)
if err != nil {
return false, err
}
caps := rsp.GetCapabilities()
for _, cap := range caps {
if cap == nil {
continue
}
rpc := cap.GetRpc()
if rpc == nil {
continue
}
if rpc.GetType() == csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS {
return true, nil
}
}
return false, nil
}
func (c *csiConnection) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, int64, int64, *csi.SnapshotStatus, error) {
glog.V(5).Infof("CSI CreateSnapshot: %s", snapshotName)
if volume.Spec.CSI == nil {
return "", "", 0, 0, nil, fmt.Errorf("CSIPersistentVolumeSource not defined in spec")
}
client := csi.NewControllerClient(c.conn)
driverName, err := c.GetDriverName(ctx)
if err != nil {
return "", "", 0, 0, nil, err
}
req := csi.CreateSnapshotRequest{
SourceVolumeId: volume.Spec.CSI.VolumeHandle,
Name: snapshotName,
Parameters: parameters,
CreateSnapshotSecrets: snapshotterCredentials,
}
rsp, err := client.CreateSnapshot(ctx, &req)
if err != nil {
return "", "", 0, 0, nil, err
}
glog.V(5).Infof("CSI CreateSnapshot: %s driver name [%s] snapshot ID [%s] time stamp [%d] size [%d] status [%s]", snapshotName, driverName, rsp.Snapshot.Id, rsp.Snapshot.CreatedAt, rsp.Snapshot.SizeBytes, *rsp.Snapshot.Status)
return driverName, rsp.Snapshot.Id, rsp.Snapshot.CreatedAt, rsp.Snapshot.SizeBytes, rsp.Snapshot.Status, nil
}
func (c *csiConnection) DeleteSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) (err error) {
client := csi.NewControllerClient(c.conn)
req := csi.DeleteSnapshotRequest{
SnapshotId: snapshotID,
DeleteSnapshotSecrets: snapshotterCredentials,
}
if _, err := client.DeleteSnapshot(ctx, &req); err != nil {
return err
}
return nil
}
func (c *csiConnection) GetSnapshotStatus(ctx context.Context, snapshotID string) (*csi.SnapshotStatus, int64, error) {
client := csi.NewControllerClient(c.conn)
req := csi.ListSnapshotsRequest{
SnapshotId: snapshotID,
}
rsp, err := client.ListSnapshots(ctx, &req)
if err != nil {
return nil, 0, err
}
if rsp.Entries == nil || len(rsp.Entries) == 0 {
return nil, 0, fmt.Errorf("can not find snapshot for snapshotID %s", snapshotID)
}
return rsp.Entries[0].Snapshot.Status, rsp.Entries[0].Snapshot.CreatedAt, nil
}
func (c *csiConnection) Close() error {
return c.conn.Close()
}
func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
glog.V(5).Infof("GRPC call: %s", method)
err := invoker(ctx, method, req, reply, cc, opts...)
glog.V(5).Infof("GRPC response: %+v", reply)
glog.V(5).Infof("GRPC error: %v", err)
return err
}

View File

@@ -0,0 +1,112 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"strings"
"time"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
"github.com/kubernetes-csi/external-snapshotter/pkg/connection"
"k8s.io/api/core/v1"
)
// Handler is responsible for handling VolumeSnapshot events from informer.
type Handler interface {
CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, int64, int64, *csi.SnapshotStatus, error)
DeleteSnapshot(content *crdv1.VolumeSnapshotContent, snapshotterCredentials map[string]string) error
GetSnapshotStatus(content *crdv1.VolumeSnapshotContent) (*csi.SnapshotStatus, int64, error)
}
// csiHandler is a handler that calls CSI to create/delete volume snapshot.
type csiHandler struct {
csiConnection connection.CSIConnection
timeout time.Duration
snapshotNamePrefix string
snapshotNameUUIDLength int
}
func NewCSIHandler(
csiConnection connection.CSIConnection,
timeout time.Duration,
snapshotNamePrefix string,
snapshotNameUUIDLength int,
) Handler {
return &csiHandler{
csiConnection: csiConnection,
timeout: timeout,
snapshotNamePrefix: snapshotNamePrefix,
snapshotNameUUIDLength: snapshotNameUUIDLength,
}
}
func (handler *csiHandler) CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, int64, int64, *csi.SnapshotStatus, error) {
ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
defer cancel()
snapshotName, err := makeSnapshotName(handler.snapshotNamePrefix, string(snapshot.UID), handler.snapshotNameUUIDLength)
if err != nil {
return "", "", 0, 0, nil, err
}
return handler.csiConnection.CreateSnapshot(ctx, snapshotName, volume, parameters, snapshotterCredentials)
}
func (handler *csiHandler) DeleteSnapshot(content *crdv1.VolumeSnapshotContent, snapshotterCredentials map[string]string) error {
if content.Spec.CSI == nil {
return fmt.Errorf("CSISnapshot not defined in spec")
}
ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
defer cancel()
err := handler.csiConnection.DeleteSnapshot(ctx, content.Spec.CSI.SnapshotHandle, snapshotterCredentials)
if err != nil {
return fmt.Errorf("failed to delete snapshot data %s: %q", content.Name, err)
}
return nil
}
func (handler *csiHandler) GetSnapshotStatus(content *crdv1.VolumeSnapshotContent) (*csi.SnapshotStatus, int64, error) {
if content.Spec.CSI == nil {
return nil, 0, fmt.Errorf("CSISnapshot not defined in spec")
}
ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
defer cancel()
csiSnapshotStatus, timestamp, err := handler.csiConnection.GetSnapshotStatus(ctx, content.Spec.CSI.SnapshotHandle)
if err != nil {
return nil, 0, fmt.Errorf("failed to list snapshot data %s: %q", content.Name, err)
}
return csiSnapshotStatus, timestamp, nil
}
func makeSnapshotName(prefix, snapshotUID string, snapshotNameUUIDLength int) (string, error) {
// create persistent name based on a volumeNamePrefix and volumeNameUUIDLength
// of PVC's UID
if len(snapshotUID) == 0 {
return "", fmt.Errorf("Corrupted snapshot object, it is missing UID")
}
if snapshotNameUUIDLength == -1 {
// Default behavior is to not truncate or remove dashes
return fmt.Sprintf("%s-%s", prefix, snapshotUID), nil
}
return fmt.Sprintf("%s-%s", prefix, strings.Replace(snapshotUID, "-", "", -1)[0:snapshotNameUUIDLength]), nil
}

View File

@@ -0,0 +1,855 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"strings"
"time"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/golang/glog"
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
"k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storage "k8s.io/api/storage/v1beta1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes/scheme"
ref "k8s.io/client-go/tools/reference"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
)
// ==================================================================
// PLEASE DO NOT ATTEMPT TO SIMPLIFY THIS CODE.
// KEEP THE SPACE SHUTTLE FLYING.
// ==================================================================
// Design:
//
// The fundamental key to this design is the bi-directional "pointer" between
// VolumeSnapshots and VolumeSnapshotContents, which is represented here
// as snapshot.Spec.SnapshotContentName and content.Spec.VolumeSnapshotRef.
// The bi-directionality is complicated to manage in a transactionless system, but
// without it we can't ensure sane behavior in the face of different forms of
// trouble. For example, a rogue HA controller instance could end up racing
// and making multiple bindings that are indistinguishable, resulting in
// potential data loss.
//
// This controller is designed to work in active-passive high availability
// mode. It *could* work also in active-active HA mode, all the object
// transitions are designed to cope with this, however performance could be
// lower as these two active controllers will step on each other toes
// frequently.
//
// This controller supports both dynamic snapshot creation and pre-bound snapshot.
// In pre-bound mode, objects are created with pre-defined pointers: a VolumeSnapshot
// points to a specific VolumeSnapshotContent and the VolumeSnapshotContent also
// points back for this VolumeSnapshot.
//
// The dynamic snapshot creation is multi-step process: first controller triggers
// snapshot creation though csi volume plugin which should return a snapshot after
// it is created successfully (however, the snapshot might not be ready to use yet if
// there is an uploading phase). The creationTimestamp will be updated according to
// VolumeSnapshot, and then a VolumeSnapshotContent object is created to represent
// this snapshot. After that, the controller will keep checking the snapshot status
// though csi snapshot calls. When the snapshot is ready to use, the controller set
// the status "Bound" to true to indicate the snapshot is bound and ready to use.
// If the createtion failed for any reason, the Error status is set accordingly.
// In alpha version, the controller not retry to create the snapshot after it failed.
// In the future version, a retry policy will be added.
const pvcKind = "PersistentVolumeClaim"
const controllerUpdateFailMsg = "snapshot controller failed to update"
const IsDefaultSnapshotClassAnnotation = "snapshot.storage.kubernetes.io/is-default-class"
// syncContent deals with one key off the queue. It returns false when it's time to quit.
func (ctrl *csiSnapshotController) syncContent(content *crdv1.VolumeSnapshotContent) error {
glog.V(5).Infof("synchronizing VolumeSnapshotContent[%s]", content.Name)
// VolumeSnapshotContent is not bound to any VolumeSnapshot, this case rare and we just return err
if content.Spec.VolumeSnapshotRef == nil {
// content is not bound
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: VolumeSnapshotContent is not bound to any VolumeSnapshot", content.Name)
ctrl.eventRecorder.Event(content, v1.EventTypeWarning, "SnapshotContentNotBound", "VolumeSnapshotContent is not bound to any VolumeSnapshot")
return fmt.Errorf("volumeSnapshotContent %s is not bound to any VolumeSnapshot", content.Name)
} else {
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: content is bound to snapshot %s", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
// The VolumeSnapshotContent is reserved for a VolumeSnapshot;
// that VolumeSnapshot has not yet been bound to this VolumeSnapshotContent; the VolumeSnapshot sync will handle it.
if content.Spec.VolumeSnapshotRef.UID == "" {
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: VolumeSnapshotContent is pre-bound to VolumeSnapshot %s", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
return nil
}
// Get the VolumeSnapshot by _name_
var snapshot *crdv1.VolumeSnapshot
snapshotName := snapshotRefKey(content.Spec.VolumeSnapshotRef)
obj, found, err := ctrl.snapshotStore.GetByKey(snapshotName)
if err != nil {
return err
}
if !found {
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: snapshot %s not found", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
// Fall through with snapshot = nil
} else {
var ok bool
snapshot, ok = obj.(*crdv1.VolumeSnapshot)
if !ok {
return fmt.Errorf("cannot convert object from snapshot cache to snapshot %q!?: %#v", content.Name, obj)
}
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: snapshot %s found", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
}
if snapshot != nil && snapshot.UID != content.Spec.VolumeSnapshotRef.UID {
// The snapshot that the content was pointing to was deleted, and another
// with the same name created.
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: content %s has different UID, the old one must have been deleted", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
// Treat the volume as bound to a missing claim.
snapshot = nil
}
if snapshot == nil {
ctrl.deleteSnapshotContent(content)
}
}
return nil
}
// syncSnapshot is the main controller method to decide what to do with a snapshot.
// It's invoked by appropriate cache.Controller callbacks when a snapshot is
// created, updated or periodically synced. We do not differentiate between
// these events.
// For easier readability, it is split into syncUnreadySnapshot and syncReadySnapshot
func (ctrl *csiSnapshotController) syncSnapshot(snapshot *crdv1.VolumeSnapshot) error {
glog.V(5).Infof("synchonizing VolumeSnapshot[%s]: %s", snapshotKey(snapshot), getSnapshotStatusForLogging(snapshot))
if !snapshot.Status.Ready {
return ctrl.syncUnreadySnapshot(snapshot)
} else {
return ctrl.syncReadySnapshot(snapshot)
}
}
// syncReadySnapshot checks the snapshot which has been bound to snapshot content succesfully before.
// If there is any problem with the binding (e.g., snapshot points to a non-exist snapshot content), update the snapshot status and emit event.
func (ctrl *csiSnapshotController) syncReadySnapshot(snapshot *crdv1.VolumeSnapshot) error {
if snapshot.Spec.SnapshotContentName == "" {
if err := ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotLost", "Bound snapshot has lost reference to VolumeSnapshotContent"); err != nil {
return err
}
return nil
}
obj, found, err := ctrl.contentStore.GetByKey(snapshot.Spec.SnapshotContentName)
if err != nil {
return err
}
if !found {
if err = ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotContentMissing", "VolumeSnapshotContent is missing"); err != nil {
return err
}
return nil
} else {
content, ok := obj.(*crdv1.VolumeSnapshotContent)
if !ok {
return fmt.Errorf("Cannot convert object from snapshot content store to VolumeSnapshotContent %q!?: %#v", snapshot.Spec.SnapshotContentName, obj)
}
glog.V(5).Infof("syncCompleteSnapshot[%s]: VolumeSnapshotContent %q found", snapshotKey(snapshot), content.Name)
if !IsSnapshotBound(snapshot, content) {
// snapshot is bound but content is not bound to snapshot correctly
if err = ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotMisbound", "VolumeSnapshotContent is not bound to the VolumeSnapshot correctly"); err != nil {
return err
}
return nil
}
// Snapshot is correctly bound.
return nil
}
}
// syncUnreadySnapshot is the main controller method to decide what to do with a snapshot which is not set to ready.
func (ctrl *csiSnapshotController) syncUnreadySnapshot(snapshot *crdv1.VolumeSnapshot) error {
uniqueSnapshotName := snapshotKey(snapshot)
glog.V(5).Infof("syncUnreadySnapshot %s", uniqueSnapshotName)
if snapshot.Spec.SnapshotContentName != "" {
contentObj, found, err := ctrl.contentStore.GetByKey(snapshot.Spec.SnapshotContentName)
if err != nil {
return err
}
if !found {
// snapshot is bound to a non-existing content.
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotContentMissing", "VolumeSnapshotContent is missing")
glog.V(4).Infof("synchronizing unready snapshot[%s]: snapshotcontent %q requested and not found, will try again next time", uniqueSnapshotName, snapshot.Spec.SnapshotContentName)
return fmt.Errorf("snapshot %s is bound to a non-existing content %s", uniqueSnapshotName, snapshot.Spec.SnapshotContentName)
}
content, ok := contentObj.(*crdv1.VolumeSnapshotContent)
if !ok {
return fmt.Errorf("expected volume snapshot content, got %+v", contentObj)
}
if err := ctrl.checkandBindSnapshotContent(snapshot, content); err != nil {
// snapshot is bound but content is not bound to snapshot correctly
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotBindFailed", fmt.Sprintf("Snapshot failed to bind VolumeSnapshotContent, %v", err))
return fmt.Errorf("snapshot %s is bound, but VolumeSnapshotContent %s is not bound to the VolumeSnapshot correctly, %v", uniqueSnapshotName, content.Name, err)
}
// snapshot is already bound correctly, check the status and update if it is ready.
glog.V(5).Infof("Check and update snapshot %s status", uniqueSnapshotName)
if err = ctrl.checkandUpdateSnapshotStatus(snapshot, content); err != nil {
return err
}
return nil
} else { // snapshot.Spec.SnapshotContentName == nil
if contentObj := ctrl.getMatchSnapshotContent(snapshot); contentObj != nil {
glog.V(5).Infof("Find VolumeSnapshotContent object %s for snapshot %s", contentObj.Name, uniqueSnapshotName)
newSnapshot, err := ctrl.bindandUpdateVolumeSnapshot(contentObj, snapshot)
if err != nil {
return err
}
glog.V(5).Infof("bindandUpdateVolumeSnapshot %v", newSnapshot)
return nil
} else if snapshot.Status.Error == nil || isControllerUpdateFailError(snapshot.Status.Error) { // Try to create snapshot if no error status is set
if err := ctrl.createSnapshot(snapshot); err != nil {
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot with error %v", err))
return err
}
return nil
}
return nil
}
}
// getMatchSnapshotContent looks up VolumeSnapshotContent for a VolumeSnapshot named snapshotName
func (ctrl *csiSnapshotController) getMatchSnapshotContent(snapshot *crdv1.VolumeSnapshot) *crdv1.VolumeSnapshotContent {
var snapshotContentObj *crdv1.VolumeSnapshotContent
var found bool
objs := ctrl.contentStore.List()
for _, obj := range objs {
content := obj.(*crdv1.VolumeSnapshotContent)
if content.Spec.VolumeSnapshotRef != nil &&
content.Spec.VolumeSnapshotRef.Name == snapshot.Name &&
content.Spec.VolumeSnapshotRef.Namespace == snapshot.Namespace &&
content.Spec.VolumeSnapshotRef.UID == snapshot.UID &&
content.Spec.VolumeSnapshotClassName != nil && snapshot.Spec.VolumeSnapshotClassName != nil &&
*(content.Spec.VolumeSnapshotClassName) == *(snapshot.Spec.VolumeSnapshotClassName) {
found = true
snapshotContentObj = content
break
}
}
if !found {
glog.V(4).Infof("No VolumeSnapshotContent for VolumeSnapshot %s found", snapshotKey(snapshot))
return nil
}
return snapshotContentObj
}
// deleteSnapshotContent starts delete action.
func (ctrl *csiSnapshotController) deleteSnapshotContent(content *crdv1.VolumeSnapshotContent) {
operationName := fmt.Sprintf("delete-%s[%s]", content.Name, string(content.UID))
glog.V(5).Infof("Snapshotter is about to delete volume snapshot and the operation named %s", operationName)
ctrl.scheduleOperation(operationName, func() error {
return ctrl.deleteSnapshotContentOperation(content)
})
}
// scheduleOperation starts given asynchronous operation on given volume. It
// makes sure the operation is already not running.
func (ctrl *csiSnapshotController) scheduleOperation(operationName string, operation func() error) {
glog.V(5).Infof("scheduleOperation[%s]", operationName)
err := ctrl.runningOperations.Run(operationName, operation)
if err != nil {
switch {
case goroutinemap.IsAlreadyExists(err):
glog.V(4).Infof("operation %q is already running, skipping", operationName)
case exponentialbackoff.IsExponentialBackoff(err):
glog.V(4).Infof("operation %q postponed due to exponential backoff", operationName)
default:
glog.Errorf("error scheduling operation %q: %v", operationName, err)
}
}
}
func (ctrl *csiSnapshotController) storeSnapshotUpdate(snapshot interface{}) (bool, error) {
return storeObjectUpdate(ctrl.snapshotStore, snapshot, "snapshot")
}
func (ctrl *csiSnapshotController) storeContentUpdate(content interface{}) (bool, error) {
return storeObjectUpdate(ctrl.contentStore, content, "content")
}
func (ctrl *csiSnapshotController) storeClassUpdate(content interface{}) (bool, error) {
return storeObjectUpdate(ctrl.classStore, content, "class")
}
// createSnapshot starts new asynchronous operation to create snapshot
func (ctrl *csiSnapshotController) createSnapshot(snapshot *crdv1.VolumeSnapshot) error {
glog.V(5).Infof("createSnapshot[%s]: started", snapshotKey(snapshot))
opName := fmt.Sprintf("create-%s[%s]", snapshotKey(snapshot), string(snapshot.UID))
ctrl.scheduleOperation(opName, func() error {
snapshotObj, err := ctrl.createSnapshotOperation(snapshot)
if err != nil {
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err))
glog.Errorf("createSnapshot [%s]: error occurred in createSnapshotOperation: %v", opName, err)
return err
}
_, updateErr := ctrl.storeSnapshotUpdate(snapshotObj)
if updateErr != nil {
// We will get an "snapshot update" event soon, this is not a big error
glog.V(4).Infof("createSnapshot [%s]: cannot update internal cache: %v", snapshotKey(snapshotObj), updateErr)
}
return nil
})
return nil
}
func (ctrl *csiSnapshotController) checkandUpdateSnapshotStatus(snapshot *crdv1.VolumeSnapshot, content *crdv1.VolumeSnapshotContent) error {
glog.V(5).Infof("checkandUpdateSnapshotStatus[%s] started", snapshotKey(snapshot))
opName := fmt.Sprintf("check-%s[%s]", snapshotKey(snapshot), string(snapshot.UID))
ctrl.scheduleOperation(opName, func() error {
snapshotObj, err := ctrl.checkandUpdateSnapshotStatusOperation(snapshot, content)
if err != nil {
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotCheckandUpdateFailed", fmt.Sprintf("Failed to check and update snapshot: %v", err))
glog.Errorf("checkandUpdateSnapshotStatus [%s]: error occured %v", snapshotKey(snapshot), err)
return err
}
_, updateErr := ctrl.storeSnapshotUpdate(snapshotObj)
if updateErr != nil {
// We will get an "snapshot update" event soon, this is not a big error
glog.V(4).Infof("checkandUpdateSnapshotStatus [%s]: cannot update internal cache: %v", snapshotKey(snapshotObj), updateErr)
}
return nil
})
return nil
}
// updateSnapshotStatusWithEvent saves new snapshot.Status to API server and emits
// given event on the snapshot. It saves the status and emits the event only when
// the status has actually changed from the version saved in API server.
// Parameters:
// snapshot - snapshot to update
// eventtype, reason, message - event to send, see EventRecorder.Event()
func (ctrl *csiSnapshotController) updateSnapshotErrorStatusWithEvent(snapshot *crdv1.VolumeSnapshot, eventtype, reason, message string) error {
glog.V(5).Infof("updateSnapshotStatusWithEvent[%s]", snapshotKey(snapshot))
if snapshot.Status.Error != nil && snapshot.Status.Error.Message == message {
glog.V(4).Infof("updateSnapshotStatusWithEvent[%s]: the same error %v is already set", snapshot.Name, snapshot.Status.Error)
return nil
}
snapshotClone := snapshot.DeepCopy()
statusError := &storage.VolumeError{
Time: metav1.Time{
Time: time.Now(),
},
Message: message,
}
snapshotClone.Status.Error = statusError
snapshotClone.Status.Ready = false
newSnapshot, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshotClone.Namespace).Update(snapshotClone)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshot[%s] error status failed %v", snapshotKey(snapshot), err)
return err
}
_, err = ctrl.storeSnapshotUpdate(newSnapshot)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshot[%s] error status: cannot update internal cache %v", snapshotKey(snapshot), err)
return err
}
// Emit the event only when the status change happens
ctrl.eventRecorder.Event(newSnapshot, eventtype, reason, message)
return nil
}
// Stateless functions
func getSnapshotStatusForLogging(snapshot *crdv1.VolumeSnapshot) string {
return fmt.Sprintf("bound to: %q, Completed: %v", snapshot.Spec.SnapshotContentName, snapshot.Status.Ready)
}
func IsSnapshotBound(snapshot *crdv1.VolumeSnapshot, content *crdv1.VolumeSnapshotContent) bool {
if content.Spec.VolumeSnapshotRef != nil && content.Spec.VolumeSnapshotRef.Name == snapshot.Name &&
content.Spec.VolumeSnapshotRef.UID == snapshot.UID {
return true
}
return false
}
// The function checks whether the volumeSnapshotRef in snapshot content matches the given snapshot. If match, it binds the content with the snapshot
func (ctrl *csiSnapshotController) checkandBindSnapshotContent(snapshot *crdv1.VolumeSnapshot, content *crdv1.VolumeSnapshotContent) error {
if content.Spec.VolumeSnapshotRef == nil || content.Spec.VolumeSnapshotRef.Name != snapshot.Name {
return fmt.Errorf("Could not bind snapshot %s and content %s, the VolumeSnapshotRef does not match", snapshot.Name, content.Name)
} else if content.Spec.VolumeSnapshotRef.UID != "" && content.Spec.VolumeSnapshotRef.UID != snapshot.UID {
return fmt.Errorf("Could not bind snapshot %s and content %s, the VolumeSnapshotRef does not match", snapshot.Name, content.Name)
} else if content.Spec.VolumeSnapshotRef.UID == "" {
contentClone := content.DeepCopy()
contentClone.Spec.VolumeSnapshotRef.UID = snapshot.UID
className := *(snapshot.Spec.VolumeSnapshotClassName)
contentClone.Spec.VolumeSnapshotClassName = &className
newContent, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshotContents().Update(contentClone)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshotContent[%s] error status failed %v", newContent.Name, err)
return err
}
_, err = ctrl.storeContentUpdate(newContent)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshotContent[%s] error status: cannot update internal cache %v", newContent.Name, err)
return err
}
}
return nil
}
func (ctrl *csiSnapshotController) checkandUpdateSnapshotStatusOperation(snapshot *crdv1.VolumeSnapshot, content *crdv1.VolumeSnapshotContent) (*crdv1.VolumeSnapshot, error) {
status, _, err := ctrl.handler.GetSnapshotStatus(content)
if err != nil {
return nil, fmt.Errorf("failed to check snapshot status %s with error %v", snapshot.Name, err)
}
newSnapshot, err := ctrl.updateSnapshotStatus(snapshot, status, time.Now(), nil, IsSnapshotBound(snapshot, content))
if err != nil {
return nil, err
}
return newSnapshot, nil
}
// The function goes through the whole snapshot creation process.
// 1. Trigger the snapshot through csi storage provider.
// 2. Update VolumeSnapshot status with creationtimestamp information
// 3. Create the VolumeSnapshotContent object with the snapshot id information.
// 4. Bind the VolumeSnapshot and VolumeSnapshotContent object
func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, error) {
glog.Infof("createSnapshot: Creating snapshot %s through the plugin ...", snapshotKey(snapshot))
if snapshot.Status.Error != nil && !isControllerUpdateFailError(snapshot.Status.Error) {
glog.V(4).Infof("error is already set in snapshot, do not retry to create: %s", snapshot.Status.Error.Message)
return snapshot, nil
}
className := snapshot.Spec.VolumeSnapshotClassName
glog.V(5).Infof("createSnapshotOperation [%s]: VolumeSnapshotClassName [%s]", snapshot.Name, *className)
var class *crdv1.VolumeSnapshotClass
var err error
if className != nil {
class, err = ctrl.GetSnapshotClass(*className)
if err != nil {
glog.Errorf("createSnapshotOperation failed to getClassFromVolumeSnapshot %s", err)
return nil, err
}
} else {
glog.Errorf("failed to take snapshot %s without a snapshot class", snapshot.Name)
return nil, fmt.Errorf("failed to take snapshot %s without a snapshot class", snapshot.Name)
}
volume, err := ctrl.getVolumeFromVolumeSnapshot(snapshot)
if err != nil {
glog.Errorf("createSnapshotOperation failed to get PersistentVolume object [%s]: Error: [%#v]", snapshot.Name, err)
return nil, err
}
// Create VolumeSnapshotContent name
contentName := GetSnapshotContentNameForSnapshot(snapshot)
// Resolve snapshotting secret credentials.
snapshotterSecretRef, err := GetSecretReference(class.Parameters, contentName, snapshot)
if err != nil {
return nil, err
}
snapshotterCredentials, err := GetCredentials(ctrl.client, snapshotterSecretRef)
if err != nil {
return nil, err
}
driverName, snapshotID, timestamp, size, csiSnapshotStatus, err := ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
if err != nil {
return nil, fmt.Errorf("failed to take snapshot of the volume, %s: %q", volume.Name, err)
}
glog.V(5).Infof("Created snapshot: driver %s, snapshotId %s, timestamp %d, size %d, csiSnapshotStatus %v", driverName, snapshotID, timestamp, size, csiSnapshotStatus)
var newSnapshot *crdv1.VolumeSnapshot
// Update snapshot status with timestamp
for i := 0; i < ctrl.createSnapshotContentRetryCount; i++ {
glog.V(5).Infof("createSnapshot [%s]: trying to update snapshot creation timestamp", snapshotKey(snapshot))
newSnapshot, err = ctrl.updateSnapshotStatus(snapshot, csiSnapshotStatus, time.Unix(0, timestamp), resource.NewQuantity(size, resource.BinarySI), false)
if err == nil {
break
}
glog.V(4).Infof("failed to update snapshot %s creation timestamp: %v", snapshotKey(snapshot), err)
}
if err != nil {
return nil, err
}
// Create VolumeSnapshotContent in the database
volumeRef, err := ref.GetReference(scheme.Scheme, volume)
snapshotContent := &crdv1.VolumeSnapshotContent{
ObjectMeta: metav1.ObjectMeta{
Name: contentName,
},
Spec: crdv1.VolumeSnapshotContentSpec{
VolumeSnapshotRef: &v1.ObjectReference{
Kind: "VolumeSnapshot",
Namespace: snapshot.Namespace,
Name: snapshot.Name,
UID: snapshot.UID,
APIVersion: "snapshot.storage.k8s.io/v1alpha1",
},
PersistentVolumeRef: volumeRef,
VolumeSnapshotSource: crdv1.VolumeSnapshotSource{
CSI: &crdv1.CSIVolumeSnapshotSource{
Driver: driverName,
SnapshotHandle: snapshotID,
CreationTime: &timestamp,
RestoreSize: resource.NewQuantity(size, resource.BinarySI),
},
},
VolumeSnapshotClassName: &(class.Name),
},
}
// Try to create the VolumeSnapshotContent object several times
for i := 0; i < ctrl.createSnapshotContentRetryCount; i++ {
glog.V(5).Infof("createSnapshot [%s]: trying to save volume snapshot content %s", snapshotKey(snapshot), snapshotContent.Name)
if _, err = ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshotContents().Create(snapshotContent); err == nil || apierrs.IsAlreadyExists(err) {
// Save succeeded.
if err != nil {
glog.V(3).Infof("volume snapshot content %q for snapshot %q already exists, reusing", snapshotContent.Name, snapshotKey(snapshot))
err = nil
} else {
glog.V(3).Infof("volume snapshot content %q for snapshot %q saved", snapshotContent.Name, snapshotKey(snapshot))
}
break
}
// Save failed, try again after a while.
glog.V(3).Infof("failed to save volume snapshot content %q for snapshot %q: %v", snapshotContent.Name, snapshotKey(snapshot), err)
time.Sleep(ctrl.createSnapshotContentInterval)
}
if err != nil {
// Save failed. Now we have a storage asset outside of Kubernetes,
// but we don't have appropriate volumesnapshot content object for it.
// Emit some event here and controller should try to create the content in next sync period.
strerr := fmt.Sprintf("Error creating volume snapshot content object for snapshot %s: %v.", snapshotKey(snapshot), err)
glog.Error(strerr)
ctrl.eventRecorder.Event(newSnapshot, v1.EventTypeWarning, "CreateSnapshotContentFailed", strerr)
return nil, newControllerUpdateError(snapshotKey(snapshot), err.Error())
}
// save succeeded, bind and update status for snapshot.
result, err := ctrl.bindandUpdateVolumeSnapshot(snapshotContent, newSnapshot)
if err != nil {
return nil, err
}
return result, nil
}
// Delete a snapshot
// 1. Find the SnapshotContent corresponding to Snapshot
// 1a: Not found => finish (it's been deleted already)
// 2. Ask the backend to remove the snapshot device
// 3. Delete the SnapshotContent object
// 4. Remove the Snapshot from store
// 5. Finish
func (ctrl *csiSnapshotController) deleteSnapshotContentOperation(content *crdv1.VolumeSnapshotContent) error {
glog.V(5).Infof("deleteSnapshotOperation [%s] started", content.Name)
// get secrets if VolumeSnapshotClass specifies it
var snapshotterCredentials map[string]string
snapshotClassName := content.Spec.VolumeSnapshotClassName
if snapshotClassName != nil {
if snapshotClass, err := ctrl.classLister.Get(*snapshotClassName); err == nil {
// Resolve snapshotting secret credentials.
// No VolumeSnapshot is provided when resolving delete secret names, since the VolumeSnapshot may or may not exist at delete time.
snapshotterSecretRef, err := GetSecretReference(snapshotClass.Parameters, content.Name, nil)
if err != nil {
return err
}
snapshotterCredentials, err = GetCredentials(ctrl.client, snapshotterSecretRef)
if err != nil {
return err
}
}
}
err := ctrl.handler.DeleteSnapshot(content, snapshotterCredentials)
if err != nil {
ctrl.eventRecorder.Event(content, v1.EventTypeWarning, "SnapshotDeleteError", "Failed to delete snapshot")
return fmt.Errorf("failed to delete snapshot %#v, err: %v", content.Name, err)
}
err = ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshotContents().Delete(content.Name, &metav1.DeleteOptions{})
if err != nil {
ctrl.eventRecorder.Event(content, v1.EventTypeWarning, "SnapshotContentObjectDeleteError", "Failed to delete snapshot content API object")
return fmt.Errorf("failed to delete VolumeSnapshotContent %s from API server: %q", content.Name, err)
}
return nil
}
func (ctrl *csiSnapshotController) bindandUpdateVolumeSnapshot(snapshotContent *crdv1.VolumeSnapshotContent, snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, error) {
glog.V(5).Infof("bindandUpdateVolumeSnapshot for snapshot [%s]: snapshotContent [%s]", snapshot.Name, snapshotContent.Name)
snapshotObj, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshot.Namespace).Get(snapshot.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error get snapshot %s from api server: %v", snapshotKey(snapshot), err)
}
// Copy the snapshot object before updating it
snapshotCopy := snapshotObj.DeepCopy()
if snapshotObj.Spec.SnapshotContentName == snapshotContent.Name {
glog.Infof("bindVolumeSnapshotContentToVolumeSnapshot: VolumeSnapshot %s already bind to volumeSnapshotContent [%s]", snapshot.Name, snapshotContent.Name)
} else {
glog.Infof("bindVolumeSnapshotContentToVolumeSnapshot: before bind VolumeSnapshot %s to volumeSnapshotContent [%s]", snapshot.Name, snapshotContent.Name)
snapshotCopy.Spec.SnapshotContentName = snapshotContent.Name
updateSnapshot, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshot.Namespace).Update(snapshotCopy)
if err != nil {
glog.Infof("bindVolumeSnapshotContentToVolumeSnapshot: Error binding VolumeSnapshot %s to volumeSnapshotContent [%s]. Error [%#v]", snapshot.Name, snapshotContent.Name, err)
return nil, newControllerUpdateError(snapshotKey(snapshot), err.Error())
}
snapshotCopy = updateSnapshot
_, err = ctrl.storeSnapshotUpdate(snapshotCopy)
if err != nil {
glog.Errorf("%v", err)
}
}
glog.V(5).Infof("bindandUpdateVolumeSnapshot for snapshot completed [%#v]", snapshotCopy)
return snapshotCopy, nil
}
// UpdateSnapshotStatus converts snapshot status to crdv1.VolumeSnapshotCondition
func (ctrl *csiSnapshotController) updateSnapshotStatus(snapshot *crdv1.VolumeSnapshot, csistatus *csi.SnapshotStatus, timestamp time.Time, size *resource.Quantity, bound bool) (*crdv1.VolumeSnapshot, error) {
glog.V(5).Infof("updating VolumeSnapshot[]%s, set status %v, timestamp %v", snapshotKey(snapshot), csistatus, timestamp)
status := snapshot.Status
change := false
timeAt := &metav1.Time{
Time: timestamp,
}
snapshotClone := snapshot.DeepCopy()
switch csistatus.Type {
case csi.SnapshotStatus_READY:
if bound {
status.Ready = true
// Remove the error if checking snapshot is already bound and ready
status.Error = nil
change = true
}
if status.CreationTime == nil {
status.CreationTime = timeAt
change = true
}
case csi.SnapshotStatus_ERROR_UPLOADING:
if status.Error == nil {
status.Error = &storage.VolumeError{
Time: *timeAt,
Message: "Failed to upload the snapshot",
}
change = true
ctrl.eventRecorder.Event(snapshot, v1.EventTypeWarning, "SnapshotUploadError", "Failed to upload the snapshot")
}
case csi.SnapshotStatus_UPLOADING:
if status.CreationTime == nil {
status.CreationTime = timeAt
change = true
}
}
if change {
if size != nil {
status.RestoreSize = size
}
snapshotClone.Status = status
newSnapshotObj, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshotClone.Namespace).Update(snapshotClone)
if err != nil {
return nil, newControllerUpdateError(snapshotKey(snapshot), err.Error())
} else {
return newSnapshotObj, nil
}
}
return snapshot, nil
}
// getVolumeFromVolumeSnapshot is a helper function to get PV from VolumeSnapshot.
func (ctrl *csiSnapshotController) getVolumeFromVolumeSnapshot(snapshot *crdv1.VolumeSnapshot) (*v1.PersistentVolume, error) {
pvc, err := ctrl.getClaimFromVolumeSnapshot(snapshot)
if err != nil {
return nil, err
}
if pvc.Status.Phase != v1.ClaimBound {
return nil, fmt.Errorf("the PVC %s is not yet bound to a PV, will not attempt to take a snapshot", pvc.Name)
}
pvName := pvc.Spec.VolumeName
pv, err := ctrl.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to retrieve PV %s from the API server: %q", pvName, err)
}
glog.V(5).Infof("getVolumeFromVolumeSnapshot: snapshot [%s] PV name [%s]", snapshot.Name, pvName)
return pv, nil
}
func (ctrl *csiSnapshotController) getStorageClassFromVolumeSnapshot(snapshot *crdv1.VolumeSnapshot) (*storagev1.StorageClass, error) {
// Get storage class from PVC or PV
pvc, err := ctrl.getClaimFromVolumeSnapshot(snapshot)
if err != nil {
return nil, err
}
storageclassName := *pvc.Spec.StorageClassName
if len(storageclassName) == 0 {
volume, err := ctrl.getVolumeFromVolumeSnapshot(snapshot)
if err != nil {
return nil, err
}
storageclassName = volume.Spec.StorageClassName
}
if len(storageclassName) == 0 {
return nil, fmt.Errorf("cannot figure out the snapshot class automatically, please specify one in snapshot spec.")
}
storageclass, err := ctrl.client.StorageV1().StorageClasses().Get(*pvc.Spec.StorageClassName, metav1.GetOptions{})
if err != nil {
return nil, err
}
return storageclass, nil
}
// GetSnapshotClass is a helper function to get snapshot class from the class name.
func (ctrl *csiSnapshotController) GetSnapshotClass(className string) (*crdv1.VolumeSnapshotClass, error) {
glog.V(5).Infof("getSnapshotClass: VolumeSnapshotClassName [%s]", className)
obj, found, err := ctrl.classStore.GetByKey(className)
if found {
class, ok := obj.(*crdv1.VolumeSnapshotClass)
if ok {
return class, nil
}
}
class, err := ctrl.classLister.Get(className)
if err != nil {
glog.Errorf("failed to retrieve snapshot class %s from the API server: %q", className, err)
return nil, fmt.Errorf("failed to retrieve snapshot class %s from the API server: %q", className, err)
}
_, updateErr := ctrl.storeClassUpdate(class)
if updateErr != nil {
glog.V(4).Infof("getSnapshotClass [%s]: cannot update internal cache: %v", class.Name, updateErr)
}
return class, nil
}
// SetDefaultSnapshotClass is a helper function to figure out the default snapshot class from
// PVC/PV StorageClass and update VolumeSnapshot with this snapshot class name.
func (ctrl *csiSnapshotController) SetDefaultSnapshotClass(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshotClass, *crdv1.VolumeSnapshot, error) {
glog.V(5).Infof("SetDefaultSnapshotClass for snapshot [%s]", snapshot.Name)
storageclass, err := ctrl.getStorageClassFromVolumeSnapshot(snapshot)
if err != nil {
return nil, nil, err
}
// Find default snapshot class if available
list, err := ctrl.classLister.List(labels.Everything())
if err != nil {
return nil, nil, err
}
defaultClasses := []*crdv1.VolumeSnapshotClass{}
for _, class := range list {
if IsDefaultAnnotation(class.ObjectMeta) && storageclass.Provisioner == class.Snapshotter && ctrl.snapshotterName == class.Snapshotter {
defaultClasses = append(defaultClasses, class)
glog.V(5).Infof("get defaultClass added: %s", class.Name)
}
}
if len(defaultClasses) == 0 {
return nil, nil, fmt.Errorf("cannot find default snapshot class")
}
if len(defaultClasses) > 1 {
glog.V(4).Infof("get DefaultClass %d defaults found", len(defaultClasses))
return nil, nil, fmt.Errorf("%d default snapshot classes were found", len(defaultClasses))
}
glog.V(5).Infof("setDefaultSnapshotClass [%s]: default VolumeSnapshotClassName [%s]", snapshot.Name, defaultClasses[0].Name)
snapshotClone := snapshot.DeepCopy()
snapshotClone.Spec.VolumeSnapshotClassName = &(defaultClasses[0].Name)
newSnapshot, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshotClone.Namespace).Update(snapshotClone)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshot[%s] default class failed %v", snapshotKey(snapshot), err)
}
_, updateErr := ctrl.storeSnapshotUpdate(newSnapshot)
if updateErr != nil {
// We will get an "snapshot update" event soon, this is not a big error
glog.V(4).Infof("setDefaultSnapshotClass [%s]: cannot update internal cache: %v", snapshotKey(snapshot), updateErr)
}
_, updateErr = ctrl.storeClassUpdate(defaultClasses[0])
if updateErr != nil {
glog.V(4).Infof("setDefaultSnapshotClass [%s]: cannot update internal cache: %v", defaultClasses[0].Name, updateErr)
}
return defaultClasses[0], newSnapshot, nil
}
// getClaimFromVolumeSnapshot is a helper function to get PVC from VolumeSnapshot.
func (ctrl *csiSnapshotController) getClaimFromVolumeSnapshot(snapshot *crdv1.VolumeSnapshot) (*v1.PersistentVolumeClaim, error) {
if snapshot.Spec.Source == nil || snapshot.Spec.Source.Kind != pvcKind {
return nil, fmt.Errorf("The snapshot source is not the right type. Expected %s, Got %v", pvcKind, snapshot.Spec.Source)
}
pvcName := snapshot.Spec.Source.Name
if pvcName == "" {
return nil, fmt.Errorf("the PVC name is not specified in snapshot %s", snapshotKey(snapshot))
}
pvc, err := ctrl.client.CoreV1().PersistentVolumeClaims(snapshot.Namespace).Get(pvcName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to retrieve PVC %s from the API server: %q", pvcName, err)
}
return pvc, nil
}
var _ error = controllerUpdateError{}
type controllerUpdateError struct {
message string
}
func newControllerUpdateError(name, message string) error {
return controllerUpdateError{
message: fmt.Sprintf("%s %s on API server: %s", controllerUpdateFailMsg, name, message),
}
}
func (e controllerUpdateError) Error() string {
return e.message
}
func isControllerUpdateFailError(err *storage.VolumeError) bool {
if err != nil {
if strings.Contains(err.Message, controllerUpdateFailMsg) {
return true
}
}
return false
}

View File

@@ -0,0 +1,472 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"time"
"github.com/golang/glog"
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
clientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
storageinformers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions/volumesnapshot/v1alpha1"
storagelisters "github.com/kubernetes-csi/external-snapshotter/pkg/client/listers/volumesnapshot/v1alpha1"
"github.com/kubernetes-csi/external-snapshotter/pkg/connection"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/util/goroutinemap"
)
type csiSnapshotController struct {
clientset clientset.Interface
client kubernetes.Interface
snapshotterName string
eventRecorder record.EventRecorder
snapshotQueue workqueue.RateLimitingInterface
contentQueue workqueue.RateLimitingInterface
snapshotLister storagelisters.VolumeSnapshotLister
snapshotListerSynced cache.InformerSynced
contentLister storagelisters.VolumeSnapshotContentLister
contentListerSynced cache.InformerSynced
classLister storagelisters.VolumeSnapshotClassLister
classListerSynced cache.InformerSynced
snapshotStore cache.Store
contentStore cache.Store
classStore cache.Store
handler Handler
// Map of scheduled/running operations.
runningOperations goroutinemap.GoRoutineMap
createSnapshotContentRetryCount int
createSnapshotContentInterval time.Duration
resyncPeriod time.Duration
}
// NewCSISnapshotController returns a new *csiSnapshotController
func NewCSISnapshotController(
clientset clientset.Interface,
client kubernetes.Interface,
snapshotterName string,
volumeSnapshotInformer storageinformers.VolumeSnapshotInformer,
volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer,
volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer,
createSnapshotContentRetryCount int,
createSnapshotContentInterval time.Duration,
conn connection.CSIConnection,
timeout time.Duration,
resyncPeriod time.Duration,
snapshotNamePrefix string,
snapshotNameUUIDLength int,
) *csiSnapshotController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.Core().Events(v1.NamespaceAll)})
var eventRecorder record.EventRecorder
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-snapshotter %s", snapshotterName)})
ctrl := &csiSnapshotController{
clientset: clientset,
client: client,
snapshotterName: snapshotterName,
eventRecorder: eventRecorder,
handler: NewCSIHandler(conn, timeout, snapshotNamePrefix, snapshotNameUUIDLength),
runningOperations: goroutinemap.NewGoRoutineMap(true),
createSnapshotContentRetryCount: createSnapshotContentRetryCount,
createSnapshotContentInterval: createSnapshotContentInterval,
resyncPeriod: resyncPeriod,
snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
classStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-snapshot"),
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
}
volumeSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) },
UpdateFunc: func(oldObj, newObj interface{}) { ctrl.enqueueSnapshotWork(newObj) },
DeleteFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) },
},
ctrl.resyncPeriod,
)
ctrl.snapshotLister = volumeSnapshotInformer.Lister()
ctrl.snapshotListerSynced = volumeSnapshotInformer.Informer().HasSynced
volumeSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { ctrl.enqueueContentWork(obj) },
UpdateFunc: func(oldObj, newObj interface{}) { ctrl.enqueueContentWork(newObj) },
DeleteFunc: func(obj interface{}) { ctrl.enqueueContentWork(obj) },
},
ctrl.resyncPeriod,
)
ctrl.contentLister = volumeSnapshotContentInformer.Lister()
ctrl.contentListerSynced = volumeSnapshotContentInformer.Informer().HasSynced
ctrl.classLister = volumeSnapshotClassInformer.Lister()
ctrl.classListerSynced = volumeSnapshotClassInformer.Informer().HasSynced
return ctrl
}
func (ctrl *csiSnapshotController) Run(workers int, stopCh <-chan struct{}) {
defer ctrl.snapshotQueue.ShutDown()
defer ctrl.contentQueue.ShutDown()
glog.Infof("Starting CSI snapshotter")
defer glog.Infof("Shutting CSI snapshotter")
if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced) {
glog.Errorf("Cannot sync caches")
return
}
ctrl.initializeCaches(ctrl.snapshotLister, ctrl.contentLister)
for i := 0; i < workers; i++ {
go wait.Until(ctrl.snapshotWorker, 0, stopCh)
go wait.Until(ctrl.contentWorker, 0, stopCh)
}
<-stopCh
}
// enqueueSnapshotWork adds snapshot to given work queue.
func (ctrl *csiSnapshotController) enqueueSnapshotWork(obj interface{}) {
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
if snapshot, ok := obj.(*crdv1.VolumeSnapshot); ok {
objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(snapshot)
if err != nil {
glog.Errorf("failed to get key from object: %v, %v", err, snapshot)
return
}
glog.V(5).Infof("enqueued %q for sync", objName)
ctrl.snapshotQueue.Add(objName)
}
}
// enqueueContentWork adds snapshot content to given work queue.
func (ctrl *csiSnapshotController) enqueueContentWork(obj interface{}) {
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
if content, ok := obj.(*crdv1.VolumeSnapshotContent); ok {
objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(content)
if err != nil {
glog.Errorf("failed to get key from object: %v, %v", err, content)
return
}
glog.V(5).Infof("enqueued %q for sync", objName)
ctrl.contentQueue.Add(objName)
}
}
// snapshotWorker processes items from snapshotQueue. It must run only once,
// syncSnapshot is not assured to be reentrant.
func (ctrl *csiSnapshotController) snapshotWorker() {
workFunc := func() bool {
keyObj, quit := ctrl.snapshotQueue.Get()
if quit {
return true
}
defer ctrl.snapshotQueue.Done(keyObj)
key := keyObj.(string)
glog.V(5).Infof("snapshotWorker[%s]", key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
glog.V(5).Infof("snapshotWorker: snapshot namespace [%s] name [%s]", namespace, name)
if err != nil {
glog.Errorf("error getting namespace & name of snapshot %q to get snapshot from informer: %v", key, err)
return false
}
snapshot, err := ctrl.snapshotLister.VolumeSnapshots(namespace).Get(name)
if err == nil {
// The volume snapshot still exists in informer cache, the event must have
// been add/update/sync
newSnapshot, err := ctrl.checkAndUpdateSnapshotClass(snapshot)
if err == nil {
glog.V(5).Infof("passed checkAndUpdateSnapshotClass for snapshot %q", key)
ctrl.updateSnapshot(newSnapshot)
}
return false
}
if err != nil && !errors.IsNotFound(err) {
glog.V(2).Infof("error getting snapshot %q from informer: %v", key, err)
return false
}
// The snapshot is not in informer cache, the event must have been "delete"
vsObj, found, err := ctrl.snapshotStore.GetByKey(key)
if err != nil {
glog.V(2).Infof("error getting snapshot %q from cache: %v", key, err)
return false
}
if !found {
// The controller has already processed the delete event and
// deleted the snapshot from its cache
glog.V(2).Infof("deletion of snapshot %q was already processed", key)
return false
}
snapshot, ok := vsObj.(*crdv1.VolumeSnapshot)
if !ok {
glog.Errorf("expected vs, got %+v", vsObj)
return false
}
newSnapshot, err := ctrl.checkAndUpdateSnapshotClass(snapshot)
if err == nil {
ctrl.deleteSnapshot(newSnapshot)
}
return false
}
for {
if quit := workFunc(); quit {
glog.Infof("snapshot worker queue shutting down")
return
}
}
}
// contentWorker processes items from contentQueue. It must run only once,
// syncContent is not assured to be reentrant.
func (ctrl *csiSnapshotController) contentWorker() {
workFunc := func() bool {
keyObj, quit := ctrl.contentQueue.Get()
if quit {
return true
}
defer ctrl.contentQueue.Done(keyObj)
key := keyObj.(string)
glog.V(5).Infof("contentWorker[%s]", key)
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.V(4).Infof("error getting name of snapshotContent %q to get snapshotContent from informer: %v", key, err)
return false
}
content, err := ctrl.contentLister.Get(name)
if err == nil {
// Skip update if content is for another CSI driver
snapshotClassName := content.Spec.VolumeSnapshotClassName
if snapshotClassName != nil {
if snapshotClass, err := ctrl.classLister.Get(*snapshotClassName); err == nil {
if snapshotClass.Snapshotter != ctrl.snapshotterName {
return false
}
}
}
// The content still exists in informer cache, the event must have
// been add/update/sync
ctrl.updateContent(content)
return false
}
if !errors.IsNotFound(err) {
glog.V(2).Infof("error getting content %q from informer: %v", key, err)
return false
}
// The content is not in informer cache, the event must have been
// "delete"
contentObj, found, err := ctrl.contentStore.GetByKey(key)
if err != nil {
glog.V(2).Infof("error getting content %q from cache: %v", key, err)
return false
}
if !found {
// The controller has already processed the delete event and
// deleted the volume from its cache
glog.V(2).Infof("deletion of content %q was already processed", key)
return false
}
content, ok := contentObj.(*crdv1.VolumeSnapshotContent)
if !ok {
glog.Errorf("expected content, got %+v", content)
return false
}
ctrl.deleteContent(content)
return false
}
for {
if quit := workFunc(); quit {
glog.Infof("content worker queue shutting down")
return
}
}
}
// checkAndUpdateSnapshotClass gets the VolumeSnapshotClass from VolumeSnapshot. If it is not set,
// gets it from default VolumeSnapshotClass and sets it. It also detects if snapshotter in the
// VolumeSnapshotClass is the same as the snapshotter in external controller.
func (ctrl *csiSnapshotController) checkAndUpdateSnapshotClass(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, error) {
className := snapshot.Spec.VolumeSnapshotClassName
var class *crdv1.VolumeSnapshotClass
var err error
newSnapshot := snapshot
if className != nil {
glog.V(5).Infof("checkAndUpdateSnapshotClass [%s]: VolumeSnapshotClassName [%s]", snapshot.Name, *className)
class, err = ctrl.GetSnapshotClass(*className)
if err != nil {
glog.Errorf("checkAndUpdateSnapshotClass failed to getSnapshotClass %v", err)
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "GetSnapshotClassFailed", fmt.Sprintf("Failed to get snapshot class with error %v", err))
return nil, err
}
} else {
glog.V(5).Infof("checkAndUpdateSnapshotClass [%s]: SetDefaultSnapshotClass", snapshot.Name)
class, newSnapshot, err = ctrl.SetDefaultSnapshotClass(snapshot)
if err != nil {
glog.Errorf("checkAndUpdateSnapshotClass failed to setDefaultClass %v", err)
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SetDefaultSnapshotClassFailed", fmt.Sprintf("Failed to set default snapshot class with error %v", err))
return nil, err
}
}
glog.V(5).Infof("VolumeSnapshotClass Snapshotter [%s] Snapshot Controller snapshotterName [%s]", class.Snapshotter, ctrl.snapshotterName)
if class.Snapshotter != ctrl.snapshotterName {
glog.V(4).Infof("Skipping VolumeSnapshot %s for snapshotter [%s] in VolumeSnapshotClass because it does not match with the snapshotter for controller [%s]", snapshotKey(snapshot), class.Snapshotter, ctrl.snapshotterName)
return nil, fmt.Errorf("volumeSnapshotClass does not match with the snapshotter for controller")
}
return newSnapshot, nil
}
// updateSnapshot runs in worker thread and handles "snapshot added",
// "snapshot updated" and "periodic sync" events.
func (ctrl *csiSnapshotController) updateSnapshot(snapshot *crdv1.VolumeSnapshot) {
// Store the new snapshot version in the cache and do not process it if this is
// an old version.
glog.V(5).Infof("updateSnapshot %q", snapshotKey(snapshot))
newSnapshot, err := ctrl.storeSnapshotUpdate(snapshot)
if err != nil {
glog.Errorf("%v", err)
}
if !newSnapshot {
return
}
err = ctrl.syncSnapshot(snapshot)
if err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
glog.V(3).Infof("could not sync claim %q: %+v", snapshotKey(snapshot), err)
} else {
glog.Errorf("could not sync volume %q: %+v", snapshotKey(snapshot), err)
}
}
}
// updateContent runs in worker thread and handles "content added",
// "content updated" and "periodic sync" events.
func (ctrl *csiSnapshotController) updateContent(content *crdv1.VolumeSnapshotContent) {
// Store the new content version in the cache and do not process it if this is
// an old version.
new, err := ctrl.storeContentUpdate(content)
if err != nil {
glog.Errorf("%v", err)
}
if !new {
return
}
err = ctrl.syncContent(content)
if err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
glog.V(3).Infof("could not sync content %q: %+v", content.Name, err)
} else {
glog.Errorf("could not sync content %q: %+v", content.Name, err)
}
}
}
// deleteSnapshot runs in worker thread and handles "snapshot deleted" event.
func (ctrl *csiSnapshotController) deleteSnapshot(snapshot *crdv1.VolumeSnapshot) {
_ = ctrl.snapshotStore.Delete(snapshot)
glog.V(4).Infof("snapshot %q deleted", snapshotKey(snapshot))
snapshotContentName := snapshot.Spec.SnapshotContentName
if snapshotContentName == "" {
glog.V(5).Infof("deleteSnapshot[%q]: content not bound", snapshotKey(snapshot))
return
}
// sync the content when its snapshot is deleted. Explicitly sync'ing the
// content here in response to snapshot deletion prevents the content from
// waiting until the next sync period for its Release.
glog.V(5).Infof("deleteSnapshot[%q]: scheduling sync of content %s", snapshotKey(snapshot), snapshotContentName)
ctrl.contentQueue.Add(snapshotContentName)
}
// deleteContent runs in worker thread and handles "snapshot deleted" event.
func (ctrl *csiSnapshotController) deleteContent(content *crdv1.VolumeSnapshotContent) {
_ = ctrl.contentStore.Delete(content)
glog.V(4).Infof("content %q deleted", content.Name)
snapshotName := snapshotRefKey(content.Spec.VolumeSnapshotRef)
if snapshotName == "" {
glog.V(5).Infof("deleteContent[%q]: content not bound", content.Name)
return
}
// sync the snapshot when its content is deleted. Explicitly sync'ing the
// snapshot here in response to content deletion prevents the snapshot from
// waiting until the next sync period for its Release.
glog.V(5).Infof("deleteContent[%q]: scheduling sync of snapshot %s", content.Name, snapshotName)
ctrl.snapshotQueue.Add(snapshotName)
}
// initializeCaches fills all controller caches with initial data from etcd in
// order to have the caches already filled when first addSnapshot/addContent to
// perform initial synchronization of the controller.
func (ctrl *csiSnapshotController) initializeCaches(snapshotLister storagelisters.VolumeSnapshotLister, contentLister storagelisters.VolumeSnapshotContentLister) {
snapshotList, err := snapshotLister.List(labels.Everything())
if err != nil {
glog.Errorf("CSISnapshotController can't initialize caches: %v", err)
return
}
for _, snapshot := range snapshotList {
snapshotClone := snapshot.DeepCopy()
if _, err = ctrl.storeSnapshotUpdate(snapshotClone); err != nil {
glog.Errorf("error updating volume snapshot cache: %v", err)
}
}
contentList, err := contentLister.List(labels.Everything())
if err != nil {
glog.Errorf("CSISnapshotController can't initialize caches: %v", err)
return
}
for _, content := range contentList {
contentClone := content.DeepCopy()
if _, err = ctrl.storeSnapshotUpdate(contentClone); err != nil {
glog.Errorf("error updating volume snapshot cache: %v", err)
}
}
glog.V(4).Infof("controller initialized")
}

236
pkg/controller/util.go Normal file
View File

@@ -0,0 +1,236 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"github.com/golang/glog"
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"os"
"strconv"
)
var (
keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
)
const snapshotterSecretNameKey = "csiSnapshotterSecretName"
const snapshotterSecretNamespaceKey = "csiSnapshotterSecretNamespace"
func snapshotKey(vs *crdv1.VolumeSnapshot) string {
return fmt.Sprintf("%s/%s", vs.Namespace, vs.Name)
}
func snapshotRefKey(vsref *v1.ObjectReference) string {
return fmt.Sprintf("%s/%s", vsref.Namespace, vsref.Name)
}
// storeObjectUpdate updates given cache with a new object version from Informer
// callback (i.e. with events from etcd) or with an object modified by the
// controller itself. Returns "true", if the cache was updated, false if the
// object is an old version and should be ignored.
func storeObjectUpdate(store cache.Store, obj interface{}, className string) (bool, error) {
objName, err := keyFunc(obj)
if err != nil {
return false, fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)
}
oldObj, found, err := store.Get(obj)
if err != nil {
return false, fmt.Errorf("Error finding %s %q in controller cache: %v", className, objName, err)
}
objAccessor, err := meta.Accessor(obj)
if err != nil {
return false, err
}
if !found {
// This is a new object
glog.V(4).Infof("storeObjectUpdate: adding %s %q, version %s", className, objName, objAccessor.GetResourceVersion())
if err = store.Add(obj); err != nil {
return false, fmt.Errorf("error adding %s %q to controller cache: %v", className, objName, err)
}
return true, nil
}
oldObjAccessor, err := meta.Accessor(oldObj)
if err != nil {
return false, err
}
objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
if err != nil {
return false, fmt.Errorf("error parsing ResourceVersion %q of %s %q: %s", objAccessor.GetResourceVersion(), className, objName, err)
}
oldObjResourceVersion, err := strconv.ParseInt(oldObjAccessor.GetResourceVersion(), 10, 64)
if err != nil {
return false, fmt.Errorf("error parsing old ResourceVersion %q of %s %q: %s", oldObjAccessor.GetResourceVersion(), className, objName, err)
}
// Throw away only older version, let the same version pass - we do want to
// get periodic sync events.
if oldObjResourceVersion > objResourceVersion {
glog.V(4).Infof("storeObjectUpdate: ignoring %s %q version %s", className, objName, objAccessor.GetResourceVersion())
return false, nil
}
glog.V(4).Infof("storeObjectUpdate updating %s %q with version %s", className, objName, objAccessor.GetResourceVersion())
if err = store.Update(obj); err != nil {
return false, fmt.Errorf("error updating %s %q in controller cache: %v", className, objName, err)
}
return true, nil
}
// GetSnapshotContentNameForSnapshot returns SnapshotContent.Name for the create VolumeSnapshotContent.
// The name must be unique.
func GetSnapshotContentNameForSnapshot(snapshot *crdv1.VolumeSnapshot) string {
return "snapcontent-" + string(snapshot.UID)
}
// IsDefaultAnnotation returns a boolean if
// the annotation is set
func IsDefaultAnnotation(obj metav1.ObjectMeta) bool {
if obj.Annotations[IsDefaultSnapshotClassAnnotation] == "true" {
return true
}
return false
}
// GetSecretReference returns a reference to the secret specified in the given nameKey and namespaceKey parameters, or an error if the parameters are not specified correctly.
// if neither the name or namespace parameter are set, a nil reference and no error is returned.
// no lookup of the referenced secret is performed, and the secret may or may not exist.
//
// supported tokens for name resolution:
// - ${volumesnapshotcontent.name}
// - ${volumesnapshot.namespace}
// - ${volumesnapshot.name}
// - ${volumesnapshot.annotations['ANNOTATION_KEY']} (e.g. ${pvc.annotations['example.com/snapshot-create-secret-name']})
//
// supported tokens for namespace resolution:
// - ${volumesnapshotcontent.name}
// - ${volumesnapshot.namespace}
//
// an error is returned in the following situations:
// - only one of name or namespace is provided
// - the name or namespace parameter contains a token that cannot be resolved
// - the resolved name is not a valid secret name
// - the resolved namespace is not a valid namespace name
func GetSecretReference(snapshotClassParams map[string]string, snapContentName string, snapshot *crdv1.VolumeSnapshot) (*v1.SecretReference, error) {
nameTemplate, hasName := snapshotClassParams[snapshotterSecretNameKey]
namespaceTemplate, hasNamespace := snapshotClassParams[snapshotterSecretNamespaceKey]
if !hasName && !hasNamespace {
return nil, nil
}
if len(nameTemplate) == 0 || len(namespaceTemplate) == 0 {
return nil, fmt.Errorf("%s and %s parameters must be specified together", snapshotterSecretNameKey, snapshotterSecretNamespaceKey)
}
ref := &v1.SecretReference{}
// Secret namespace template can make use of the VolumeSnapshotContent name or the VolumeSnapshot namespace.
// Note that neither of those things are under the control of the VolumeSnapshot user.
namespaceParams := map[string]string{"volumesnapshotcontent.name": snapContentName}
// snapshot may be nil when resolving create/delete snapshot secret names because the
// snapshot may or may not exist at delete time
if snapshot != nil {
namespaceParams["volumesnapshot.namespace"] = snapshot.Namespace
}
resolvedNamespace, err := resolveTemplate(namespaceTemplate, namespaceParams)
if err != nil {
return nil, fmt.Errorf("error resolving %s value %q: %v", snapshotterSecretNamespaceKey, namespaceTemplate, err)
}
glog.V(4).Infof("GetSecretReference namespaceTemplate %s, namespaceParams: %+v, resolved %s", namespaceTemplate, namespaceParams, resolvedNamespace)
if len(validation.IsDNS1123Label(resolvedNamespace)) > 0 {
if namespaceTemplate != resolvedNamespace {
return nil, fmt.Errorf("%s parameter %q resolved to %q which is not a valid namespace name", snapshotterSecretNamespaceKey, namespaceTemplate, resolvedNamespace)
}
return nil, fmt.Errorf("%s parameter %q is not a valid namespace name", snapshotterSecretNamespaceKey, namespaceTemplate)
}
ref.Namespace = resolvedNamespace
// Secret name template can make use of the VolumeSnapshotContent name, VolumeSnapshot name or namespace,
// or a VolumeSnapshot annotation.
// Note that VolumeSnapshot name and annotations are under the VolumeSnapshot user's control.
nameParams := map[string]string{"volumesnapshotcontent.name": snapContentName}
if snapshot != nil {
nameParams["volumesnapshot.name"] = snapshot.Name
nameParams["volumesnapshot.namespace"] = snapshot.Namespace
for k, v := range snapshot.Annotations {
nameParams["volumesnapshot.annotations['"+k+"']"] = v
}
}
resolvedName, err := resolveTemplate(nameTemplate, nameParams)
if err != nil {
return nil, fmt.Errorf("error resolving %s value %q: %v", snapshotterSecretNameKey, nameTemplate, err)
}
if len(validation.IsDNS1123Subdomain(resolvedName)) > 0 {
if nameTemplate != resolvedName {
return nil, fmt.Errorf("%s parameter %q resolved to %q which is not a valid secret name", snapshotterSecretNameKey, nameTemplate, resolvedName)
}
return nil, fmt.Errorf("%s parameter %q is not a valid secret name", snapshotterSecretNameKey, nameTemplate)
}
ref.Name = resolvedName
glog.V(4).Infof("GetSecretReference validated Secret: %+v", ref)
return ref, nil
}
// resolveTemplate resolves the template by checking if the value is missing for a key
func resolveTemplate(template string, params map[string]string) (string, error) {
missingParams := sets.NewString()
resolved := os.Expand(template, func(k string) string {
v, ok := params[k]
if !ok {
missingParams.Insert(k)
}
return v
})
if missingParams.Len() > 0 {
return "", fmt.Errorf("invalid tokens: %q", missingParams.List())
}
return resolved, nil
}
// GetCredentials retrieves credentials stored in v1.SecretReference
func GetCredentials(k8s kubernetes.Interface, ref *v1.SecretReference) (map[string]string, error) {
if ref == nil {
return nil, nil
}
secret, err := k8s.CoreV1().Secrets(ref.Namespace).Get(ref.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting secret %s in namespace %s: %v", ref.Name, ref.Namespace, err)
}
credentials := map[string]string{}
for key, value := range secret.Data {
credentials[key] = string(value)
}
return credentials, nil
}