Add Snapshot Controller

This PR adds external snapshot controller and main package under cmd.
This commit is contained in:
xing-yang
2018-07-12 11:00:55 -07:00
committed by Xing Yang
parent cc19fd383f
commit 2663b1351f
7 changed files with 2003 additions and 2 deletions

View File

@@ -0,0 +1,139 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"reflect"
"time"
"github.com/golang/glog"
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
)
const (
// SnapshotPVCAnnotation is "snapshot.alpha.kubernetes.io/snapshot"
SnapshotPVCAnnotation = "volumesnapshot.csi.k8s.io/snapshot"
)
// NewClient creates a new RESTClient
func NewClient(cfg *rest.Config) (*rest.RESTClient, *runtime.Scheme, error) {
scheme := runtime.NewScheme()
if err := crdv1.AddToScheme(scheme); err != nil {
return nil, nil, err
}
config := *cfg
config.GroupVersion = &crdv1.SchemeGroupVersion
config.APIPath = "/apis"
config.ContentType = runtime.ContentTypeJSON
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: serializer.NewCodecFactory(scheme)}
client, err := rest.RESTClientFor(&config)
if err != nil {
return nil, nil, err
}
return client, scheme, nil
}
// CreateCRD creates CustomResourceDefinition
func CreateCRD(clientset apiextensionsclient.Interface) error {
crd := &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: crdv1.VolumeSnapshotClassResourcePlural + "." + crdv1.GroupName,
},
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
Group: crdv1.GroupName,
Version: crdv1.SchemeGroupVersion.Version,
Scope: apiextensionsv1beta1.ClusterScoped,
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
Plural: crdv1.VolumeSnapshotClassResourcePlural,
Kind: reflect.TypeOf(crdv1.VolumeSnapshotClass{}).Name(),
},
},
}
res, err := clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)
if err != nil && !apierrors.IsAlreadyExists(err) {
glog.Fatalf("failed to create VolumeSnapshotResource: %#v, err: %#v",
res, err)
}
crd = &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: crdv1.VolumeSnapshotContentResourcePlural + "." + crdv1.GroupName,
},
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
Group: crdv1.GroupName,
Version: crdv1.SchemeGroupVersion.Version,
Scope: apiextensionsv1beta1.ClusterScoped,
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
Plural: crdv1.VolumeSnapshotContentResourcePlural,
Kind: reflect.TypeOf(crdv1.VolumeSnapshotContent{}).Name(),
},
},
}
res, err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)
if err != nil && !apierrors.IsAlreadyExists(err) {
glog.Fatalf("failed to create VolumeSnapshotContentResource: %#v, err: %#v",
res, err)
}
crd = &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: crdv1.VolumeSnapshotResourcePlural + "." + crdv1.GroupName,
},
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
Group: crdv1.GroupName,
Version: crdv1.SchemeGroupVersion.Version,
Scope: apiextensionsv1beta1.NamespaceScoped,
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
Plural: crdv1.VolumeSnapshotResourcePlural,
Kind: reflect.TypeOf(crdv1.VolumeSnapshot{}).Name(),
},
},
}
res, err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)
if err != nil && !apierrors.IsAlreadyExists(err) {
glog.Fatalf("failed to create VolumeSnapshotResource: %#v, err: %#v",
res, err)
}
return nil
}
// WaitForSnapshotResource waits for the snapshot resource
func WaitForSnapshotResource(snapshotClient *rest.RESTClient) error {
return wait.Poll(100*time.Millisecond, 60*time.Second, func() (bool, error) {
_, err := snapshotClient.Get().
Resource(crdv1.VolumeSnapshotContentResourcePlural).DoRaw()
if err == nil {
return true, nil
}
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
})
}

View File

@@ -1,7 +1,186 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import "fmt"
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"time"
"github.com/golang/glog"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"github.com/kubernetes-csi/external-snapshotter/pkg/connection"
"github.com/kubernetes-csi/external-snapshotter/pkg/controller"
clientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
)
const (
// Number of worker threads
threads = 10
// Default timeout of short CSI calls like GetPluginInfo
csiTimeout = time.Second
)
// Command line flags
var (
snapshotter = flag.String("snapshotter", "", "Name of the snapshotter. The snapshotter will only create snapshot data for snapshot that request a StorageClass with a snapshotter field set equal to this name.")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
resync = flag.Duration("resync", 10*time.Second, "Resync interval of the controller.")
connectionTimeout = flag.Duration("connection-timeout", 1*time.Minute, "Timeout for waiting for CSI driver socket.")
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
createSnapshotContentRetryCount = flag.Int("createSnapshotContentRetryCount", 5, "Number of retries when we create a snapshot data object for a snapshot.")
createSnapshotContentInterval = flag.Duration("createSnapshotContentInterval", 10*time.Second, "Interval between retries when we create a snapshot data object for a snapshot.")
resyncPeriod = flag.Duration("resyncPeriod", 60*time.Second, "The period that should be used to re-sync the snapshot.")
snapshotNamePrefix = flag.String("snapshot-name-prefix", "snapshot", "Prefix to apply to the name of a created snapshot")
snapshotNameUUIDLength = flag.Int("snapshot-name-uuid-length", -1, "Length in characters for the generated uuid of a created snapshot")
)
func main() {
fmt.Println("vim-go")
flag.Set("logtostderr", "true")
flag.Parse()
// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
config, err := buildConfig(*kubeconfig)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}
snapClient, err := clientset.NewForConfig(config)
if err != nil {
glog.Errorf("Error building snapshot clientset: %s", err.Error())
os.Exit(1)
}
factory := informers.NewSharedInformerFactory(snapClient, *resync)
// Create CRD resource
aeclientset, err := apiextensionsclient.NewForConfig(config)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}
// initialize CRD resource if it does not exist
err = CreateCRD(aeclientset)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}
// Connect to CSI.
csiConn, err := connection.New(*csiAddress, *connectionTimeout)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}
// Find driver name.
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
// Check it's ready
if err = waitForDriverReady(csiConn, *connectionTimeout); err != nil {
glog.Error(err.Error())
os.Exit(1)
}
// Find out if the driver supports create/delete snapshot.
supportsCreateSnapshot, err := csiConn.SupportsControllerCreateSnapshot(ctx)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}
if !supportsCreateSnapshot {
glog.Error("CSI driver does not support ControllerCreateSnapshot")
os.Exit(1)
}
glog.V(2).Infof("Start NewCSISnapshotController with snapshotter %s", *snapshotter)
ctrl := controller.NewCSISnapshotController(
snapClient,
kubeClient,
*snapshotter,
factory.Volumesnapshot().V1alpha1().VolumeSnapshots(),
factory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(),
factory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(),
*createSnapshotContentRetryCount,
*createSnapshotContentInterval,
csiConn,
*connectionTimeout,
*resyncPeriod,
*snapshotNamePrefix,
*snapshotNameUUIDLength,
)
// run...
stopCh := make(chan struct{})
factory.Start(stopCh)
go ctrl.Run(threads, stopCh)
// ...until SIGINT
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
close(stopCh)
}
func buildConfig(kubeconfig string) (*rest.Config, error) {
if kubeconfig != "" {
return clientcmd.BuildConfigFromFlags("", kubeconfig)
}
return rest.InClusterConfig()
}
func waitForDriverReady(csiConn connection.CSIConnection, timeout time.Duration) error {
now := time.Now()
finish := now.Add(timeout)
var err error
for {
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
err = csiConn.Probe(ctx)
if err == nil {
glog.V(2).Infof("Probe succeeded")
return nil
}
glog.V(2).Infof("Probe failed with %s", err)
now := time.Now()
if now.After(finish) {
return fmt.Errorf("failed to probe the controller: %s", err)
}
time.Sleep(time.Second)
}
}

View File

@@ -0,0 +1,266 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package connection
import (
"context"
"fmt"
"net"
"strings"
"time"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/golang/glog"
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"k8s.io/api/core/v1"
)
// CSIConnection is gRPC connection to a remote CSI driver and abstracts all
// CSI calls.
type CSIConnection interface {
// GetDriverName returns driver name as discovered by GetPluginInfo()
// gRPC call.
GetDriverName(ctx context.Context) (string, error)
// SupportsControllerCreateSnapshot returns true if the CSI driver reports
// CREATE_DELETE_SNAPSHOT in ControllerGetCapabilities() gRPC call.
SupportsControllerCreateSnapshot(ctx context.Context) (bool, error)
// SupportsControllerListSnapshots returns true if the CSI driver reports
// LIST_SNAPSHOTS in ControllerGetCapabilities() gRPC call.
SupportsControllerListSnapshots(ctx context.Context) (bool, error)
// CreateSnapshot creates a snapshot for a volume
CreateSnapshot(ctx context.Context, snapshotName string, snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string) (driverName string, snapshotId string, timestamp int64, status *csi.SnapshotStatus, err error)
// DeleteSnapshot deletes a snapshot from a volume
DeleteSnapshot(ctx context.Context, snapshotID string) (err error)
// GetSnapshotStatus lists snapshot from a volume
GetSnapshotStatus(ctx context.Context, snapshotID string) (*csi.SnapshotStatus, int64, error)
// Probe checks that the CSI driver is ready to process requests
Probe(ctx context.Context) error
// Close the connection
Close() error
}
type csiConnection struct {
conn *grpc.ClientConn
}
var (
_ CSIConnection = &csiConnection{}
)
func New(address string, timeout time.Duration) (CSIConnection, error) {
conn, err := connect(address, timeout)
if err != nil {
return nil, err
}
return &csiConnection{
conn: conn,
}, nil
}
func connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
glog.V(2).Infof("Connecting to %s", address)
dialOptions := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithUnaryInterceptor(logGRPC),
}
if strings.HasPrefix(address, "/") {
dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}))
}
conn, err := grpc.Dial(address, dialOptions...)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
if !conn.WaitForStateChange(ctx, conn.GetState()) {
glog.V(4).Infof("Connection timed out")
// subsequent GetPluginInfo will show the real connection error
return conn, nil
}
if conn.GetState() == connectivity.Ready {
glog.V(3).Infof("Connected")
return conn, nil
}
glog.V(4).Infof("Still trying, connection is %s", conn.GetState())
}
}
func (c *csiConnection) GetDriverName(ctx context.Context) (string, error) {
client := csi.NewIdentityClient(c.conn)
req := csi.GetPluginInfoRequest{}
rsp, err := client.GetPluginInfo(ctx, &req)
if err != nil {
return "", err
}
name := rsp.GetName()
if name == "" {
return "", fmt.Errorf("name is empty")
}
return name, nil
}
func (c *csiConnection) Probe(ctx context.Context) error {
client := csi.NewIdentityClient(c.conn)
req := csi.ProbeRequest{}
_, err := client.Probe(ctx, &req)
if err != nil {
return err
}
return nil
}
func (c *csiConnection) SupportsControllerCreateSnapshot(ctx context.Context) (bool, error) {
client := csi.NewControllerClient(c.conn)
req := csi.ControllerGetCapabilitiesRequest{}
rsp, err := client.ControllerGetCapabilities(ctx, &req)
if err != nil {
return false, err
}
caps := rsp.GetCapabilities()
for _, cap := range caps {
if cap == nil {
continue
}
rpc := cap.GetRpc()
if rpc == nil {
continue
}
if rpc.GetType() == csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT {
return true, nil
}
}
return false, nil
}
func (c *csiConnection) SupportsControllerListSnapshots(ctx context.Context) (bool, error) {
client := csi.NewControllerClient(c.conn)
req := csi.ControllerGetCapabilitiesRequest{}
rsp, err := client.ControllerGetCapabilities(ctx, &req)
if err != nil {
return false, err
}
caps := rsp.GetCapabilities()
for _, cap := range caps {
if cap == nil {
continue
}
rpc := cap.GetRpc()
if rpc == nil {
continue
}
if rpc.GetType() == csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS {
return true, nil
}
}
return false, nil
}
func (c *csiConnection) CreateSnapshot(ctx context.Context, snapshotName string, snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string) (string, string, int64, *csi.SnapshotStatus, error) {
glog.V(5).Infof("CSI CreateSnapshot: %s", snapshot.Name)
if volume.Spec.CSI == nil {
return "", "", 0, nil, fmt.Errorf("CSIPersistentVolumeSource not defined in spec")
}
client := csi.NewControllerClient(c.conn)
driverName, err := c.GetDriverName(ctx)
if err != nil {
return "", "", 0, nil, err
}
req := csi.CreateSnapshotRequest{
SourceVolumeId: volume.Spec.CSI.VolumeHandle,
Name: snapshotName,
Parameters: parameters,
CreateSnapshotSecrets: nil,
}
rsp, err := client.CreateSnapshot(ctx, &req)
if err != nil {
return "", "", 0, nil, err
}
glog.V(5).Infof("CSI CreateSnapshot: %s driver name [%s] snapshot ID [%s] time stamp [%s] status [%s]", snapshot.Name, driverName, rsp.Snapshot.Id, rsp.Snapshot.CreatedAt, *rsp.Snapshot.Status)
return driverName, rsp.Snapshot.Id, rsp.Snapshot.CreatedAt, rsp.Snapshot.Status, nil
}
func (c *csiConnection) DeleteSnapshot(ctx context.Context, snapshotID string) (err error) {
client := csi.NewControllerClient(c.conn)
req := csi.DeleteSnapshotRequest{
SnapshotId: snapshotID,
DeleteSnapshotSecrets: nil,
}
if _, err := client.DeleteSnapshot(ctx, &req); err != nil {
return err
}
return nil
}
func (c *csiConnection) GetSnapshotStatus(ctx context.Context, snapshotID string) (*csi.SnapshotStatus, int64, error) {
client := csi.NewControllerClient(c.conn)
req := csi.ListSnapshotsRequest{
SnapshotId: snapshotID,
}
rsp, err := client.ListSnapshots(ctx, &req)
if err != nil {
return nil, 0, err
}
if rsp.Entries == nil || len(rsp.Entries) == 0 {
return nil, 0, fmt.Errorf("can not find snapshot for snapshotID %s", snapshotID)
}
return rsp.Entries[0].Snapshot.Status, rsp.Entries[0].Snapshot.CreatedAt, nil
}
func (c *csiConnection) Close() error {
return c.conn.Close()
}
func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
glog.V(5).Infof("GRPC call: %s", method)
glog.V(5).Infof("GRPC request: %+v", req)
err := invoker(ctx, method, req, reply, cc, opts...)
glog.V(5).Infof("GRPC response: %+v", reply)
glog.V(5).Infof("GRPC error: %v", err)
return err
}

View File

@@ -0,0 +1,439 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"time"
"github.com/golang/glog"
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
clientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
storageinformers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions/volumesnapshot/v1alpha1"
storagelisters "github.com/kubernetes-csi/external-snapshotter/pkg/client/listers/volumesnapshot/v1alpha1"
"github.com/kubernetes-csi/external-snapshotter/pkg/connection"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/util/goroutinemap"
)
type CSISnapshotController struct {
clientset clientset.Interface
client kubernetes.Interface
snapshotterName string
eventRecorder record.EventRecorder
snapshotQueue workqueue.RateLimitingInterface
contentQueue workqueue.RateLimitingInterface
snapshotLister storagelisters.VolumeSnapshotLister
snapshotListerSynced cache.InformerSynced
contentLister storagelisters.VolumeSnapshotContentLister
contentListerSynced cache.InformerSynced
classLister storagelisters.VolumeSnapshotClassLister
classListerSynced cache.InformerSynced
snapshotStore cache.Store
contentStore cache.Store
handler Handler
// Map of scheduled/running operations.
runningOperations goroutinemap.GoRoutineMap
createSnapshotContentRetryCount int
createSnapshotContentInterval time.Duration
resyncPeriod time.Duration
}
// NewCSISnapshotController returns a new *CSISnapshotController
func NewCSISnapshotController(
clientset clientset.Interface,
client kubernetes.Interface,
snapshotterName string,
volumeSnapshotInformer storageinformers.VolumeSnapshotInformer,
volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer,
volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer,
createSnapshotContentRetryCount int,
createSnapshotContentInterval time.Duration,
conn connection.CSIConnection,
timeout time.Duration,
resyncPeriod time.Duration,
snapshotNamePrefix string,
snapshotNameUUIDLength int,
) *CSISnapshotController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.Core().Events(v1.NamespaceAll)})
var eventRecorder record.EventRecorder
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-snapshotter %s", snapshotterName)})
ctrl := &CSISnapshotController{
clientset: clientset,
client: client,
snapshotterName: snapshotterName,
eventRecorder: eventRecorder,
handler: NewCSIHandler(conn, timeout, snapshotNamePrefix, snapshotNameUUIDLength),
runningOperations: goroutinemap.NewGoRoutineMap(true),
createSnapshotContentRetryCount: createSnapshotContentRetryCount,
createSnapshotContentInterval: createSnapshotContentInterval,
resyncPeriod: resyncPeriod,
snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-snapshot"),
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
}
volumeSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) },
UpdateFunc: func(oldObj, newObj interface{}) { ctrl.enqueueSnapshotWork(newObj) },
DeleteFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) },
},
ctrl.resyncPeriod,
)
ctrl.snapshotLister = volumeSnapshotInformer.Lister()
ctrl.snapshotListerSynced = volumeSnapshotInformer.Informer().HasSynced
volumeSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { ctrl.enqueueContentWork(obj) },
UpdateFunc: func(oldObj, newObj interface{}) { ctrl.enqueueContentWork(newObj) },
DeleteFunc: func(obj interface{}) { ctrl.enqueueContentWork(obj) },
},
ctrl.resyncPeriod,
)
ctrl.contentLister = volumeSnapshotContentInformer.Lister()
ctrl.contentListerSynced = volumeSnapshotContentInformer.Informer().HasSynced
ctrl.classLister = volumeSnapshotClassInformer.Lister()
ctrl.classListerSynced = volumeSnapshotClassInformer.Informer().HasSynced
return ctrl
}
func (ctrl *CSISnapshotController) Run(workers int, stopCh <-chan struct{}) {
defer ctrl.snapshotQueue.ShutDown()
defer ctrl.contentQueue.ShutDown()
glog.Infof("Starting CSI snapshotter")
defer glog.Infof("Shutting CSI snapshotter")
if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced) {
glog.Errorf("Cannot sync caches")
return
}
ctrl.initializeCaches(ctrl.snapshotLister, ctrl.contentLister)
for i := 0; i < workers; i++ {
go wait.Until(ctrl.snapshotWorker, 0, stopCh)
go wait.Until(ctrl.contentWorker, 0, stopCh)
}
<-stopCh
}
// enqueueSnapshotWork adds snapshot to given work queue.
func (ctrl *CSISnapshotController) enqueueSnapshotWork(obj interface{}) {
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
if vs, ok := obj.(*crdv1.VolumeSnapshot); ok {
objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(vs)
if err != nil {
glog.Errorf("failed to get key from object: %v, %v", err, vs)
return
}
glog.V(5).Infof("enqueued %q for sync", objName)
ctrl.snapshotQueue.Add(objName)
}
}
// enqueueContentWork adds snapshot data to given work queue.
func (ctrl *CSISnapshotController) enqueueContentWork(obj interface{}) {
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
if content, ok := obj.(*crdv1.VolumeSnapshotContent); ok {
objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(content)
if err != nil {
glog.Errorf("failed to get key from object: %v, %v", err, content)
return
}
glog.V(5).Infof("enqueued %q for sync", objName)
ctrl.contentQueue.Add(objName)
}
}
// snapshotWorker processes items from snapshotQueue. It must run only once,
// syncSnapshot is not assured to be reentrant.
func (ctrl *CSISnapshotController) snapshotWorker() {
workFunc := func() bool {
keyObj, quit := ctrl.snapshotQueue.Get()
if quit {
return true
}
defer ctrl.snapshotQueue.Done(keyObj)
key := keyObj.(string)
glog.V(5).Infof("snapshotWorker[%s]", key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
glog.V(5).Infof("snapshotWorker: snapshot namespace [%s] name [%s]", namespace, name)
if err != nil {
glog.V(4).Infof("error getting namespace & name of snapshot %q to get snapshot from informer: %v", key, err)
return false
}
snapshot, err := ctrl.snapshotLister.VolumeSnapshots(namespace).Get(name)
if err == nil {
if ctrl.shouldProcessSnapshot(snapshot) {
// The volume snapshot still exists in informer cache, the event must have
// been add/update/sync
glog.V(4).Infof("should process snapshot")
ctrl.updateSnapshot(snapshot)
}
return false
}
if err != nil && !errors.IsNotFound(err) {
glog.V(2).Infof("error getting snapshot %q from informer: %v", key, err)
return false
}
// The snapshot is not in informer cache, the event must have been "delete"
vsObj, found, err := ctrl.snapshotStore.GetByKey(key)
if err != nil {
glog.V(2).Infof("error getting snapshot %q from cache: %v", key, err)
return false
}
if !found {
// The controller has already processed the delete event and
// deleted the snapshot from its cache
glog.V(2).Infof("deletion of vs %q was already processed", key)
return false
}
snapshot, ok := vsObj.(*crdv1.VolumeSnapshot)
if !ok {
glog.Errorf("expected vs, got %+v", vsObj)
return false
}
if ctrl.shouldProcessSnapshot(snapshot) {
ctrl.deleteSnapshot(snapshot)
}
return false
}
for {
if quit := workFunc(); quit {
glog.Infof("snapshot worker queue shutting down")
return
}
}
}
// contentWorker processes items from contentQueue. It must run only once,
// syncContent is not assured to be reentrant.
func (ctrl *CSISnapshotController) contentWorker() {
workFunc := func() bool {
keyObj, quit := ctrl.contentQueue.Get()
if quit {
return true
}
defer ctrl.contentQueue.Done(keyObj)
key := keyObj.(string)
glog.V(5).Infof("contentWorker[%s]", key)
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.V(4).Infof("error getting name of snapshotData %q to get snapshotData from informer: %v", key, err)
return false
}
content, err := ctrl.contentLister.Get(name)
if err == nil {
// The volume still exists in informer cache, the event must have
// been add/update/sync
ctrl.updateContent(content)
return false
}
if !errors.IsNotFound(err) {
glog.V(2).Infof("error getting content %q from informer: %v", key, err)
return false
}
// The content is not in informer cache, the event must have been
// "delete"
contentObj, found, err := ctrl.contentStore.GetByKey(key)
if err != nil {
glog.V(2).Infof("error getting content %q from cache: %v", key, err)
return false
}
if !found {
// The controller has already processed the delete event and
// deleted the volume from its cache
glog.V(2).Infof("deletion of content %q was already processed", key)
return false
}
content, ok := contentObj.(*crdv1.VolumeSnapshotContent)
if !ok {
glog.Errorf("expected content, got %+v", content)
return false
}
ctrl.deleteContent(content)
return false
}
for {
if quit := workFunc(); quit {
glog.Infof("content worker queue shutting down")
return
}
}
}
// shouldProcessSnapshot detect if snapshotter in the VolumeSnapshotClass is the same as the snapshotter
// in external controller.
func (ctrl *CSISnapshotController) shouldProcessSnapshot(snapshot *crdv1.VolumeSnapshot) bool {
class, err := ctrl.GetClassFromVolumeSnapshot(snapshot)
if err != nil {
return false
}
glog.V(5).Infof("VolumeSnapshotClass Snapshotter [%s] Snapshot Controller snapshotterName [%s]", class.Snapshotter, ctrl.snapshotterName)
if class.Snapshotter != ctrl.snapshotterName {
glog.V(4).Infof("Skipping VolumeSnapshot %s for snapshotter [%s] in VolumeSnapshotClass because it does not match with the snapshotter for controller [%s]", snapshotKey(snapshot), class.Snapshotter, ctrl.snapshotterName)
return false
}
return true
}
// updateSnapshot runs in worker thread and handles "snapshot added",
// "snapshot updated" and "periodic sync" events.
func (ctrl *CSISnapshotController) updateSnapshot(vs *crdv1.VolumeSnapshot) {
// Store the new vs version in the cache and do not process it if this is
// an old version.
glog.V(5).Infof("updateSnapshot %q", snapshotKey(vs))
newVS, err := ctrl.storeSnapshotUpdate(vs)
if err != nil {
glog.Errorf("%v", err)
}
if !newVS {
return
}
err = ctrl.syncSnapshot(vs)
if err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
glog.V(3).Infof("could not sync claim %q: %+v", snapshotKey(vs), err)
} else {
glog.Errorf("could not sync volume %q: %+v", snapshotKey(vs), err)
}
}
}
// updateContent runs in worker thread and handles "content added",
// "content updated" and "periodic sync" events.
func (ctrl *CSISnapshotController) updateContent(content *crdv1.VolumeSnapshotContent) {
// Store the new vs version in the cache and do not process it if this is
// an old version.
new, err := ctrl.storeContentUpdate(content)
if err != nil {
glog.Errorf("%v", err)
}
if !new {
return
}
err = ctrl.syncContent(content)
if err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
glog.V(3).Infof("could not sync content %q: %+v", content.Name, err)
} else {
glog.Errorf("could not sync content %q: %+v", content.Name, err)
}
}
}
// deleteSnapshot runs in worker thread and handles "snapshot deleted" event.
func (ctrl *CSISnapshotController) deleteSnapshot(vs *crdv1.VolumeSnapshot) {
_ = ctrl.snapshotStore.Delete(vs)
glog.V(4).Infof("vs %q deleted", snapshotKey(vs))
snapshotContentName := vs.Spec.SnapshotContentName
if snapshotContentName == "" {
glog.V(5).Infof("deleteSnapshot[%q]: content not bound", snapshotKey(vs))
return
}
// sync the content when its vs is deleted. Explicitly sync'ing the
// content here in response to vs deletion prevents the content from
// waiting until the next sync period for its Release.
glog.V(5).Infof("deleteSnapshot[%q]: scheduling sync of content %s", snapshotKey(vs), snapshotContentName)
ctrl.contentQueue.Add(snapshotContentName)
}
// deleteContent runs in worker thread and handles "snapshot deleted" event.
func (ctrl *CSISnapshotController) deleteContent(content *crdv1.VolumeSnapshotContent) {
_ = ctrl.contentStore.Delete(content)
glog.V(4).Infof("content %q deleted", content.Name)
snapshotName := snapshotRefKey(content.Spec.VolumeSnapshotRef)
if snapshotName == "" {
glog.V(5).Infof("deleteContent[%q]: content not bound", content.Name)
return
}
// sync the vs when its vs is deleted. Explicitly sync'ing the
// vs here in response to content deletion prevents the vs from
// waiting until the next sync period for its Release.
glog.V(5).Infof("deleteContent[%q]: scheduling sync of vs %s", content.Name, snapshotName)
ctrl.contentQueue.Add(snapshotName)
}
// initializeCaches fills all controller caches with initial data from etcd in
// order to have the caches already filled when first addSnapshot/addContent to
// perform initial synchronization of the controller.
func (ctrl *CSISnapshotController) initializeCaches(snapshotLister storagelisters.VolumeSnapshotLister, contentLister storagelisters.VolumeSnapshotContentLister) {
vsList, err := snapshotLister.List(labels.Everything())
if err != nil {
glog.Errorf("CSISnapshotController can't initialize caches: %v", err)
return
}
for _, vs := range vsList {
vsClone := vs.DeepCopy()
if _, err = ctrl.storeSnapshotUpdate(vsClone); err != nil {
glog.Errorf("error updating volume snapshot cache: %v", err)
}
}
contentList, err := contentLister.List(labels.Everything())
if err != nil {
glog.Errorf("CSISnapshotController can't initialize caches: %v", err)
return
}
for _, content := range contentList {
contentClone := content.DeepCopy()
if _, err = ctrl.storeSnapshotUpdate(contentClone); err != nil {
glog.Errorf("error updating volume snapshot cache: %v", err)
}
}
glog.V(4).Infof("controller initialized")
}

View File

@@ -0,0 +1,115 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"strings"
"time"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
"github.com/kubernetes-csi/external-snapshotter/pkg/connection"
"k8s.io/api/core/v1"
)
// Handler is responsible for handling VolumeSnapshot events from informer.
type Handler interface {
CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string) (string, string, int64, *csi.SnapshotStatus, error)
DeleteSnapshot(content *crdv1.VolumeSnapshotContent) error
GetSnapshotStatus(content *crdv1.VolumeSnapshotContent) (*csi.SnapshotStatus, int64, error)
}
// csiHandler is a handler that calls CSI to create/delete volume snapshot.
type csiHandler struct {
csiConnection connection.CSIConnection
timeout time.Duration
snapshotNamePrefix string
snapshotNameUUIDLength int
}
func NewCSIHandler(
csiConnection connection.CSIConnection,
timeout time.Duration,
snapshotNamePrefix string,
snapshotNameUUIDLength int,
) Handler {
return &csiHandler{
csiConnection: csiConnection,
timeout: timeout,
snapshotNamePrefix: snapshotNamePrefix,
snapshotNameUUIDLength: snapshotNameUUIDLength,
}
}
func (handler *csiHandler) CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string) (string, string, int64, *csi.SnapshotStatus, error) {
ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
defer cancel()
snapshotName, err := makeSnapshotName(handler.snapshotNamePrefix, string(snapshot.UID), handler.snapshotNameUUIDLength)
if err != nil {
return "", "", 0, nil, err
}
return handler.csiConnection.CreateSnapshot(ctx, snapshotName, snapshot, volume, parameters)
}
func (handler *csiHandler) DeleteSnapshot(content *crdv1.VolumeSnapshotContent) error {
if content.Spec.CSI == nil {
return fmt.Errorf("CSISnapshot not defined in spec")
}
ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
defer cancel()
err := handler.csiConnection.DeleteSnapshot(ctx, content.Spec.CSI.SnapshotHandle)
if err != nil {
return fmt.Errorf("failed to delete snapshot data %s: %q", content.Name, err)
}
return nil
}
func (handler *csiHandler) GetSnapshotStatus(content *crdv1.VolumeSnapshotContent) (*csi.SnapshotStatus, int64, error) {
if content.Spec.CSI == nil {
return nil, 0, fmt.Errorf("CSISnapshot not defined in spec")
}
ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
defer cancel()
csiSnapshotStatus, timestamp, err := handler.csiConnection.GetSnapshotStatus(ctx, content.Spec.CSI.SnapshotHandle)
if err != nil {
return nil, 0, fmt.Errorf("failed to list snapshot data %s: %q", content.Name, err)
}
return csiSnapshotStatus, timestamp, nil
}
func makeSnapshotName(prefix, snapshotUID string, snapshotNameUUIDLength int) (string, error) {
// create persistent name based on a volumeNamePrefix and volumeNameUUIDLength
// of PVC's UID
if len(prefix) == 0 {
return "", fmt.Errorf("Snapshot name prefix cannot be of length 0")
}
if len(snapshotUID) == 0 {
return "", fmt.Errorf("Corrupted snapshot object, it is missing UID")
}
if snapshotNameUUIDLength == -1 {
// Default behavior is to not truncate or remove dashes
return fmt.Sprintf("%s-%s", prefix, snapshotUID), nil
}
return fmt.Sprintf("%s-%s", prefix, strings.Replace(snapshotUID, "-", "", -1)[0:snapshotNameUUIDLength]), nil
}

View File

@@ -0,0 +1,740 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"time"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/golang/glog"
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes/scheme"
ref "k8s.io/client-go/tools/reference"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
)
// ==================================================================
// PLEASE DO NOT ATTEMPT TO SIMPLIFY THIS CODE.
// KEEP THE SPACE SHUTTLE FLYING.
// ==================================================================
// Design:
//
// The fundamental key to this design is the bi-directional "pointer" between
// VolumeSnapshots and VolumeSnapshotContents, which is represented here
// as snapshot.Spec.SnapshotContentName and content.Spec.VolumeSnapshotRef.
// The bi-directionality is complicated to manage in a transactionless system, but
// without it we can't ensure sane behavior in the face of different forms of
// trouble. For example, a rogue HA controller instance could end up racing
// and making multiple bindings that are indistinguishable, resulting in
// potential data loss.
//
// This controller is designed to work in active-passive high availability
// mode. It *could* work also in active-active HA mode, all the object
// transitions are designed to cope with this, however performance could be
// lower as these two active controllers will step on each other toes
// frequently.
//
// This controller supports both dynamic snapshot creation and pre-bound snapshot.
// In pre-bound mode, objects are created with pre-defined pointers: a VolumeSnapshot
// points to a specific VolumeSnapshotContent and the VolumeSnapshotContent also
// points back for this VolumeSnapshot.
//
// The dynamic snapshot creation is multi-step process: first controller triggers
// snapshot creation though csi volume plugin which should return a snapshot after
// it is created succesfully (however, the snapshot might not be ready to use yet if
// there is an uploading phase). The creationTimestamp will be updated according to
// VolumeSnapshot, and then a VolumeSnapshotContent object is created to represent
// this snapshot. After that, the controller will keep checking the snapshot status
// though csi snapshot calls. When the snapshot is ready to use, the controller set
// the status "Bound" to true to indicate the snapshot is bound and ready to use.
// If the createtion failed for any reason, the Error status is set accordingly.
// In alpha version, the controller not retry to create the snapshot after it failed.
// In the future version, a retry policy will be added.
const pvcKind = "PersistentVolumeClaim"
const IsDefaultSnapshotClassAnnotation = "snapshot.storage.kubernetes.io/is-default-class"
// syncContent deals with one key off the queue. It returns false when it's time to quit.
func (ctrl *CSISnapshotController) syncContent(content *crdv1.VolumeSnapshotContent) error {
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]", content.Name)
// VolumeSnapshotContent is not bind to any VolumeSnapshot, this case rare and we just return err
if content.Spec.VolumeSnapshotRef == nil {
// content is not bind
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: VolumeSnapshotContent is not bound to any VolumeSnapshot", content.Name)
return fmt.Errorf("volumeSnapshotContent %s is not bound to any VolumeSnapshot", content.Name)
} else {
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: content is bound to snapshot %s", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
// The VolumeSnapshotContent is reserved for a VolumeSNapshot;
// that VolumeSnapshot has not yet been bound to this VolumeSnapshotCent; the VolumeSnapshot sync will handle it.
if content.Spec.VolumeSnapshotRef.UID == "" {
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: VolumeSnapshotContent is pre-bound to VolumeSnapshot %s", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
return nil
}
// Get the VolumeSnapshot by _name_
var vs *crdv1.VolumeSnapshot
vsName := snapshotRefKey(content.Spec.VolumeSnapshotRef)
obj, found, err := ctrl.snapshotStore.GetByKey(vsName)
if err != nil {
return err
}
if !found {
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: vs %s not found", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
// Fall through with vs = nil
} else {
var ok bool
vs, ok = obj.(*crdv1.VolumeSnapshot)
if !ok {
return fmt.Errorf("cannot convert object from vs cache to vs %q!?: %#v", content.Name, obj)
}
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: vs %s found", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
}
if vs != nil && vs.UID != content.Spec.VolumeSnapshotRef.UID {
// The vs that the content was pointing to was deleted, and another
// with the same name created.
glog.V(4).Infof("synchronizing VolumeSnapshotContent[%s]: content %s has different UID, the old one must have been deleted", content.Name, snapshotRefKey(content.Spec.VolumeSnapshotRef))
// Treat the volume as bound to a missing claim.
vs = nil
}
if vs == nil {
ctrl.deleteSnapshotContent(content)
}
}
return nil
}
// syncSnapshot is the main controller method to decide what to do with a snapshot.
// It's invoked by appropriate cache.Controller callbacks when a snapshot is
// created, updated or periodically synced. We do not differentiate between
// these events.
// For easier readability, it is split into syncCompleteSnapshot and syncUncompleteSnapshot
func (ctrl *CSISnapshotController) syncSnapshot(snapshot *crdv1.VolumeSnapshot) error {
glog.V(4).Infof("synchonizing VolumeSnapshot[%s]: %s", snapshotKey(snapshot), getSnapshotStatusForLogging(snapshot))
if !snapshot.Status.Ready {
return ctrl.syncUnboundSnapshot(snapshot)
} else {
return ctrl.syncBoundSnapshot(snapshot)
}
}
// syncCompleteSnapshot checks the snapshot which has been bound to snapshot content succesfully before.
// If there is any problem with the binding (e.g., snapshot points to a non-exist snapshot content), update the snapshot status and emit event.
func (ctrl *CSISnapshotController) syncBoundSnapshot(snapshot *crdv1.VolumeSnapshot) error {
if snapshot.Spec.SnapshotContentName == "" {
if _, err := ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotLost", "Bound snapshot has lost reference to VolumeSnapshotContent"); err != nil {
return err
}
return nil
}
obj, found, err := ctrl.contentStore.GetByKey(snapshot.Spec.SnapshotContentName)
if err != nil {
return err
}
if !found {
if _, err = ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotLost", "Bound snapshot has lost reference to VolumeSnapshotContent"); err != nil {
return err
}
return nil
} else {
content, ok := obj.(*crdv1.VolumeSnapshotContent)
if !ok {
return fmt.Errorf("Cannot convert object from snapshot content store to VolumeSnapshotContent %q!?: %#v", snapshot.Spec.SnapshotContentName, obj)
}
glog.V(4).Infof("syncCompleteSnapshot[%s]: VolumeSnapshotContent %q found", snapshotKey(snapshot), content.Name)
if !IsSnapshotBound(snapshot, content) {
// snapshot is bound but content is not bound to snapshot correctly
if _, err = ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotMisbound", "VolumeSnapshotContent is not bound to the VolumeSnapshot correctly"); err != nil {
return err
}
return nil
}
// Snapshot is correctly bound.
if _, err = ctrl.updateSnapshotBoundWithEvent(snapshot, v1.EventTypeNormal, "SnapshotBound", "Snapshot is bound to its VolumeSnapshotContent"); err != nil {
return err
}
return nil
}
}
func (ctrl *CSISnapshotController) syncUnboundSnapshot(snapshot *crdv1.VolumeSnapshot) error {
uniqueSnapshotName := snapshotKey(snapshot)
glog.V(4).Infof("syncSnapshot %s", uniqueSnapshotName)
// Snsapshot has errors during its creation. Controller will not try to fix it. Nothing to do.
if snapshot.Status.Error != nil {
//return nil
}
if snapshot.Spec.SnapshotContentName != "" {
contentObj, found, err := ctrl.contentStore.GetByKey(snapshot.Spec.SnapshotContentName)
if err != nil {
return err
}
if !found {
// snapshot is bound to a non-existing content.
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotLost", fmt.Sprintf("Snapshot has lost reference to VolumeSnapshotContent, %v", err))
return fmt.Errorf("snapshot %s is bound to a non-existing content %s", uniqueSnapshotName, snapshot.Spec.SnapshotContentName)
}
content, ok := contentObj.(*crdv1.VolumeSnapshotContent)
if !ok {
return fmt.Errorf("expected volume snapshot content, got %+v", contentObj)
}
if err := ctrl.bindSnapshotContent(snapshot, content); err != nil {
// snapshot is bound but content is not bound to snapshot correctly
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotBoundFailed", fmt.Sprintf("Snapshot failed to bind VolumeSnapshotContent, %v", err))
return fmt.Errorf("snapshot %s is bound, but VolumeSnapshotContent %s is not bound to the VolumeSnapshot correctly, %v", uniqueSnapshotName, content.Name, err)
}
// snapshot is already bound correctly, check the status and update if it is ready.
glog.V(4).Infof("Check and update snapshot %s status", uniqueSnapshotName)
if err = ctrl.checkandUpdateSnapshotStatus(snapshot, content); err != nil {
return err
}
return nil
} else { // snapshot.Spec.SnapshotContentName == nil
if ContentObj := ctrl.getMatchSnapshotContent(snapshot); ContentObj != nil {
glog.V(4).Infof("Find VolumeSnapshotContent object %s for snapshot %s", ContentObj.Name, uniqueSnapshotName)
newSnapshot, err := ctrl.bindandUpdateVolumeSnapshot(ContentObj, snapshot)
if err != nil {
return err
}
glog.V(4).Infof("bindandUpdateVolumeSnapshot %v", newSnapshot)
return nil
} else if snapshot.Status.Error == nil { // Try to create snapshot if no error status is set
if err := ctrl.createSnapshot(snapshot); err != nil {
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot with error %v", err))
return err
}
return nil
}
return nil
}
}
// getMatchSnapshotContent looks up VolumeSnapshotContent for a VolumeSnapshot named snapshotName
func (ctrl *CSISnapshotController) getMatchSnapshotContent(vs *crdv1.VolumeSnapshot) *crdv1.VolumeSnapshotContent {
var snapshotDataObj *crdv1.VolumeSnapshotContent
var found bool
objs := ctrl.contentStore.List()
for _, obj := range objs {
content := obj.(*crdv1.VolumeSnapshotContent)
if content.Spec.VolumeSnapshotRef != nil &&
content.Spec.VolumeSnapshotRef.Name == vs.Name &&
content.Spec.VolumeSnapshotRef.Namespace == vs.Namespace &&
content.Spec.VolumeSnapshotRef.UID == vs.UID {
found = true
snapshotDataObj = content
break
}
}
if !found {
glog.V(4).Infof("No VolumeSnapshotContent for VolumeSnapshot %s found", snapshotKey(vs))
return nil
}
return snapshotDataObj
}
// deleteSnapshotContent starts delete action.
func (ctrl *CSISnapshotController) deleteSnapshotContent(content *crdv1.VolumeSnapshotContent) {
operationName := fmt.Sprintf("delete-%s[%s]", content.Name, string(content.UID))
glog.V(4).Infof("Snapshotter is about to delete volume snapshot and the operation named %s", operationName)
ctrl.scheduleOperation(operationName, func() error {
return ctrl.DeleteSnapshotContentOperation(content)
})
}
// scheduleOperation starts given asynchronous operation on given volume. It
// makes sure the operation is already not running.
func (ctrl *CSISnapshotController) scheduleOperation(operationName string, operation func() error) {
glog.V(4).Infof("scheduleOperation[%s]", operationName)
err := ctrl.runningOperations.Run(operationName, operation)
if err != nil {
switch {
case goroutinemap.IsAlreadyExists(err):
glog.V(4).Infof("operation %q is already running, skipping", operationName)
case exponentialbackoff.IsExponentialBackoff(err):
glog.V(4).Infof("operation %q postponed due to exponential backoff", operationName)
default:
glog.Errorf("error scheduling operation %q: %v", operationName, err)
}
}
}
func (ctrl *CSISnapshotController) storeSnapshotUpdate(vs interface{}) (bool, error) {
return storeObjectUpdate(ctrl.snapshotStore, vs, "vs")
}
func (ctrl *CSISnapshotController) storeContentUpdate(content interface{}) (bool, error) {
return storeObjectUpdate(ctrl.contentStore, content, "content")
}
// createSnapshot starts new asynchronous operation to create snapshot data for snapshot
func (ctrl *CSISnapshotController) createSnapshot(snapshot *crdv1.VolumeSnapshot) error {
glog.V(4).Infof("createSnapshot[%s]: started", snapshotKey(snapshot))
opName := fmt.Sprintf("create-%s[%s]", snapshotKey(snapshot), string(snapshot.UID))
ctrl.scheduleOperation(opName, func() error {
snapshotObj, err := ctrl.createSnapshotOperation(snapshot)
if err != nil {
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err))
glog.Errorf("createSnapshot [%s]: error occurred in createSnapshotOperation: %v", opName, err)
return err
}
_, updateErr := ctrl.storeSnapshotUpdate(snapshotObj)
if updateErr != nil {
// We will get an "snapshot update" event soon, this is not a big error
glog.V(4).Infof("createSnapshot [%s]: cannot update internal cache: %v", snapshotKey(snapshotObj), updateErr)
}
return nil
})
return nil
}
func (ctrl *CSISnapshotController) checkandUpdateSnapshotStatus(snapshot *crdv1.VolumeSnapshot, content *crdv1.VolumeSnapshotContent) error {
glog.V(5).Infof("checkandUpdateSnapshotStatus[%s] started", snapshotKey(snapshot))
opName := fmt.Sprintf("check-%s[%s]", snapshotKey(snapshot), string(snapshot.UID))
ctrl.scheduleOperation(opName, func() error {
snapshotObj, err := ctrl.checkandUpdateSnapshotStatusOperation(snapshot, content)
if err != nil {
glog.Errorf("checkandUpdateSnapshotStatus [%s]: error occured %v", snapshotKey(snapshot), err)
return err
}
_, updateErr := ctrl.storeSnapshotUpdate(snapshotObj)
if updateErr != nil {
// We will get an "snapshot update" event soon, this is not a big error
glog.V(4).Infof("checkandUpdateSnapshotStatus [%s]: cannot update internal cache: %v", snapshotKey(snapshotObj), updateErr)
}
return nil
})
return nil
}
// updateSnapshotStatusWithEvent saves new snapshot.Status to API server and emits
// given event on the snapshot. It saves the status and emits the event only when
// the status has actually changed from the version saved in API server.
// Parameters:
// snapshot - snapshot to update
// eventtype, reason, message - event to send, see EventRecorder.Event()
func (ctrl *CSISnapshotController) updateSnapshotErrorStatusWithEvent(snapshot *crdv1.VolumeSnapshot, eventtype, reason, message string) (*crdv1.VolumeSnapshot, error) {
glog.V(4).Infof("updateSnapshotStatusWithEvent[%s]", snapshotKey(snapshot))
if snapshot.Status.Error != nil && snapshot.Status.Ready == false {
glog.V(4).Infof("updateClaimStatusWithEvent[%s]: error %v already set", snapshot.Name, snapshot.Status.Error)
return snapshot, nil
}
snapshotClone := snapshot.DeepCopy()
if snapshot.Status.Error == nil {
statusError := &storage.VolumeError{
Time: metav1.Time{
Time: time.Now(),
},
Message: message,
}
snapshotClone.Status.Error = statusError
}
snapshotClone.Status.Ready = false
newSnapshot, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshotClone.Namespace).Update(snapshotClone)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshot[%s] error status failed %v", snapshotKey(snapshot), err)
return newSnapshot, err
}
_, err = ctrl.storeSnapshotUpdate(newSnapshot)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshot[%s] error status: cannot update internal cache %v", snapshotKey(snapshot), err)
return newSnapshot, err
}
// Emit the event only when the status change happens
ctrl.eventRecorder.Event(newSnapshot, eventtype, reason, message)
return newSnapshot, nil
}
func (ctrl *CSISnapshotController) updateSnapshotBoundWithEvent(snapshot *crdv1.VolumeSnapshot, eventtype, reason, message string) (*crdv1.VolumeSnapshot, error) {
glog.V(4).Infof("updateSnapshotBoundWithEvent[%s]", snapshotKey(snapshot))
if snapshot.Status.Ready && snapshot.Status.Error == nil {
// Nothing to do.
glog.V(4).Infof("updateSnapshotBoundWithEvent[%s]: Ready %v already set", snapshotKey(snapshot), snapshot.Status.Ready)
return snapshot, nil
}
snapshotClone := snapshot.DeepCopy()
snapshotClone.Status.Ready = true
snapshotClone.Status.Error = nil
newSnapshot, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshotClone.Namespace).Update(snapshotClone)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshot[%s] error status failed %v", snapshotKey(snapshot), err)
return newSnapshot, err
}
// Emit the event only when the status change happens
ctrl.eventRecorder.Event(snapshot, eventtype, reason, message)
_, err = ctrl.storeSnapshotUpdate(newSnapshot)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshot[%s] error status: cannot update internal cache %v", snapshotKey(snapshot), err)
return newSnapshot, err
}
return newSnapshot, nil
}
// Stateless functions
func getSnapshotStatusForLogging(snapshot *crdv1.VolumeSnapshot) string {
return fmt.Sprintf("bound to: %q, Completed: %v", snapshot.Spec.SnapshotContentName, snapshot.Status.Ready)
}
func IsSnapshotBound(snapshot *crdv1.VolumeSnapshot, content *crdv1.VolumeSnapshotContent) bool {
if content.Spec.VolumeSnapshotRef != nil && content.Spec.VolumeSnapshotRef.Name == snapshot.Name &&
content.Spec.VolumeSnapshotRef.UID == snapshot.UID {
return true
}
return false
}
func (ctrl *CSISnapshotController) bindSnapshotContent(snapshot *crdv1.VolumeSnapshot, content *crdv1.VolumeSnapshotContent) error {
if content.Spec.VolumeSnapshotRef == nil || content.Spec.VolumeSnapshotRef.Name != snapshot.Name {
return fmt.Errorf("Could not bind snapshot %s and content %s, the VolumeSnapshotRef does not match", snapshot.Name, content.Name)
} else if content.Spec.VolumeSnapshotRef.UID != "" && content.Spec.VolumeSnapshotRef.UID != snapshot.UID {
return fmt.Errorf("Could not bind snapshot %s and content %s, the VolumeSnapshotRef does not match", snapshot.Name, content.Name)
} else if content.Spec.VolumeSnapshotRef.UID == "" {
contentClone := content.DeepCopy()
contentClone.Spec.VolumeSnapshotRef.UID = snapshot.UID
newContent, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshotContents().Update(contentClone)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshotContent[%s] error status failed %v", newContent.Name, err)
return err
}
_, err = ctrl.storeContentUpdate(newContent)
if err != nil {
glog.V(4).Infof("updating VolumeSnapshotContent[%s] error status: cannot update internal cache %v", newContent.Name, err)
return err
}
}
return nil
}
func (ctrl *CSISnapshotController) checkandUpdateSnapshotStatusOperation(snapshot *crdv1.VolumeSnapshot, content *crdv1.VolumeSnapshotContent) (*crdv1.VolumeSnapshot, error) {
status, _, err := ctrl.handler.GetSnapshotStatus(content)
if err != nil {
return nil, fmt.Errorf("failed to check snapshot status %s with error %v", snapshot.Name, err)
}
newSnapshot, err := ctrl.updateSnapshotStatus(snapshot, status, time.Now(), IsSnapshotBound(snapshot, content))
if err != nil {
return nil, err
}
return newSnapshot, nil
}
// The function goes through the whole snapshot creation process.
// 1. Trigger the snapshot through csi storage provider.
// 2. Update VolumeSnapshot status with creationtimestamp information
// 3. Create the VolumeSnapshotContent object with the snapshot id information.
// 4. Bind the VolumeSnapshot and VolumeSnapshotContent object
func (ctrl *CSISnapshotController) createSnapshotOperation(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, error) {
glog.Infof("createSnapshot: Creating snapshot %s through the plugin ...", snapshotKey(snapshot))
class, err := ctrl.GetClassFromVolumeSnapshot(snapshot)
if err != nil {
glog.Errorf("createSnapshotOperation failed to getClassFromVolumeSnapshot %s", err)
return nil, err
}
volume, err := ctrl.getVolumeFromVolumeSnapshot(snapshot)
if err != nil {
glog.Errorf("createSnapshotOperation failed to get PersistentVolume object [%s]: Error: [%#v]", snapshot.Name, err)
return nil, err
}
driverName, snapshotID, timestamp, csiSnapshotStatus, err := ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters)
if err != nil {
return nil, fmt.Errorf("Failed to take snapshot of the volume, %s: %q", volume.Name, err)
}
glog.Infof("Create snapshot driver %s, snapshotId %s, timestamp %d, csiSnapshotStatus %v", driverName, snapshotID, timestamp, csiSnapshotStatus)
// Update snapshot status with timestamp
newSnapshot, err := ctrl.updateSnapshotStatus(snapshot, csiSnapshotStatus, time.Unix(0, timestamp), false)
if err != nil {
return nil, err
}
// Create VolumeSnapshotContent in the database
contentName := GetSnapshotContentNameForSnapshot(snapshot)
volumeRef, err := ref.GetReference(scheme.Scheme, volume)
snapshotContent := &crdv1.VolumeSnapshotContent{
ObjectMeta: metav1.ObjectMeta{
Name: contentName,
},
Spec: crdv1.VolumeSnapshotContentSpec{
VolumeSnapshotRef: &v1.ObjectReference{
Kind: "VolumeSnapshot",
Namespace: snapshot.Namespace,
Name: snapshot.Name,
UID: snapshot.UID,
APIVersion: "v1alpha1",
},
PersistentVolumeRef: volumeRef,
VolumeSnapshotSource: crdv1.VolumeSnapshotSource{
CSI: &crdv1.CSIVolumeSnapshotSource{
Driver: driverName,
SnapshotHandle: snapshotID,
CreatedAt: timestamp,
},
},
},
}
// Try to create the VolumeSnapshotContent object several times
for i := 0; i < ctrl.createSnapshotContentRetryCount; i++ {
glog.V(4).Infof("createSnapshot [%s]: trying to save volume snapshot data %s", snapshotKey(snapshot), snapshotContent.Name)
if _, err = ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshotContents().Create(snapshotContent); err == nil || apierrs.IsAlreadyExists(err) {
// Save succeeded.
if err != nil {
glog.V(3).Infof("volume snapshot data %q for snapshot %q already exists, reusing", snapshotContent.Name, snapshotKey(snapshot))
err = nil
} else {
glog.V(3).Infof("volume snapshot data %q for snapshot %q saved", snapshotContent.Name, snapshotKey(snapshot))
}
break
}
// Save failed, try again after a while.
glog.V(3).Infof("failed to save volume snapshot data %q for snapshot %q: %v", snapshotContent.Name, snapshotKey(snapshot), err)
time.Sleep(ctrl.createSnapshotContentInterval)
}
if err != nil {
// Save failed. Now we have a storage asset outside of Kubernetes,
// but we don't have appropriate volumesnapshotdata object for it.
// Emit some event here and try to delete the storage asset several
// times.
strerr := fmt.Sprintf("Error creating volume snapshot data object for snapshot %s: %v.", snapshotKey(snapshot), err)
glog.Error(strerr)
ctrl.eventRecorder.Event(newSnapshot, v1.EventTypeWarning, "CreateSnapshotContentFailed", strerr)
return nil, err
}
// save succeeded, bind and update status for snapshot.
result, err := ctrl.bindandUpdateVolumeSnapshot(snapshotContent, newSnapshot)
if err != nil {
return nil, err
}
return result, nil
}
// Delete a snapshot
// 1. Find the SnapshotContent corresponding to Snapshot
// 1a: Not found => finish (it's been deleted already)
// 2. Ask the backend to remove the snapshot device
// 3. Delete the SnapshotContent object
// 4. Remove the Snapshot from vsStore
// 5. Finish
func (ctrl *CSISnapshotController) DeleteSnapshotContentOperation(content *crdv1.VolumeSnapshotContent) error {
glog.V(4).Infof("deleteSnapshotOperation [%s] started", content.Name)
err := ctrl.handler.DeleteSnapshot(content)
if err != nil {
return fmt.Errorf("failed to delete snapshot %#v, err: %v", content.Name, err)
}
err = ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshotContents().Delete(content.Name, &metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete VolumeSnapshotContent %s from API server: %q", content.Name, err)
}
return nil
}
func (ctrl *CSISnapshotController) bindandUpdateVolumeSnapshot(snapshotContent *crdv1.VolumeSnapshotContent, snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, error) {
glog.V(4).Infof("bindandUpdateVolumeSnapshot for snapshot [%s]: snapshotContent [%s]", snapshot.Name, snapshotContent.Name)
snapshotObj, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshot.Namespace).Get(snapshot.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error get snapshot %s from api server: %v", snapshotKey(snapshot), err)
}
// Copy the snapshot object before updating it
snapshotCopy := snapshotObj.DeepCopy()
if snapshotObj.Spec.SnapshotContentName == snapshotContent.Name {
glog.Infof("bindVolumeSnapshotContentToVolumeSnapshot: VolumeSnapshot %s already bind to volumeSnapshotContent [%s]", snapshot.Name, snapshotContent.Name)
} else {
glog.Infof("bindVolumeSnapshotContentToVolumeSnapshot: before bind VolumeSnapshot %s to volumeSnapshotContent [%s]", snapshot.Name, snapshotContent.Name)
snapshotCopy.Spec.SnapshotContentName = snapshotContent.Name
updateSnapshot, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshot.Namespace).Update(snapshotCopy)
if err != nil {
glog.Infof("bindVolumeSnapshotContentToVolumeSnapshot: Error binding VolumeSnapshot %s to volumeSnapshotContent [%s]. Error [%#v]", snapshot.Name, snapshotContent.Name, err)
return nil, fmt.Errorf("error updating snapshot object %s on the API server: %v", snapshotKey(updateSnapshot), err)
}
snapshotCopy = updateSnapshot
_, err = ctrl.storeSnapshotUpdate(snapshotCopy)
if err != nil {
glog.Errorf("%v", err)
}
}
glog.V(5).Infof("bindandUpdateVolumeSnapshot for snapshot completed [%#v]", snapshotCopy)
return snapshotCopy, nil
}
// UpdateSnapshotStatus converts snapshot status to crdv1.VolumeSnapshotCondition
func (ctrl *CSISnapshotController) updateSnapshotStatus(snapshot *crdv1.VolumeSnapshot, csistatus *csi.SnapshotStatus, timestamp time.Time, bound bool) (*crdv1.VolumeSnapshot, error) {
glog.V(4).Infof("updating VolumeSnapshot[]%s, set status %v, timestamp %v", snapshotKey(snapshot), csistatus, timestamp)
status := snapshot.Status
change := false
timeAt := &metav1.Time{
Time: timestamp,
}
snapshotClone := snapshot.DeepCopy()
switch csistatus.Type {
case csi.SnapshotStatus_READY:
if bound {
status.Ready = true
change = true
}
if status.CreationTime == nil {
status.CreationTime = timeAt
change = true
}
case csi.SnapshotStatus_ERROR_UPLOADING:
if status.Error == nil {
status.Error = &storage.VolumeError{
Time: *timeAt,
Message: "Failed to upload the snapshot",
}
change = true
}
case csi.SnapshotStatus_UPLOADING:
if status.CreationTime == nil {
status.CreationTime = timeAt
change = true
}
}
if change {
snapshotClone.Status = status
newSnapshotObj, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshotClone.Namespace).Update(snapshotClone)
if err != nil {
return nil, fmt.Errorf("error update status for volume snapshot %s: %s", snapshotKey(snapshot), err)
} else {
return newSnapshotObj, nil
}
}
return snapshot, nil
}
// getVolumeFromVolumeSnapshot is a helper function to get PV from VolumeSnapshot.
func (ctrl *CSISnapshotController) getVolumeFromVolumeSnapshot(snapshot *crdv1.VolumeSnapshot) (*v1.PersistentVolume, error) {
pvc, err := ctrl.getClaimFromVolumeSnapshot(snapshot)
if err != nil {
return nil, err
}
pvName := pvc.Spec.VolumeName
pv, err := ctrl.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to retrieve PV %s from the API server: %q", pvName, err)
}
glog.V(5).Infof("getVolumeFromVolumeSnapshot: snapshot [%s] PV name [%s]", snapshot.Name, pvName)
return pv, nil
}
// GetClassFromVolumeSnapshot is a helper function to get storage class from VolumeSnapshot.
func (ctrl *CSISnapshotController) GetClassFromVolumeSnapshot(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshotClass, error) {
className := snapshot.Spec.VolumeSnapshotClassName
glog.V(5).Infof("getClassFromVolumeSnapshot [%s]: VolumeSnapshotClassName [%s]", snapshot.Name, className)
if len(className) > 0 {
class, err := ctrl.clientset.VolumesnapshotV1alpha1().VolumeSnapshotClasses().Get(className, metav1.GetOptions{})
if err != nil {
glog.Errorf("failed to retrieve storage class %s from the API server: %q", className, err)
//return nil, fmt.Errorf("failed to retrieve storage class %s from the API server: %q", className, err)
}
glog.V(5).Infof("getClassFromVolumeSnapshot [%s]: VolumeSnapshotClassName [%s]", snapshot.Name, className)
return class, nil
} else {
// Find default snapshot class if available
list, err := ctrl.classLister.List(labels.Everything())
if err != nil {
return nil, err
}
pvc, err := ctrl.getClaimFromVolumeSnapshot(snapshot)
if err != nil {
return nil, err
}
storageclass, err := ctrl.client.StorageV1().StorageClasses().Get(*pvc.Spec.StorageClassName, metav1.GetOptions{})
if err != nil {
return nil, err
}
defaultClasses := []*crdv1.VolumeSnapshotClass{}
for _, class := range list {
if IsDefaultAnnotation(class.ObjectMeta) && storageclass.Provisioner == class.Snapshotter {
defaultClasses = append(defaultClasses, class)
glog.V(4).Infof("getDefaultClass added: %s", class.Name)
}
}
if len(defaultClasses) == 0 {
return nil, nil
}
if len(defaultClasses) > 1 {
glog.V(4).Infof("getDefaultClass %d defaults found", len(defaultClasses))
return nil, fmt.Errorf("%d default StorageClasses were found", len(defaultClasses))
}
glog.V(5).Infof("getClassFromVolumeSnapshot [%s]: default VolumeSnapshotClassName [%s]", snapshot.Name, defaultClasses[0])
return defaultClasses[0], nil
}
}
// getClaimFromVolumeSnapshot is a helper function to get PV from VolumeSnapshot.
func (ctrl *CSISnapshotController) getClaimFromVolumeSnapshot(snapshot *crdv1.VolumeSnapshot) (*v1.PersistentVolumeClaim, error) {
if snapshot.Spec.Source == nil || snapshot.Spec.Source.Kind != pvcKind {
return nil, fmt.Errorf("The snapshot source is not the right type. Expected %s, Got %v", pvcKind, snapshot.Spec.Source)
}
pvcName := snapshot.Spec.Source.Name
if pvcName == "" {
return nil, fmt.Errorf("the PVC name is not specified in snapshot %s", snapshotKey(snapshot))
}
pvc, err := ctrl.client.CoreV1().PersistentVolumeClaims(snapshot.Namespace).Get(pvcName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to retrieve PVC %s from the API server: %q", pvcName, err)
}
if pvc.Status.Phase != v1.ClaimBound {
return nil, fmt.Errorf("the PVC %s not yet bound to a PV, will not attempt to take a snapshot yet", pvcName)
}
return pvc, nil
}

123
pkg/controller/util.go Normal file
View File

@@ -0,0 +1,123 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"github.com/golang/glog"
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"strconv"
"strings"
)
var (
keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
)
// GetNameAndNameSpaceFromSnapshotName retrieves the namespace and
// the short name of a snapshot from its full name
func GetNameAndNameSpaceFromSnapshotName(name string) (string, string, error) {
strs := strings.Split(name, "/")
if len(strs) != 2 {
return "", "", fmt.Errorf("invalid snapshot name")
}
return strs[0], strs[1], nil
}
func snapshotKey(vs *crdv1.VolumeSnapshot) string {
return fmt.Sprintf("%s/%s", vs.Namespace, vs.Name)
}
func snapshotRefKey(vsref *v1.ObjectReference) string {
return fmt.Sprintf("%s/%s", vsref.Namespace, vsref.Name)
}
// storeObjectUpdate updates given cache with a new object version from Informer
// callback (i.e. with events from etcd) or with an object modified by the
// controller itself. Returns "true", if the cache was updated, false if the
// object is an old version and should be ignored.
func storeObjectUpdate(store cache.Store, obj interface{}, className string) (bool, error) {
objName, err := keyFunc(obj)
if err != nil {
return false, fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)
}
oldObj, found, err := store.Get(obj)
if err != nil {
return false, fmt.Errorf("Error finding %s %q in controller cache: %v", className, objName, err)
}
objAccessor, err := meta.Accessor(obj)
if err != nil {
return false, err
}
if !found {
// This is a new object
glog.V(4).Infof("storeObjectUpdate: adding %s %q, version %s", className, objName, objAccessor.GetResourceVersion())
if err = store.Add(obj); err != nil {
return false, fmt.Errorf("error adding %s %q to controller cache: %v", className, objName, err)
}
return true, nil
}
oldObjAccessor, err := meta.Accessor(oldObj)
if err != nil {
return false, err
}
objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
if err != nil {
return false, fmt.Errorf("error parsing ResourceVersion %q of %s %q: %s", objAccessor.GetResourceVersion(), className, objName, err)
}
oldObjResourceVersion, err := strconv.ParseInt(oldObjAccessor.GetResourceVersion(), 10, 64)
if err != nil {
return false, fmt.Errorf("error parsing old ResourceVersion %q of %s %q: %s", oldObjAccessor.GetResourceVersion(), className, objName, err)
}
// Throw away only older version, let the same version pass - we do want to
// get periodic sync events.
if oldObjResourceVersion > objResourceVersion {
glog.V(4).Infof("storeObjectUpdate: ignoring %s %q version %s", className, objName, objAccessor.GetResourceVersion())
return false, nil
}
glog.V(4).Infof("storeObjectUpdate updating %s %q with version %s", className, objName, objAccessor.GetResourceVersion())
if err = store.Update(obj); err != nil {
return false, fmt.Errorf("error updating %s %q in controller cache: %v", className, objName, err)
}
return true, nil
}
// GetSnapshotContentNameForSnapshot returns SnapshotData.Name for the create VolumeSnapshotContent.
// The name must be unique.
func GetSnapshotContentNameForSnapshot(snapshot *crdv1.VolumeSnapshot) string {
return "snapdata-" + string(snapshot.UID)
}
// IsDefaultAnnotation returns a boolean if
// the annotation is set
func IsDefaultAnnotation(obj metav1.ObjectMeta) bool {
if obj.Annotations[IsDefaultSnapshotClassAnnotation] == "true" {
return true
}
return false
}