Merge pull request #826 from RaunakShah/group-create

Create Group Snapshot functionality for volume group snapshots
This commit is contained in:
Kubernetes Prow Robot
2023-05-03 11:06:17 -07:00
committed by GitHub
19 changed files with 3661 additions and 399 deletions

View File

@@ -326,7 +326,7 @@ type VolumeGroupSnapshotContentStatus struct {
// for this group snapshot.
// The maximum number of allowed snapshots in the group is 100.
// +optional
VolumeSnapshotContentRefList []core_v1.ObjectReference `json:"volumeSnapshotRefList,omitempty" protobuf:"bytes,5,opt,name=volumeSnapshotRefList"`
VolumeSnapshotContentRefList []core_v1.ObjectReference `json:"volumeSnapshotContentRefList,omitempty" protobuf:"bytes,5,opt,name=volumeSnapshotContentRefList"`
}
// VolumeGroupSnapshotContentSource represents the CSI source of a group snapshot.

View File

@@ -20,6 +20,7 @@ import (
"context"
"flag"
"fmt"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/group_snapshotter"
"net/http"
"os"
"os/signal"
@@ -79,12 +80,16 @@ var (
kubeAPIQPS = flag.Float64("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")
metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.")
enableNodeDeployment = flag.Bool("node-deployment", false, "Enables deploying the sidecar controller together with a CSI driver on nodes to manage snapshots for node-local volumes.")
metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.")
enableNodeDeployment = flag.Bool("node-deployment", false, "Enables deploying the sidecar controller together with a CSI driver on nodes to manage snapshots for node-local volumes.")
enableVolumeGroupSnapshots = flag.Bool("enable-volume-group-snapshots", false, "Enables the volume group snapshot feature, allowing the user to create snapshots of groups of volumes.")
groupSnapshotNamePrefix = flag.String("groupsnapshot-name-prefix", "groupsnapshot", "Prefix to apply to the name of a created group snapshot")
groupSnapshotNameUUIDLength = flag.Int("groupsnapshot-name-uuid-length", -1, "Length in characters for the generated uuid of a created group snapshot. Defaults behavior is to NOT truncate.")
)
var (
@@ -222,6 +227,15 @@ func main() {
klog.V(2).Infof("Start NewCSISnapshotSideCarController with snapshotter [%s] kubeconfig [%s] csiTimeout [%+v] csiAddress [%s] resyncPeriod [%+v] snapshotNamePrefix [%s] snapshotNameUUIDLength [%d]", driverName, *kubeconfig, *csiTimeout, *csiAddress, *resyncPeriod, *snapshotNamePrefix, snapshotNameUUIDLength)
snapShotter := snapshotter.NewSnapshotter(csiConn)
var groupSnapshotter group_snapshotter.GroupSnapshotter
if *enableVolumeGroupSnapshots {
groupSnapshotter = group_snapshotter.NewGroupSnapshotter(csiConn)
if len(*groupSnapshotNamePrefix) == 0 {
klog.Error("group snapshot name prefix cannot be of length 0")
os.Exit(1)
}
}
ctrl := controller.NewCSISnapshotSideCarController(
snapClient,
kubeClient,
@@ -229,12 +243,19 @@ func main() {
snapshotContentfactory.Snapshot().V1().VolumeSnapshotContents(),
factory.Snapshot().V1().VolumeSnapshotClasses(),
snapShotter,
groupSnapshotter,
*csiTimeout,
*resyncPeriod,
*snapshotNamePrefix,
*snapshotNameUUIDLength,
*groupSnapshotNamePrefix,
*groupSnapshotNameUUIDLength,
*extraCreateMetadata,
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
*enableVolumeGroupSnapshots,
snapshotContentfactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotContents(),
snapshotContentfactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotClasses(),
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
)
run := func(context.Context) {

View File

@@ -72,6 +72,7 @@ var (
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.")
enableDistributedSnapshotting = flag.Bool("enable-distributed-snapshotting", false, "Enables each node to handle snapshotting for the local volumes created on that node")
preventVolumeModeConversion = flag.Bool("prevent-volume-mode-conversion", false, "Prevents an unauthorised user from modifying the volume mode when creating a PVC from an existing VolumeSnapshot.")
enableVolumeGroupSnapshots = flag.Bool("enable-volume-group-snapshots", false, "Enables the volume group snapshot feature, allowing the user to create snapshots of groups of volumes.")
retryCRDIntervalMax = flag.Duration("retry-crd-interval-max", 5*time.Second, "Maximum retry interval to wait for CRDs to appear. The default is 5 seconds.")
)
@@ -193,14 +194,20 @@ func main() {
factory.Snapshot().V1().VolumeSnapshots(),
factory.Snapshot().V1().VolumeSnapshotContents(),
factory.Snapshot().V1().VolumeSnapshotClasses(),
factory.Groupsnapshot().V1alpha1().VolumeGroupSnapshots(),
factory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotContents(),
factory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotClasses(),
coreFactory.Core().V1().PersistentVolumeClaims(),
nodeInformer,
metricsManager,
*resyncPeriod,
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
*enableDistributedSnapshotting,
*preventVolumeModeConversion,
*enableVolumeGroupSnapshots,
)
if err := ensureCustomResourceDefinitionsExist(snapClient); err != nil {

2
go.mod
View File

@@ -3,7 +3,7 @@ module github.com/kubernetes-csi/external-snapshotter/v6
go 1.20
require (
github.com/container-storage-interface/spec v1.7.0
github.com/container-storage-interface/spec v1.8.0
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/fsnotify/fsnotify v1.6.0
github.com/golang/mock v1.6.0

4
go.sum
View File

@@ -60,8 +60,8 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/container-storage-interface/spec v1.3.0/go.mod h1:6URME8mwIBbpVyZV93Ce5St17xBiQJQY67NDsuohiy4=
github.com/container-storage-interface/spec v1.7.0 h1:gW8eyFQUZWWrMWa8p1seJ28gwDoN5CVJ4uAbQ+Hdycw=
github.com/container-storage-interface/spec v1.7.0/go.mod h1:JYuzLqr9VVNoDJl44xp/8fmCOvWPDKzuGTwCoklhuqk=
github.com/container-storage-interface/spec v1.8.0 h1:D0vhF3PLIZwlwZEf2eNbpujGCNwspwTYf2idJRJx4xI=
github.com/container-storage-interface/spec v1.8.0/go.mod h1:ROLik+GhPslwwWRNFF1KasPzroNARibH2rfz1rkg4H0=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@@ -842,12 +842,18 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
informerFactory.Snapshot().V1().VolumeSnapshots(),
informerFactory.Snapshot().V1().VolumeSnapshotContents(),
informerFactory.Snapshot().V1().VolumeSnapshotClasses(),
informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshots(),
informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotContents(),
informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotClasses(),
coreFactory.Core().V1().PersistentVolumeClaims(),
nil,
metricsManager,
60*time.Second,
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
false,
false,
false,
)

View File

@@ -0,0 +1,937 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package common_controller
import (
"context"
"fmt"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes/scheme"
ref "k8s.io/client-go/tools/reference"
klog "k8s.io/klog/v2"
crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumegroupsnapshot/v1alpha1"
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/utils"
)
func (ctrl *csiSnapshotCommonController) storeGroupSnapshotUpdate(groupsnapshot interface{}) (bool, error) {
return utils.StoreObjectUpdate(ctrl.groupSnapshotStore, groupsnapshot, "groupsnapshot")
}
func (ctrl *csiSnapshotCommonController) storeGroupSnapshotContentUpdate(groupsnapshotcontent interface{}) (bool, error) {
return utils.StoreObjectUpdate(ctrl.groupSnapshotContentStore, groupsnapshotcontent, "groupsnapshotcontent")
}
// getGroupSnapshotClass is a helper function to get group snapshot class from the group snapshot class name.
func (ctrl *csiSnapshotCommonController) getGroupSnapshotClass(className string) (*crdv1alpha1.VolumeGroupSnapshotClass, error) {
klog.V(5).Infof("getGroupSnapshotClass: VolumeGroupSnapshotClassName [%s]", className)
groupSnapshotClass, err := ctrl.groupSnapshotClassLister.Get(className)
if err != nil {
klog.Errorf("failed to retrieve group snapshot class %s from the informer: %q", className, err)
return nil, err
}
return groupSnapshotClass, nil
}
// updateGroupSnapshotErrorStatusWithEvent saves new groupsnapshot.Status to API
// server and emits given event on the group snapshot. It saves the status and
// emits the event only when the status has actually changed from the version
// saved in API server.
//
// Parameters:
//
// - groupSnapshot - group snapshot to update
// - setReadyToFalse bool - indicates whether to set the group snapshot's
// ReadyToUse status to false.
// if true, ReadyToUse will be set to false;
// otherwise, ReadyToUse will not be changed.
// - eventtype, reason, message - event to send, see EventRecorder.Event()
func (ctrl *csiSnapshotCommonController) updateGroupSnapshotErrorStatusWithEvent(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot, setReadyToFalse bool, eventtype, reason, message string) error {
klog.V(5).Infof("updateGroupSnapshotErrorStatusWithEvent[%s]", utils.GroupSnapshotKey(groupSnapshot))
if groupSnapshot.Status != nil && groupSnapshot.Status.Error != nil && *groupSnapshot.Status.Error.Message == message {
klog.V(4).Infof("updateGroupSnapshotErrorStatusWithEvent[%s]: the same error %v is already set", groupSnapshot.Name, groupSnapshot.Status.Error)
return nil
}
groupSnapshotClone := groupSnapshot.DeepCopy()
if groupSnapshotClone.Status == nil {
groupSnapshotClone.Status = &crdv1alpha1.VolumeGroupSnapshotStatus{}
}
statusError := &crdv1.VolumeSnapshotError{
Time: &metav1.Time{
Time: time.Now(),
},
Message: &message,
}
groupSnapshotClone.Status.Error = statusError
// Only update ReadyToUse in VolumeGroupSnapshot's Status to false if setReadyToFalse is true.
if setReadyToFalse {
ready := false
groupSnapshotClone.Status.ReadyToUse = &ready
}
newSnapshot, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshots(groupSnapshotClone.Namespace).UpdateStatus(context.TODO(), groupSnapshotClone, metav1.UpdateOptions{})
// Emit the event even if the status update fails so that user can see the error
ctrl.eventRecorder.Event(newSnapshot, eventtype, reason, message)
if err != nil {
klog.V(4).Infof("updating VolumeGroupSnapshot[%s] error status failed %v", utils.GroupSnapshotKey(groupSnapshot), err)
return err
}
_, err = ctrl.storeGroupSnapshotUpdate(newSnapshot)
if err != nil {
klog.V(4).Infof("updating VolumeGroupSnapshot[%s] error status: cannot update internal cache %v", utils.GroupSnapshotKey(groupSnapshot), err)
return err
}
return nil
}
// SetDefaultGroupSnapshotClass is a helper function to figure out the default
// group snapshot class.
// For pre-provisioned case, it's an no-op.
// For dynamic provisioning, it gets the default GroupSnapshotClasses in the
// system if there is any (could be multiple), and finds the one with the same
// CSI Driver as a PV from which a group snapshot will be taken.
func (ctrl *csiSnapshotCommonController) SetDefaultGroupSnapshotClass(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) (*crdv1alpha1.VolumeGroupSnapshotClass, *crdv1alpha1.VolumeGroupSnapshot, error) {
klog.V(5).Infof("SetDefaultGroupSnapshotClass for group snapshot [%s]", groupSnapshot.Name)
if groupSnapshot.Spec.Source.VolumeGroupSnapshotContentName != nil {
// don't return error for pre-provisioned group snapshots
klog.V(5).Infof("Don't need to find GroupSnapshotClass for pre-provisioned group snapshot [%s]", groupSnapshot.Name)
return nil, groupSnapshot, nil
}
// Find default group snapshot class if available
list, err := ctrl.groupSnapshotClassLister.List(labels.Everything())
if err != nil {
return nil, groupSnapshot, err
}
pvDriver, err := ctrl.pvDriverFromGroupSnapshot(groupSnapshot)
if err != nil {
klog.Errorf("failed to get pv csi driver from group snapshot %s/%s: %q", groupSnapshot.Namespace, groupSnapshot.Name, err)
return nil, groupSnapshot, err
}
defaultClasses := []*crdv1alpha1.VolumeGroupSnapshotClass{}
for _, groupSnapshotClass := range list {
if utils.IsDefaultAnnotation(groupSnapshotClass.ObjectMeta) && pvDriver == groupSnapshotClass.Driver {
defaultClasses = append(defaultClasses, groupSnapshotClass)
klog.V(5).Infof("get defaultGroupClass added: %s, driver: %s", groupSnapshotClass.Name, pvDriver)
}
}
if len(defaultClasses) == 0 {
return nil, groupSnapshot, fmt.Errorf("cannot find default group snapshot class")
}
if len(defaultClasses) > 1 {
klog.V(4).Infof("get DefaultGroupSnapshotClass %d defaults found", len(defaultClasses))
return nil, groupSnapshot, fmt.Errorf("%d default snapshot classes were found", len(defaultClasses))
}
klog.V(5).Infof("setDefaultGroupSnapshotClass [%s]: default VolumeGroupSnapshotClassName [%s]", groupSnapshot.Name, defaultClasses[0].Name)
groupSnapshotClone := groupSnapshot.DeepCopy()
groupSnapshotClone.Spec.VolumeGroupSnapshotClassName = &(defaultClasses[0].Name)
newGroupSnapshot, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshots(groupSnapshotClone.Namespace).Update(context.TODO(), groupSnapshotClone, metav1.UpdateOptions{})
if err != nil {
klog.V(4).Infof("updating VolumeGroupSnapshot[%s] default group snapshot class failed %v", utils.GroupSnapshotKey(groupSnapshot), err)
}
_, updateErr := ctrl.storeGroupSnapshotUpdate(newGroupSnapshot)
if updateErr != nil {
// We will get a "group snapshot update" event soon, this is not a big error
klog.V(4).Infof("setDefaultSnapshotClass [%s]: cannot update internal cache: %v", utils.GroupSnapshotKey(groupSnapshot), updateErr)
}
return defaultClasses[0], newGroupSnapshot, nil
}
// pvDriverFromGroupSnapshot is a helper function to get the CSI driver name from the targeted persistent volume.
// It looks up every PVC from which the group snapshot is specified to be created from, and looks for the PVC's
// corresponding PV. Bi-directional binding will be verified between PVC and PV before the PV's CSI driver is returned.
// For an non-CSI volume, it returns an error immediately as it's not supported.
func (ctrl *csiSnapshotCommonController) pvDriverFromGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) (string, error) {
pvs, err := ctrl.getVolumesFromVolumeGroupSnapshot(groupSnapshot)
if err != nil {
return "", err
}
// Take any volume to get the driver
if pvs[0].Spec.PersistentVolumeSource.CSI == nil {
return "", fmt.Errorf("snapshotting non-CSI volumes is not supported, group snapshot:%s/%s", groupSnapshot.Namespace, groupSnapshot.Name)
}
return pvs[0].Spec.PersistentVolumeSource.CSI.Driver, nil
}
// getVolumesFromVolumeGroupSnapshot returns the list of PersistentVolume from a VolumeGroupSnapshot.
func (ctrl *csiSnapshotCommonController) getVolumesFromVolumeGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) ([]*v1.PersistentVolume, error) {
var pvReturnList []*v1.PersistentVolume
pvcs, err := ctrl.getClaimsFromVolumeGroupSnapshot(groupSnapshot)
if err != nil {
return nil, err
}
for _, pvc := range pvcs {
if pvc.Status.Phase != v1.ClaimBound {
return nil, fmt.Errorf("the PVC %s is not yet bound to a PV, will not attempt to take a group snapshot", pvc.Name)
}
pvName := pvc.Spec.VolumeName
pv, err := ctrl.client.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to retrieve PV %s from the API server: %q", pvName, err)
}
// Verify binding between PV/PVC is still valid
bound := ctrl.isVolumeBoundToClaim(pv, &pvc)
if bound == false {
klog.Warningf("binding between PV %s and PVC %s is broken", pvName, pvc.Name)
return nil, fmt.Errorf("claim in dataSource not bound or invalid")
}
pvReturnList = append(pvReturnList, pv)
klog.V(5).Infof("getVolumeFromVolumeGroupSnapshot: group snapshot [%s] PV name [%s]", groupSnapshot.Name, pvName)
}
return pvReturnList, nil
}
// getClaimsFromVolumeGroupSnapshot is a helper function to get a list of PVCs from VolumeGroupSnapshot.
func (ctrl *csiSnapshotCommonController) getClaimsFromVolumeGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) ([]v1.PersistentVolumeClaim, error) {
labelSelector := groupSnapshot.Spec.Source.Selector
// Get PVC that has group snapshot label applied.
pvcList, err := ctrl.client.CoreV1().PersistentVolumeClaims(groupSnapshot.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: labels.Set(labelSelector.MatchLabels).String()})
if err != nil {
return nil, fmt.Errorf("failed to list PVCs with label selector %s: %q", labelSelector.String(), err)
}
if len(pvcList.Items) == 0 {
return nil, fmt.Errorf("label selector %s for group snapshot not applied to any PVC", labelSelector.String())
}
return pvcList.Items, nil
}
// updateGroupSnapshot runs in worker thread and handles "groupsnapshot added",
// "groupsnapshot updated" and "periodic sync" events.
func (ctrl *csiSnapshotCommonController) updateGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) error {
// Store the new group snapshot version in the cache and do not process it
// if this is an old version.
klog.V(5).Infof("updateGroupSnapshot %q", utils.GroupSnapshotKey(groupSnapshot))
newGroupSnapshot, err := ctrl.storeGroupSnapshotUpdate(groupSnapshot)
if err != nil {
klog.Errorf("%v", err)
}
if !newGroupSnapshot {
return nil
}
err = ctrl.syncGroupSnapshot(groupSnapshot)
if err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
klog.V(3).Infof("could not sync group snapshot %q: %+v", utils.GroupSnapshotKey(groupSnapshot), err)
} else {
klog.Errorf("could not sync group snapshot %q: %+v", utils.GroupSnapshotKey(groupSnapshot), err)
}
return err
}
return nil
}
// deleteGroupSnapshot runs in worker thread and handles "groupsnapshot deleted" event.
func (ctrl *csiSnapshotCommonController) deleteGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) {
_ = ctrl.snapshotStore.Delete(groupSnapshot)
klog.V(4).Infof("group snapshot %q deleted", utils.GroupSnapshotKey(groupSnapshot))
groupSnapshotContentName := ""
if groupSnapshot.Status != nil && groupSnapshot.Status.BoundVolumeGroupSnapshotContentName != nil {
groupSnapshotContentName = *groupSnapshot.Status.BoundVolumeGroupSnapshotContentName
}
if groupSnapshotContentName == "" {
klog.V(5).Infof("deleteGroupSnapshot[%q]: group snapshot content not bound", utils.GroupSnapshotKey(groupSnapshot))
return
}
// sync the group snapshot content when its group snapshot is deleted. Explicitly sync'ing
// the group snapshot content here in response to group snapshot deletion prevents the group
// snapshot content from waiting until the next sync period for its release.
klog.V(5).Infof("deleteGroupSnapshot[%q]: scheduling sync of group snapshot content %s", utils.GroupSnapshotKey(groupSnapshot), groupSnapshotContentName)
ctrl.groupSnapshotContentQueue.Add(groupSnapshotContentName)
}
// syncGroupSnapshot is the main controller method to decide what to do with a
// group snapshot. It's invoked by appropriate cache.Controller callbacks when
// a group snapshot is created, updated or periodically synced. We do not
// differentiate between these events.
// For easier readability, it is split into syncUnreadyGroupSnapshot and syncReadyGroupSnapshot
func (ctrl *csiSnapshotCommonController) syncGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) error {
klog.V(5).Infof("synchronizing VolumeGroupSnapshot[%s]", utils.GroupSnapshotKey(groupSnapshot))
klog.V(5).Infof("syncGroupSnapshot [%s]: check if we should remove finalizer on group snapshot PVC source and remove it if we can", utils.GroupSnapshotKey(groupSnapshot))
/*
TODO:
- Check and remove finalizer if needed.
- Check and set invalid group snapshot label, if needed.
- Process if deletion timestamp is set.
- Check and add group snapshot finalizers.
*/
// Need to build or update groupSnapshot.Status in following cases:
// 1) groupSnapshot.Status is nil
// 2) groupSnapshot.Status.ReadyToUse is false
// 3) groupSnapshot.Status.IsBoundVolumeGroupSnapshotContentNameSet is not set
// 4) groupSnapshot.Status.IsVolumeSnapshotRefListSet is not set
if !utils.IsGroupSnapshotReady(groupSnapshot) || !utils.IsBoundVolumeGroupSnapshotContentNameSet(groupSnapshot) || !utils.IsVolumeSnapshotRefListSet(groupSnapshot) {
return ctrl.syncUnreadyGroupSnapshot(groupSnapshot)
}
return ctrl.syncReadyGroupSnapshot(groupSnapshot)
}
// syncReadyGroupSnapshot checks the group snapshot which has been bound to group
// snapshot content successfully before.
// If there is any problem with the binding (e.g., group snapshot points to a
// non-existent group snapshot content), update the group snapshot status and emit event.
func (ctrl *csiSnapshotCommonController) syncReadyGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) error {
if !utils.IsBoundVolumeGroupSnapshotContentNameSet(groupSnapshot) {
return fmt.Errorf("group snapshot %s is not bound to a group snapshot content", utils.GroupSnapshotKey(groupSnapshot))
}
groupSnapshotContent, err := ctrl.getGroupSnapshotContentFromStore(*groupSnapshot.Status.BoundVolumeGroupSnapshotContentName)
if err != nil {
return nil
}
if groupSnapshotContent == nil {
// this meant there is no matching group snapshot content in cache found
// update status of the group snapshot and return
return ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, true, v1.EventTypeWarning, "GroupSnapshotContentMissing", "VolumeGroupSnapshotContent is missing")
}
klog.V(5).Infof("syncReadyGroupSnapshot[%s]: VolumeGroupSnapshotContent %q found", utils.GroupSnapshotKey(groupSnapshot), groupSnapshotContent.Name)
// check binding from group snapshot content side to make sure the binding is still valid
if !utils.IsVolumeGroupSnapshotRefSet(groupSnapshot, groupSnapshotContent) {
// group snapshot is bound but group snapshot content is not pointing to the group snapshot
return ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, true, v1.EventTypeWarning, "GroupSnapshotMisbound", "VolumeGroupSnapshotContent is not bound to the VolumeGroupSnapshot correctly")
}
// everything is verified, return
return nil
}
// getGroupSnapshotContentFromStore tries to find a VolumeGroupSnapshotContent from group
// snapshot content cache store by name.
// Note that if no VolumeGroupSnapshotContent exists in the cache store and no error
// encountered, it returns (nil, nil)
func (ctrl *csiSnapshotCommonController) getGroupSnapshotContentFromStore(contentName string) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
obj, exist, err := ctrl.groupSnapshotContentStore.GetByKey(contentName)
if err != nil {
// should never reach here based on implementation at:
// https://github.com/kubernetes/client-go/blob/master/tools/cache/store.go#L226
return nil, err
}
if !exist {
// not able to find a matching group snapshot content
return nil, nil
}
groupSnapshotContent, ok := obj.(*crdv1alpha1.VolumeGroupSnapshotContent)
if !ok {
return nil, fmt.Errorf("expected VolumeGroupSnapshotContent, got %+v", obj)
}
return groupSnapshotContent, nil
}
// syncUnreadyGroupSnapshot is the main controller method to decide what to do
// with a group snapshot which is not set to ready.
func (ctrl *csiSnapshotCommonController) syncUnreadyGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) error {
uniqueGroupSnapshotName := utils.GroupSnapshotKey(groupSnapshot)
klog.V(5).Infof("syncUnreadyGroupSnapshot %s", uniqueGroupSnapshotName)
/*
TODO: Add metrics
*/
// Pre-provisioned snapshot
if groupSnapshot.Spec.Source.VolumeGroupSnapshotContentName != nil {
groupSnapshotContent, err := ctrl.getPreprovisionedGroupSnapshotContentFromStore(groupSnapshot)
if err != nil {
return err
}
// if no group snapshot content found yet, update status and return
if groupSnapshotContent == nil {
// can not find the desired VolumeGroupSnapshotContent from cache store
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, true, v1.EventTypeWarning, "GroupSnapshotContentMissing", "VolumeGroupSnapshotContent is missing")
klog.V(4).Infof("syncUnreadyGroupSnapshot[%s]: group snapshot content %q requested but not found, will try again", utils.GroupSnapshotKey(groupSnapshot), *groupSnapshot.Spec.Source.VolumeGroupSnapshotContentName)
return fmt.Errorf("group snapshot %s requests an non-existing group snapshot content %s", utils.GroupSnapshotKey(groupSnapshot), *groupSnapshot.Spec.Source.VolumeGroupSnapshotContentName)
}
// Set VolumeGroupSnapshotRef UID
newGroupSnapshotContent, err := ctrl.checkAndBindGroupSnapshotContent(groupSnapshot, groupSnapshotContent)
if err != nil {
// group snapshot is bound but group snapshot content is not bound to group snapshot correctly
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, true, v1.EventTypeWarning, "GroupSnapshotBindFailed", fmt.Sprintf("GroupSnapshot failed to bind VolumeGroupSnapshotContent, %v", err))
return fmt.Errorf("group snapshot %s is bound, but VolumeGroupSnapshotContent %s is not bound to the VolumeGroupSnapshot correctly, %v", uniqueGroupSnapshotName, groupSnapshotContent.Name, err)
}
// update group snapshot status
klog.V(5).Infof("syncUnreadyGroupSnapshot [%s]: trying to update group snapshot status", utils.GroupSnapshotKey(groupSnapshot))
if _, err = ctrl.updateGroupSnapshotStatus(groupSnapshot, newGroupSnapshotContent); err != nil {
// update group snapshot status failed
klog.V(4).Infof("failed to update group snapshot %s status: %v", utils.GroupSnapshotKey(groupSnapshot), err)
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, false, v1.EventTypeWarning, "GroupSnapshotStatusUpdateFailed", fmt.Sprintf("GroupSnapshot status update failed, %v", err))
return err
}
return nil
}
// groupSnapshot.Spec.Source.VolumeGroupSnapshotContentName == nil - dynamically created group snapshot
klog.V(5).Infof("getDynamicallyProvisionedGroupContentFromStore for snapshot %s", uniqueGroupSnapshotName)
contentObj, err := ctrl.getDynamicallyProvisionedGroupContentFromStore(groupSnapshot)
if err != nil {
klog.V(4).Infof("getDynamicallyProvisionedGroupContentFromStore[%s]: error when getting group snapshot content for group snapshot %v", uniqueGroupSnapshotName, err)
return err
}
if contentObj != nil {
klog.V(5).Infof("Found VolumeGroupSnapshotContent object %s for group snapshot %s", contentObj.Name, uniqueGroupSnapshotName)
if contentObj.Spec.Source.VolumeGroupSnapshotHandle != nil {
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, true, v1.EventTypeWarning, "GroupSnapshotHandleSet", fmt.Sprintf("GroupSnapshot handle should not be set in group snapshot content %s for dynamic provisioning", uniqueGroupSnapshotName))
return fmt.Errorf("VolumeGroupSnapshotHandle should not be set in the group snapshot content for dynamic provisioning for group snapshot %s", uniqueGroupSnapshotName)
}
newGroupSnapshot, err := ctrl.bindandUpdateVolumeGroupSnapshot(contentObj, groupSnapshot)
if err != nil {
klog.V(4).Infof("bindandUpdateVolumeGroupSnapshot[%s]: failed to bind group snapshot content [%s] to group snapshot %v", uniqueGroupSnapshotName, contentObj.Name, err)
return err
}
klog.V(5).Infof("bindandUpdateVolumeGroupSnapshot %v", newGroupSnapshot)
return nil
}
// If we reach here, it is a dynamically provisioned group snapshot, and the VolumeGroupSnapshotContent object is not yet created.
var groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent
if groupSnapshotContent, err = ctrl.createGroupSnapshotContent(groupSnapshot); err != nil {
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, true, v1.EventTypeWarning, "GroupSnapshotContentCreationFailed", fmt.Sprintf("failed to create group snapshot content with error %v", err))
return err
}
// Update group snapshot status with BoundVolumeGroupSnapshotContentName
klog.V(5).Infof("syncUnreadyGroupSnapshot [%s]: trying to update group snapshot status", utils.GroupSnapshotKey(groupSnapshot))
if _, err = ctrl.updateGroupSnapshotStatus(groupSnapshot, groupSnapshotContent); err != nil {
// update group snapshot status failed
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, false, v1.EventTypeWarning, "GroupSnapshotStatusUpdateFailed", fmt.Sprintf("GroupSnapshot status update failed, %v", err))
return err
}
return nil
}
// getPreprovisionedGroupSnapshotContentFromStore tries to find a pre-provisioned
// volume group snapshot content object from group snapshot content cache store
// for the passed in VolumeGroupSnapshot.
// Note that this function assumes the passed in VolumeGroupSnapshot is a pre-provisioned
// one, i.e., groupSnapshot.Spec.Source.VolumeGroupSnapshotContentName != nil.
// If no matching group snapshot content is found, it returns (nil, nil).
// If it found a group snapshot content which is not a pre-provisioned one, it
// updates the status of the group snapshot with an event and returns an error.
// If it found a group snapshot content which does not point to the passed in
// VolumeGroupSnapshot, it updates the status of the group snapshot with an event
// and returns an error.
// Otherwise, the found group snapshot content will be returned.
func (ctrl *csiSnapshotCommonController) getPreprovisionedGroupSnapshotContentFromStore(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
contentName := *groupSnapshot.Spec.Source.VolumeGroupSnapshotContentName
if contentName == "" {
return nil, fmt.Errorf("empty VolumeGroupSnapshotContentName for group snapshot %s", utils.GroupSnapshotKey(groupSnapshot))
}
groupSnapshotContent, err := ctrl.getGroupSnapshotContentFromStore(contentName)
if err != nil {
return nil, err
}
if groupSnapshotContent == nil {
// can not find the desired VolumeGroupSnapshotContent from cache store
return nil, nil
}
// check whether the content is a pre-provisioned VolumeGroupSnapshotContent
if groupSnapshotContent.Spec.Source.VolumeGroupSnapshotHandle == nil {
// found a group snapshot content which represents a dynamically provisioned group snapshot
// update the group snapshot and return an error
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, true, v1.EventTypeWarning, "GroupSnapshotContentMismatch", "VolumeGroupSnapshotContent is dynamically provisioned while expecting a pre-provisioned one")
klog.V(4).Infof("sync group snapshot[%s]: group snapshot content %q is dynamically provisioned while expecting a pre-provisioned one", utils.GroupSnapshotKey(groupSnapshot), contentName)
return nil, fmt.Errorf("group snapshot %s expects a pre-provisioned VolumeGroupSnapshotContent %s but gets a dynamically provisioned one", utils.GroupSnapshotKey(groupSnapshot), contentName)
}
// verify the group snapshot content points back to the group snapshot
ref := groupSnapshotContent.Spec.VolumeGroupSnapshotRef
if ref.Name != groupSnapshot.Name || ref.Namespace != groupSnapshot.Namespace || (ref.UID != "" && ref.UID != groupSnapshot.UID) {
klog.V(4).Infof("sync group snapshot[%s]: VolumeGroupSnapshotContent %s is bound to another group snapshot %v", utils.GroupSnapshotKey(groupSnapshot), contentName, ref)
msg := fmt.Sprintf("VolumeGroupSnapshotContent [%s] is bound to a different group snapshot", contentName)
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, true, v1.EventTypeWarning, "GroupSnapshotContentMisbound", msg)
return nil, fmt.Errorf(msg)
}
return groupSnapshotContent, nil
}
// checkandBindGroupSnapshotContent checks whether the VolumeGroupSnapshotRef in
// the group snapshot content matches the given group snapshot. If match, it binds
// the group snapshot content with the group snapshot. This is for static binding where
// user has specified group snapshot name but not UID of the group snapshot in
// groupSnapshotContent.Spec.VolumeGroupSnapshotRef.
func (ctrl *csiSnapshotCommonController) checkAndBindGroupSnapshotContent(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot, groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
if groupSnapshotContent.Spec.VolumeGroupSnapshotRef.Name != groupSnapshot.Name {
return nil, fmt.Errorf("Could not bind group snapshot %s and group snapshot content %s, the VolumeGroupSnapshotRef does not match", groupSnapshot.Name, groupSnapshotContent.Name)
} else if groupSnapshotContent.Spec.VolumeGroupSnapshotRef.UID != "" && groupSnapshotContent.Spec.VolumeGroupSnapshotRef.UID != groupSnapshot.UID {
return nil, fmt.Errorf("Could not bind group snapshot %s and group snapshot content %s, the VolumeGroupSnapshotRef does not match", groupSnapshot.Name, groupSnapshotContent.Name)
} else if groupSnapshotContent.Spec.VolumeGroupSnapshotRef.UID != "" && groupSnapshotContent.Spec.VolumeGroupSnapshotClassName != nil {
return groupSnapshotContent, nil
}
patches := []utils.PatchOp{
{
Op: "replace",
Path: "/spec/volumeGroupSnapshotRef/uid",
Value: string(groupSnapshot.UID),
},
}
if groupSnapshot.Spec.VolumeGroupSnapshotClassName != nil {
className := *(groupSnapshot.Spec.VolumeGroupSnapshotClassName)
patches = append(patches, utils.PatchOp{
Op: "replace",
Path: "/spec/volumeGroupSnapshotClassName",
Value: className,
})
}
newContent, err := utils.PatchVolumeGroupSnapshotContent(groupSnapshotContent, patches, ctrl.clientset)
if err != nil {
klog.V(4).Infof("updating VolumeGroupSnapshotContent[%s] error status failed %v", groupSnapshotContent.Name, err)
return groupSnapshotContent, err
}
_, err = ctrl.storeGroupSnapshotContentUpdate(newContent)
if err != nil {
klog.V(4).Infof("updating VolumeGroupSnapshotContent[%s] error status: cannot update internal cache %v", newContent.Name, err)
return newContent, err
}
return newContent, nil
}
// updateGroupSnapshotStatus updates group snapshot status based on group snapshot content status
func (ctrl *csiSnapshotCommonController) updateGroupSnapshotStatus(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot, groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshot, error) {
klog.V(5).Infof("updateGroupSnapshotStatus[%s]", utils.GroupSnapshotKey(groupSnapshot))
boundContentName := groupSnapshotContent.Name
var createdAt *time.Time
if groupSnapshotContent.Status != nil && groupSnapshotContent.Status.CreationTime != nil {
unixTime := time.Unix(0, *groupSnapshotContent.Status.CreationTime)
createdAt = &unixTime
}
var readyToUse bool
if groupSnapshotContent.Status != nil && groupSnapshotContent.Status.ReadyToUse != nil {
readyToUse = *groupSnapshotContent.Status.ReadyToUse
}
var volumeSnapshotErr *crdv1.VolumeSnapshotError
if groupSnapshotContent.Status != nil && groupSnapshotContent.Status.Error != nil {
volumeSnapshotErr = groupSnapshotContent.Status.Error.DeepCopy()
}
var volumeSnapshotRefList []v1.ObjectReference
if groupSnapshotContent.Status != nil && len(groupSnapshotContent.Status.VolumeSnapshotContentRefList) != 0 {
for _, contentRef := range groupSnapshotContent.Status.VolumeSnapshotContentRefList {
groupSnapshotContent, err := ctrl.contentLister.Get(contentRef.Name)
if err != nil {
return nil, fmt.Errorf("failed to get group snapshot content %s from group snapshot content store: %v", contentRef.Name, err)
}
volumeSnapshotRefList = append(volumeSnapshotRefList, groupSnapshotContent.Spec.VolumeSnapshotRef)
}
}
klog.V(5).Infof("updateGroupSnapshotStatus: updating VolumeGroupSnapshot [%+v] based on VolumeGroupSnapshotContentStatus [%+v]", groupSnapshot, groupSnapshotContent.Status)
groupSnapshotObj, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshots(groupSnapshot.Namespace).Get(context.TODO(), groupSnapshot.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error get group snapshot %s from api server: %v", utils.GroupSnapshotKey(groupSnapshot), err)
}
var newStatus *crdv1alpha1.VolumeGroupSnapshotStatus
updated := false
if groupSnapshotObj.Status == nil {
newStatus = &crdv1alpha1.VolumeGroupSnapshotStatus{
BoundVolumeGroupSnapshotContentName: &boundContentName,
ReadyToUse: &readyToUse,
}
if createdAt != nil {
newStatus.CreationTime = &metav1.Time{Time: *createdAt}
}
if volumeSnapshotErr != nil {
newStatus.Error = volumeSnapshotErr
}
if len(volumeSnapshotRefList) == 0 {
newStatus.VolumeSnapshotRefList = volumeSnapshotRefList
}
updated = true
} else {
newStatus = groupSnapshotObj.Status.DeepCopy()
if newStatus.BoundVolumeGroupSnapshotContentName == nil {
newStatus.BoundVolumeGroupSnapshotContentName = &boundContentName
updated = true
}
if newStatus.CreationTime == nil && createdAt != nil {
newStatus.CreationTime = &metav1.Time{Time: *createdAt}
updated = true
}
if newStatus.ReadyToUse == nil || *newStatus.ReadyToUse != readyToUse {
newStatus.ReadyToUse = &readyToUse
updated = true
if readyToUse && newStatus.Error != nil {
newStatus.Error = nil
}
}
if (newStatus.Error == nil && volumeSnapshotErr != nil) || (newStatus.Error != nil && volumeSnapshotErr != nil && newStatus.Error.Time != nil && volumeSnapshotErr.Time != nil && &newStatus.Error.Time != &volumeSnapshotErr.Time) || (newStatus.Error != nil && volumeSnapshotErr == nil) {
newStatus.Error = volumeSnapshotErr
updated = true
}
if len(newStatus.VolumeSnapshotRefList) == 0 {
newStatus.VolumeSnapshotRefList = volumeSnapshotRefList
updated = true
}
}
if updated {
groupSnapshotClone := groupSnapshotObj.DeepCopy()
groupSnapshotClone.Status = newStatus
// Must meet the following criteria to emit a successful CreateGroupSnapshot status
// 1. Previous status was nil OR Previous status had a nil CreationTime
// 2. New status must be non-nil with a non-nil CreationTime
if !utils.IsGroupSnapshotCreated(groupSnapshotObj) && utils.IsGroupSnapshotCreated(groupSnapshotClone) {
msg := fmt.Sprintf("GroupSnapshot %s was successfully created by the CSI driver.", utils.GroupSnapshotKey(groupSnapshot))
ctrl.eventRecorder.Event(groupSnapshot, v1.EventTypeNormal, "GroupSnapshotCreated", msg)
}
// Must meet the following criteria to emit a successful CreateGroupSnapshotAndReady status
// 1. Previous status was nil OR Previous status had a nil ReadyToUse OR Previous status had a false ReadyToUse
// 2. New status must be non-nil with a ReadyToUse as true
if !utils.IsGroupSnapshotReady(groupSnapshotObj) && utils.IsGroupSnapshotReady(groupSnapshotClone) {
msg := fmt.Sprintf("GroupSnapshot %s is ready to use.", utils.GroupSnapshotKey(groupSnapshot))
ctrl.eventRecorder.Event(groupSnapshot, v1.EventTypeNormal, "GroupSnapshotReady", msg)
}
newGroupSnapshotObj, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshots(groupSnapshotClone.Namespace).UpdateStatus(context.TODO(), groupSnapshotClone, metav1.UpdateOptions{})
if err != nil {
return nil, newControllerUpdateError(utils.GroupSnapshotKey(groupSnapshot), err.Error())
}
return newGroupSnapshotObj, nil
}
return groupSnapshotObj, nil
}
// getDynamicallyProvisionedGroupContentFromStore tries to find a dynamically created
// group snapshot content object for the passed in VolumeGroupSnapshot from the
// group snapshot content store.
// Note that this function assumes the passed in VolumeGroupSnapshot is a dynamic
// one which requests creating a group snapshot from a group of PVCs.
// If no matching VolumeGroupSnapshotContent exists in the group snapshot content
// cache store, it returns (nil, nil)
// If a group snapshot content is found but it's not dynamically provisioned,
// the passed in group snapshot status will be updated with an error along with
// an event, and an error will be returned.
// If a group snapshot content is found but it does not point to the passed in VolumeGroupSnapshot,
// the passed in group snapshot will be updated with an error along with an event,
// and an error will be returned.
func (ctrl *csiSnapshotCommonController) getDynamicallyProvisionedGroupContentFromStore(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
contentName := utils.GetDynamicSnapshotContentNameForGroupSnapshot(groupSnapshot)
groupSnapshotContent, err := ctrl.getGroupSnapshotContentFromStore(contentName)
if err != nil {
return nil, err
}
if groupSnapshotContent == nil {
// no matching group snapshot content with the desired name has been found in cache
return nil, nil
}
// check whether the group snapshot content represents a dynamically provisioned snapshot
if groupSnapshotContent.Spec.Source.VolumeGroupSnapshotHandle != nil {
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, true, v1.EventTypeWarning, "GroupSnapshotContentMismatch", "VolumeGroupSnapshotContent "+contentName+" is pre-provisioned while expecting a dynamically provisioned one")
klog.V(4).Infof("sync group snapshot[%s]: group snapshot content %s is pre-provisioned while expecting a dynamically provisioned one", utils.GroupSnapshotKey(groupSnapshot), contentName)
return nil, fmt.Errorf("group snapshot %s expects a dynamically provisioned VolumeGroupSnapshotContent %s but gets a pre-provisioned one", utils.GroupSnapshotKey(groupSnapshot), contentName)
}
// check whether the group snapshot content points back to the passed in VolumeGroupSnapshot
ref := groupSnapshotContent.Spec.VolumeGroupSnapshotRef
// Unlike a pre-provisioned group snapshot content, whose Spec.VolumeGroupSnapshotRef.UID will be
// left to be empty to allow binding to a group snapshot, a dynamically provisioned
// group snapshot content MUST have its Spec.VolumeGroupSnapshotRef.UID set to the group snapshot's
// UID from which it's been created, thus ref.UID == "" is not a legit case here.
if ref.Name != groupSnapshot.Name || ref.Namespace != groupSnapshot.Namespace || ref.UID != groupSnapshot.UID {
klog.V(4).Infof("sync group snapshot[%s]: VolumeGroupSnapshotContent %s is bound to another group snapshot %v", utils.GroupSnapshotKey(groupSnapshot), contentName, ref)
msg := fmt.Sprintf("VolumeGroupSnapshotContent [%s] is bound to a different group snapshot", contentName)
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, true, v1.EventTypeWarning, "GroupSnapshotContentMisbound", msg)
return nil, fmt.Errorf(msg)
}
return groupSnapshotContent, nil
}
// This routine sets snapshot.Spec.Source.VolumeGroupSnapshotContentName
func (ctrl *csiSnapshotCommonController) bindandUpdateVolumeGroupSnapshot(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent, groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) (*crdv1alpha1.VolumeGroupSnapshot, error) {
klog.V(5).Infof("bindandUpdateVolumeGroupSnapshot for group snapshot [%s]: groupSnapshotContent [%s]", groupSnapshot.Name, groupSnapshotContent.Name)
groupSnapshotObj, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshots(groupSnapshot.Namespace).Get(context.TODO(), groupSnapshot.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error get group snapshot %s from api server: %v", utils.GroupSnapshotKey(groupSnapshot), err)
}
// Copy the group snapshot object before updating it
groupSnapshotCopy := groupSnapshotObj.DeepCopy()
// update group snapshot status
var updateGroupSnapshot *crdv1alpha1.VolumeGroupSnapshot
klog.V(5).Infof("bindandUpdateVolumeGroupSnapshot [%s]: trying to update group snapshot status", utils.GroupSnapshotKey(groupSnapshotCopy))
updateGroupSnapshot, err = ctrl.updateGroupSnapshotStatus(groupSnapshotCopy, groupSnapshotContent)
if err == nil {
groupSnapshotCopy = updateGroupSnapshot
}
if err != nil {
// update group snapshot status failed
klog.V(4).Infof("failed to update group snapshot %s status: %v", utils.GroupSnapshotKey(groupSnapshot), err)
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshotCopy, true, v1.EventTypeWarning, "GroupSnapshotStatusUpdateFailed", fmt.Sprintf("GroupSnapshot status update failed, %v", err))
return nil, err
}
_, err = ctrl.storeGroupSnapshotUpdate(groupSnapshotCopy)
if err != nil {
klog.Errorf("%v", err)
}
klog.V(5).Infof("bindandUpdateVolumeGroupSnapshot for group snapshot completed [%#v]", groupSnapshotCopy)
return groupSnapshotCopy, nil
}
// createGroupSnapshotContent will only be called for dynamic provisioning
func (ctrl *csiSnapshotCommonController) createGroupSnapshotContent(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
klog.Infof("createSnapshotContent: Creating group snapshot content for groupn snapshot %s through the plugin ...", utils.GroupSnapshotKey(groupSnapshot))
/*
TODO: Add finalizer to group snapshot
*/
groupSnapshotClass, volumes, contentName, err := ctrl.getCreateGroupSnapshotInput(groupSnapshot)
if err != nil {
return nil, fmt.Errorf("failed to get input parameters to create group snapshot %s: %q", groupSnapshot.Name, err)
}
snapshotRef, err := ref.GetReference(scheme.Scheme, groupSnapshot)
if err != nil {
return nil, err
}
var pvNames []string
for _, pv := range volumes {
pvNames = append(pvNames, pv.Name)
}
groupSnapshotContent := &crdv1alpha1.VolumeGroupSnapshotContent{
ObjectMeta: metav1.ObjectMeta{
Name: contentName,
},
Spec: crdv1alpha1.VolumeGroupSnapshotContentSpec{
VolumeGroupSnapshotRef: *snapshotRef,
Source: crdv1alpha1.VolumeGroupSnapshotContentSource{
PersistentVolumeNames: pvNames,
},
VolumeGroupSnapshotClassName: &(groupSnapshotClass.Name),
DeletionPolicy: groupSnapshotClass.DeletionPolicy,
Driver: groupSnapshotClass.Driver,
},
}
/*
TODO: Add secret reference details
*/
var updateGroupSnapshoyContent *crdv1alpha1.VolumeGroupSnapshotContent
klog.V(5).Infof("volume group snapshot content %#v", groupSnapshotContent)
// Try to create the VolumeGroupSnapshotContent object
klog.V(5).Infof("createGroupSnapshotContent [%s]: trying to save volume group snapshot content %s", utils.GroupSnapshotKey(groupSnapshot), groupSnapshotContent.Name)
if updateGroupSnapshoyContent, err = ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshotContents().Create(context.TODO(), groupSnapshotContent, metav1.CreateOptions{}); err == nil || apierrs.IsAlreadyExists(err) {
// Save succeeded.
if err != nil {
klog.V(3).Infof("volume group snapshot content %q for group snapshot %q already exists, reusing", groupSnapshotContent.Name, utils.GroupSnapshotKey(groupSnapshot))
err = nil
updateGroupSnapshoyContent = groupSnapshotContent
} else {
klog.V(3).Infof("volume group snapshot content %q for group snapshot %q saved, %v", groupSnapshotContent.Name, utils.GroupSnapshotKey(groupSnapshot), groupSnapshotContent)
}
}
if err != nil {
strerr := fmt.Sprintf("Error creating volume group snapshot content object for group snapshot %s: %v.", utils.GroupSnapshotKey(groupSnapshot), err)
klog.Error(strerr)
ctrl.eventRecorder.Event(groupSnapshot, v1.EventTypeWarning, "CreateGroupSnapshotContentFailed", strerr)
return nil, newControllerUpdateError(utils.GroupSnapshotKey(groupSnapshot), err.Error())
}
msg := fmt.Sprintf("Waiting for a group snapshot %s to be created by the CSI driver.", utils.GroupSnapshotKey(groupSnapshot))
ctrl.eventRecorder.Event(groupSnapshot, v1.EventTypeNormal, "CreatingGroupSnapshot", msg)
// Update group snapshot content in the cache store
_, err = ctrl.storeGroupSnapshotContentUpdate(updateGroupSnapshoyContent)
if err != nil {
klog.Errorf("failed to update group snapshot content store %v", err)
}
return updateGroupSnapshoyContent, nil
}
func (ctrl *csiSnapshotCommonController) getCreateGroupSnapshotInput(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) (*crdv1alpha1.VolumeGroupSnapshotClass, []*v1.PersistentVolume, string, error) {
className := groupSnapshot.Spec.VolumeGroupSnapshotClassName
klog.V(5).Infof("getCreateGroupSnapshotInput [%s]", groupSnapshot.Name)
var groupSnapshotClass *crdv1alpha1.VolumeGroupSnapshotClass
var err error
if className != nil {
groupSnapshotClass, err = ctrl.getGroupSnapshotClass(*className)
if err != nil {
klog.Errorf("getCreateGroupSnapshotInput failed to getClassFromVolumeGroupSnapshot %s", err)
return nil, nil, "", err
}
} else {
klog.Errorf("failed to getCreateGroupSnapshotInput %s without a group snapshot class", groupSnapshot.Name)
return nil, nil, "", fmt.Errorf("failed to take group snapshot %s without a group snapshot class", groupSnapshot.Name)
}
volumes, err := ctrl.getVolumesFromVolumeGroupSnapshot(groupSnapshot)
if err != nil {
klog.Errorf("getCreateGroupSnapshotInput failed to get PersistentVolume objects [%s]: Error: [%#v]", groupSnapshot.Name, err)
return nil, nil, "", err
}
// Create VolumeGroupSnapshotContent name
contentName := utils.GetDynamicSnapshotContentNameForGroupSnapshot(groupSnapshot)
return groupSnapshotClass, volumes, contentName, nil
}
// syncGroupSnapshotContent deals with one key off the queue
func (ctrl *csiSnapshotCommonController) syncGroupSnapshotContent(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) error {
groupSnapshotName := utils.GroupSnapshotRefKey(&groupSnapshotContent.Spec.VolumeGroupSnapshotRef)
klog.V(4).Infof("synchronizing VolumeGroupSnapshotContent[%s]: group snapshot content is bound to group snapshot %s", groupSnapshotContent.Name, groupSnapshotName)
klog.V(5).Infof("syncGroupSnapshotContent[%s]: check if we should add invalid label on group snapshot content", groupSnapshotContent.Name)
// Keep this check in the controller since the validation webhook may not have been deployed.
if (groupSnapshotContent.Spec.Source.VolumeGroupSnapshotHandle == nil && len(groupSnapshotContent.Spec.Source.PersistentVolumeNames) == 0) ||
(groupSnapshotContent.Spec.Source.VolumeGroupSnapshotHandle != nil && len(groupSnapshotContent.Spec.Source.PersistentVolumeNames) > 0) {
err := fmt.Errorf("Exactly one of VolumeGroupSnapshotHandle and PersistentVolumeNames should be specified")
klog.Errorf("syncGroupSnapshotContent[%s]: validation error, %s", groupSnapshotContent.Name, err.Error())
ctrl.eventRecorder.Event(groupSnapshotContent, v1.EventTypeWarning, "GroupContentValidationError", err.Error())
return err
}
// The VolumeGroupSnapshotContent is reserved for a VolumeGroupSnapshot;
// that VolumeGroupSnapshot has not yet been bound to this VolumeGroupSnapshotContent;
// syncGroupSnapshot will handle it.
if groupSnapshotContent.Spec.VolumeGroupSnapshotRef.UID == "" {
klog.V(4).Infof("syncGroupSnapshotContent [%s]: VolumeGroupSnapshotContent is pre-bound to VolumeGroupSnapshot %s", groupSnapshotContent.Name, groupSnapshotName)
return nil
}
/*
TODO: Add finalizer to prevent deletion
*/
// Check if group snapshot exists in cache store
// If getGroupSnapshotFromStore returns (nil, nil), it means group snapshot not found
// and it may have already been deleted, and it will fall into the
// group snapshot == nil case below
var groupSnapshot *crdv1alpha1.VolumeGroupSnapshot
groupSnapshot, err := ctrl.getGroupSnapshotFromStore(groupSnapshotName)
if err != nil {
return err
}
if groupSnapshot != nil && groupSnapshot.UID != groupSnapshotContent.Spec.VolumeGroupSnapshotRef.UID {
// The group snapshot that the group snapshot content was pointing to was deleted, and another
// with the same name created.
klog.V(4).Infof("syncGroupSnapshotContent [%s]: group snapshot %s has different UID, the old one must have been deleted", groupSnapshotContent.Name, groupSnapshotName)
// Treat the group snapshot content as bound to a missing snapshot.
groupSnapshot = nil
} else {
// Check if groupSnapshot.Status is different from groupSnapshotContent.Status
// and add group snapshot to queue if there is a difference and it is worth
// triggering a group snapshot status update.
if groupSnapshot != nil && ctrl.needsUpdateGroupSnapshotStatus(groupSnapshot, groupSnapshotContent) {
klog.V(4).Infof("synchronizing VolumeGroupSnapshotContent for group snapshot [%s]: update group snapshot status to true if needed.", groupSnapshotName)
// Manually trigger a group snapshot status update to happen
// right away so that it is in-sync with the group snapshot content status
ctrl.groupSnapshotQueue.Add(groupSnapshotName)
}
}
return nil
}
// getGroupSnapshotFromStore finds group snapshot from the cache store.
// If getGroupSnapshotFromStore returns (nil, nil), it means group snapshot not
// found and it may have already been deleted.
func (ctrl *csiSnapshotCommonController) getGroupSnapshotFromStore(groupSnapshotName string) (*crdv1alpha1.VolumeGroupSnapshot, error) {
// Get the VolumeGroupSnapshot by _name_
var groupSnapshot *crdv1alpha1.VolumeGroupSnapshot
obj, found, err := ctrl.groupSnapshotStore.GetByKey(groupSnapshotName)
if err != nil {
return nil, err
}
if !found {
klog.V(4).Infof("getGroupSnapshotFromStore: group snapshot %s not found", groupSnapshotName)
// Fall through with group snapshot = nil
return nil, nil
}
var ok bool
groupSnapshot, ok = obj.(*crdv1alpha1.VolumeGroupSnapshot)
if !ok {
return nil, fmt.Errorf("cannot convert object from group snapshot cache to group snapshot %q!?: %#v", groupSnapshotName, obj)
}
klog.V(4).Infof("getGroupSnapshotFromStore: group snapshot %s found", groupSnapshotName)
return groupSnapshot, nil
}
// needsUpdateGroupSnapshotStatus compares group snapshot status with the group snapshot content
// status and decide if group snapshot status needs to be updated based on group snapshot content
// status
func (ctrl *csiSnapshotCommonController) needsUpdateGroupSnapshotStatus(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot, groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) bool {
klog.V(5).Infof("needsUpdateGroupSnapshotStatus[%s]", utils.GroupSnapshotKey(groupSnapshot))
if groupSnapshot.Status == nil && groupSnapshotContent.Status != nil {
return true
}
if groupSnapshotContent.Status == nil {
return false
}
if groupSnapshot.Status.BoundVolumeGroupSnapshotContentName == nil {
return true
}
if groupSnapshot.Status.CreationTime == nil && groupSnapshotContent.Status.CreationTime != nil {
return true
}
if groupSnapshot.Status.ReadyToUse == nil && groupSnapshotContent.Status.ReadyToUse != nil {
return true
}
if groupSnapshot.Status.ReadyToUse != nil && groupSnapshotContent.Status.ReadyToUse != nil && groupSnapshot.Status.ReadyToUse != groupSnapshotContent.Status.ReadyToUse {
return true
}
return false
}

View File

@@ -1165,6 +1165,7 @@ func (ctrl *csiSnapshotCommonController) updateSnapshotStatus(snapshot *crdv1.Vo
updated = true
} else {
newStatus = snapshotObj.Status.DeepCopy()
klog.Infof("Raunak 1 %s", newStatus.VolumeGroupSnapshotName)
if newStatus.BoundVolumeSnapshotContentName == nil {
newStatus.BoundVolumeSnapshotContentName = &boundContentName
updated = true

View File

@@ -20,10 +20,13 @@ import (
"fmt"
"time"
crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumegroupsnapshot/v1alpha1"
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
clientset "github.com/kubernetes-csi/external-snapshotter/client/v6/clientset/versioned"
storageinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumesnapshot/v1"
storagelisters "github.com/kubernetes-csi/external-snapshotter/client/v6/listers/volumesnapshot/v1"
groupsnapshotinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumegroupsnapshot/v1alpha1"
snapshotinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumesnapshot/v1"
groupsnapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v6/listers/volumegroupsnapshot/v1alpha1"
snapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v6/listers/volumesnapshot/v1"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/metrics"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/utils"
@@ -43,25 +46,35 @@ import (
)
type csiSnapshotCommonController struct {
clientset clientset.Interface
client kubernetes.Interface
eventRecorder record.EventRecorder
snapshotQueue workqueue.RateLimitingInterface
contentQueue workqueue.RateLimitingInterface
clientset clientset.Interface
client kubernetes.Interface
eventRecorder record.EventRecorder
snapshotQueue workqueue.RateLimitingInterface
contentQueue workqueue.RateLimitingInterface
groupSnapshotQueue workqueue.RateLimitingInterface
groupSnapshotContentQueue workqueue.RateLimitingInterface
snapshotLister storagelisters.VolumeSnapshotLister
snapshotListerSynced cache.InformerSynced
contentLister storagelisters.VolumeSnapshotContentLister
contentListerSynced cache.InformerSynced
classLister storagelisters.VolumeSnapshotClassLister
classListerSynced cache.InformerSynced
pvcLister corelisters.PersistentVolumeClaimLister
pvcListerSynced cache.InformerSynced
nodeLister corelisters.NodeLister
nodeListerSynced cache.InformerSynced
snapshotLister snapshotlisters.VolumeSnapshotLister
snapshotListerSynced cache.InformerSynced
contentLister snapshotlisters.VolumeSnapshotContentLister
contentListerSynced cache.InformerSynced
classLister snapshotlisters.VolumeSnapshotClassLister
classListerSynced cache.InformerSynced
pvcLister corelisters.PersistentVolumeClaimLister
pvcListerSynced cache.InformerSynced
nodeLister corelisters.NodeLister
nodeListerSynced cache.InformerSynced
groupSnapshotLister groupsnapshotlisters.VolumeGroupSnapshotLister
groupSnapshotListerSynced cache.InformerSynced
groupSnapshotContentLister groupsnapshotlisters.VolumeGroupSnapshotContentLister
groupSnapshotContentListerSynced cache.InformerSynced
groupSnapshotClassLister groupsnapshotlisters.VolumeGroupSnapshotClassLister
groupSnapshotClassListerSynced cache.InformerSynced
snapshotStore cache.Store
contentStore cache.Store
snapshotStore cache.Store
contentStore cache.Store
groupSnapshotStore cache.Store
groupSnapshotContentStore cache.Store
metricsManager metrics.MetricsManager
@@ -69,23 +82,30 @@ type csiSnapshotCommonController struct {
enableDistributedSnapshotting bool
preventVolumeModeConversion bool
enableVolumeGroupSnapshots bool
}
// NewCSISnapshotController returns a new *csiSnapshotCommonController
func NewCSISnapshotCommonController(
clientset clientset.Interface,
client kubernetes.Interface,
volumeSnapshotInformer storageinformers.VolumeSnapshotInformer,
volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer,
volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer,
volumeSnapshotInformer snapshotinformers.VolumeSnapshotInformer,
volumeSnapshotContentInformer snapshotinformers.VolumeSnapshotContentInformer,
volumeSnapshotClassInformer snapshotinformers.VolumeSnapshotClassInformer,
volumeGroupSnapshotInformer groupsnapshotinformers.VolumeGroupSnapshotInformer,
volumeGroupSnapshotContentInformer groupsnapshotinformers.VolumeGroupSnapshotContentInformer,
volumeGroupSnapshotClassInformer groupsnapshotinformers.VolumeGroupSnapshotClassInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
nodeInformer coreinformers.NodeInformer,
metricsManager metrics.MetricsManager,
resyncPeriod time.Duration,
snapshotRateLimiter workqueue.RateLimiter,
contentRateLimiter workqueue.RateLimiter,
groupSnapshotRateLimiter workqueue.RateLimiter,
groupSnapshotContentRateLimiter workqueue.RateLimiter,
enableDistributedSnapshotting bool,
preventVolumeModeConversion bool,
enableVolumeGroupSnapshots bool,
) *csiSnapshotCommonController {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
@@ -142,12 +162,52 @@ func NewCSISnapshotCommonController(
ctrl.preventVolumeModeConversion = preventVolumeModeConversion
ctrl.enableVolumeGroupSnapshots = enableVolumeGroupSnapshots
if enableVolumeGroupSnapshots {
ctrl.groupSnapshotStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ctrl.groupSnapshotContentStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ctrl.groupSnapshotQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotRateLimiter, "snapshot-controller-group-snapshot")
ctrl.groupSnapshotContentQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotContentRateLimiter, "snapshot-controller-group-content")
volumeGroupSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotWork(obj) },
UpdateFunc: func(oldObj, newObj interface{}) { ctrl.enqueueGroupSnapshotWork(newObj) },
DeleteFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotWork(obj) },
},
ctrl.resyncPeriod,
)
ctrl.groupSnapshotLister = volumeGroupSnapshotInformer.Lister()
ctrl.groupSnapshotListerSynced = volumeGroupSnapshotInformer.Informer().HasSynced
volumeGroupSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotContentWork(obj) },
UpdateFunc: func(oldObj, newObj interface{}) { ctrl.enqueueGroupSnapshotContentWork(newObj) },
DeleteFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotContentWork(obj) },
},
ctrl.resyncPeriod,
)
ctrl.groupSnapshotContentLister = volumeGroupSnapshotContentInformer.Lister()
ctrl.groupSnapshotContentListerSynced = volumeGroupSnapshotContentInformer.Informer().HasSynced
ctrl.groupSnapshotClassLister = volumeGroupSnapshotClassInformer.Lister()
ctrl.groupSnapshotClassListerSynced = volumeGroupSnapshotClassInformer.Informer().HasSynced
}
return ctrl
}
func (ctrl *csiSnapshotCommonController) Run(workers int, stopCh <-chan struct{}) {
defer ctrl.snapshotQueue.ShutDown()
defer ctrl.contentQueue.ShutDown()
if ctrl.enableVolumeGroupSnapshots {
defer ctrl.groupSnapshotQueue.ShutDown()
defer ctrl.groupSnapshotContentQueue.ShutDown()
}
klog.Infof("Starting snapshot controller")
defer klog.Infof("Shutting snapshot controller")
@@ -156,17 +216,24 @@ func (ctrl *csiSnapshotCommonController) Run(workers int, stopCh <-chan struct{}
if ctrl.enableDistributedSnapshotting {
informersSynced = append(informersSynced, ctrl.nodeListerSynced)
}
if ctrl.enableVolumeGroupSnapshots {
informersSynced = append(informersSynced, []cache.InformerSynced{ctrl.groupSnapshotListerSynced, ctrl.groupSnapshotContentListerSynced, ctrl.groupSnapshotClassListerSynced}...)
}
if !cache.WaitForCacheSync(stopCh, informersSynced...) {
klog.Errorf("Cannot sync caches")
return
}
ctrl.initializeCaches(ctrl.snapshotLister, ctrl.contentLister)
ctrl.initializeCaches()
for i := 0; i < workers; i++ {
go wait.Until(ctrl.snapshotWorker, 0, stopCh)
go wait.Until(ctrl.contentWorker, 0, stopCh)
if ctrl.enableVolumeGroupSnapshots {
go wait.Until(ctrl.groupSnapshotWorker, 0, stopCh)
go wait.Until(ctrl.groupSnapshotContentWorker, 0, stopCh)
}
}
<-stopCh
@@ -481,8 +548,8 @@ func (ctrl *csiSnapshotCommonController) deleteContent(content *crdv1.VolumeSnap
// initializeCaches fills all controller caches with initial data from etcd in
// order to have the caches already filled when first addSnapshot/addContent to
// perform initial synchronization of the controller.
func (ctrl *csiSnapshotCommonController) initializeCaches(snapshotLister storagelisters.VolumeSnapshotLister, contentLister storagelisters.VolumeSnapshotContentLister) {
snapshotList, err := snapshotLister.List(labels.Everything())
func (ctrl *csiSnapshotCommonController) initializeCaches() {
snapshotList, err := ctrl.snapshotLister.List(labels.Everything())
if err != nil {
klog.Errorf("CSISnapshotController can't initialize caches: %v", err)
return
@@ -494,7 +561,7 @@ func (ctrl *csiSnapshotCommonController) initializeCaches(snapshotLister storage
}
}
contentList, err := contentLister.List(labels.Everything())
contentList, err := ctrl.contentLister.List(labels.Everything())
if err != nil {
klog.Errorf("CSISnapshotController can't initialize caches: %v", err)
return
@@ -506,5 +573,280 @@ func (ctrl *csiSnapshotCommonController) initializeCaches(snapshotLister storage
}
}
if ctrl.enableVolumeGroupSnapshots {
groupSnapshotList, err := ctrl.snapshotLister.List(labels.Everything())
if err != nil {
klog.Errorf("CSISnapshotController can't initialize caches: %v", err)
return
}
for _, groupSnapshot := range groupSnapshotList {
groupSnapshotClone := groupSnapshot.DeepCopy()
if _, err = ctrl.storeGroupSnapshotUpdate(groupSnapshotClone); err != nil {
klog.Errorf("error updating volume group snapshot cache: %v", err)
}
}
groupContentList, err := ctrl.contentLister.List(labels.Everything())
if err != nil {
klog.Errorf("CSISnapshotController can't initialize caches: %v", err)
return
}
for _, groupContent := range groupContentList {
groupContentClone := groupContent.DeepCopy()
if _, err = ctrl.storeGroupSnapshotContentUpdate(groupContentClone); err != nil {
klog.Errorf("error updating volume group snapshot content cache: %v", err)
}
}
}
klog.V(4).Infof("controller initialized")
}
// enqueueGroupSnapshotWork adds group snapshot to given work queue.
func (ctrl *csiSnapshotCommonController) enqueueGroupSnapshotWork(obj interface{}) {
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
if groupSnapshot, ok := obj.(*crdv1alpha1.VolumeGroupSnapshot); ok {
objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(groupSnapshot)
if err != nil {
klog.Errorf("failed to get key from object: %v, %v", err, groupSnapshot)
return
}
klog.V(5).Infof("enqueued %q for sync", objName)
ctrl.groupSnapshotQueue.Add(objName)
}
}
// enqueueGroupSnapshotContentWork adds group snapshot content to given work queue.
func (ctrl *csiSnapshotCommonController) enqueueGroupSnapshotContentWork(obj interface{}) {
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
if content, ok := obj.(*crdv1alpha1.VolumeGroupSnapshotContent); ok {
objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(content)
if err != nil {
klog.Errorf("failed to get key from object: %v, %v", err, content)
return
}
klog.V(5).Infof("enqueued %q for sync", objName)
ctrl.groupSnapshotContentQueue.Add(objName)
}
}
// groupSnapshotWorker is the main worker for VolumeGroupSnapshots.
func (ctrl *csiSnapshotCommonController) groupSnapshotWorker() {
keyObj, quit := ctrl.groupSnapshotQueue.Get()
if quit {
return
}
defer ctrl.groupSnapshotQueue.Done(keyObj)
if err := ctrl.syncGroupSnapshotByKey(keyObj.(string)); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
ctrl.groupSnapshotQueue.AddRateLimited(keyObj)
klog.V(4).Infof("Failed to sync group snapshot %q, will retry again: %v", keyObj.(string), err)
} else {
// Finally, if no error occurs we forget this item so it does not
// get queued again until another change happens.
ctrl.groupSnapshotQueue.Forget(keyObj)
}
}
// groupSnapshotContentWorker is the main worker for VolumeGroupSnapshotContent.
func (ctrl *csiSnapshotCommonController) groupSnapshotContentWorker() {
keyObj, quit := ctrl.groupSnapshotContentQueue.Get()
if quit {
return
}
defer ctrl.groupSnapshotContentQueue.Done(keyObj)
if err := ctrl.syncGroupSnapshotContentByKey(keyObj.(string)); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
ctrl.groupSnapshotContentQueue.AddRateLimited(keyObj)
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err)
} else {
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
ctrl.groupSnapshotContentQueue.Forget(keyObj)
}
}
// syncGroupSnapshotByKey processes a VolumeGroupSnapshot request.
func (ctrl *csiSnapshotCommonController) syncGroupSnapshotByKey(key string) error {
klog.V(5).Infof("syncGroupSnapshotByKey[%s]", key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
klog.V(5).Infof("groupSnapshotWorker: group snapshot namespace [%s] name [%s]", namespace, name)
if err != nil {
klog.Errorf("error getting namespace & name of group snapshot %q to get group snapshot from informer: %v", key, err)
return nil
}
groupSnapshot, err := ctrl.groupSnapshotLister.VolumeGroupSnapshots(namespace).Get(name)
if err == nil {
// The volume group snapshot still exists in informer cache, the event must have
// been add/update/sync
newGroupSnapshot, err := ctrl.checkAndUpdateGroupSnapshotClass(groupSnapshot)
if err == nil || (newGroupSnapshot.ObjectMeta.DeletionTimestamp != nil && errors.IsNotFound(err)) {
// If the VolumeSnapshotClass is not found, we still need to process an update
// so that syncGroupSnapshot can delete the snapshot, should it still exist in the
// cluster after it's been removed from the informer cache
if newGroupSnapshot.ObjectMeta.DeletionTimestamp != nil && errors.IsNotFound(err) {
klog.V(5).Infof("GroupSnapshot %q is being deleted. GroupSnapshotClass has already been removed", key)
}
klog.V(5).Infof("Updating group snapshot %q", key)
return ctrl.updateGroupSnapshot(newGroupSnapshot)
}
return err
}
if err != nil && !errors.IsNotFound(err) {
klog.V(2).Infof("error getting group snapshot %q from informer: %v", key, err)
return err
}
// The group snapshot is not in informer cache, the event must have been "delete"
vgsObj, found, err := ctrl.groupSnapshotStore.GetByKey(key)
if err != nil {
klog.V(2).Infof("error getting group snapshot %q from cache: %v", key, err)
return nil
}
if !found {
// The controller has already processed the delete event and
// deleted the group snapshot from its cache
klog.V(2).Infof("deletion of group snapshot %q was already processed", key)
return nil
}
groupSnapshot, ok := vgsObj.(*crdv1alpha1.VolumeGroupSnapshot)
if !ok {
klog.Errorf("expected vgs, got %+v", vgsObj)
return nil
}
klog.V(5).Infof("deleting group snapshot %q", key)
ctrl.deleteGroupSnapshot(groupSnapshot)
return nil
}
// checkAndUpdateGroupSnapshotClass gets the VolumeGroupSnapshotClass from VolumeGroupSnapshot.
// If it is not set, gets it from default VolumeGroupSnapshotClass and sets it.
// On error, it must return the original group snapshot, not nil, because the caller
// syncGroupSnapshotByKey needs to check group snapshot's timestamp.
func (ctrl *csiSnapshotCommonController) checkAndUpdateGroupSnapshotClass(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) (*crdv1alpha1.VolumeGroupSnapshot, error) {
className := groupSnapshot.Spec.VolumeGroupSnapshotClassName
var class *crdv1alpha1.VolumeGroupSnapshotClass
var err error
newGroupSnapshot := groupSnapshot
if className != nil {
klog.V(5).Infof("checkAndUpdateGroupSnapshotClass [%s]: VolumeGroupSnapshotClassName [%s]", groupSnapshot.Name, *className)
class, err = ctrl.getGroupSnapshotClass(*className)
if err != nil {
klog.Errorf("checkAndUpdateGroupSnapshotClass failed to getGroupSnapshotClass %v", err)
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, false, v1.EventTypeWarning, "GetGroupSnapshotClassFailed", fmt.Sprintf("failed to get group snapshot class with error %v", err))
// we need to return the original group snapshot even if the class isn't found, as it may need to be deleted
return newGroupSnapshot, err
}
} else {
klog.V(5).Infof("checkAndUpdateGroupSnapshotClass [%s]: SetDefaultGroupSnapshotClass", groupSnapshot.Name)
class, newGroupSnapshot, err = ctrl.SetDefaultGroupSnapshotClass(groupSnapshot)
if err != nil {
klog.Errorf("checkAndUpdateGroupSnapshotClass failed to setDefaultClass %v", err)
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, false, v1.EventTypeWarning, "SetDefaultGroupSnapshotClassFailed", fmt.Sprintf("Failed to set default group snapshot class with error %v", err))
return groupSnapshot, err
}
}
// For pre-provisioned group snapshots, we may not have group snapshot class
if class != nil {
klog.V(5).Infof("VolumeGroupSnapshotClass [%s] Driver [%s]", class.Name, class.Driver)
}
return newGroupSnapshot, nil
}
// syncGroupSnapshotContentByKey processes a VolumeGroupSnapshotContent request.
func (ctrl *csiSnapshotCommonController) syncGroupSnapshotContentByKey(key string) error {
klog.V(5).Infof("syncGroupSnapshotContentByKey[%s]", key)
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.V(4).Infof("error getting name of groupSnapshotContent %q to get groupSnapshotContent from informer: %v", key, err)
return nil
}
content, err := ctrl.groupSnapshotContentLister.Get(name)
// The content still exists in informer cache, the event must have
// been add/update/sync
if err == nil {
// If error occurs we add this item back to the queue
return ctrl.updateGroupSnapshotContent(content)
}
if !errors.IsNotFound(err) {
klog.V(2).Infof("error getting group snapshot content %q from informer: %v", key, err)
return nil
}
// The group snapshot content is not in informer cache, the event must have been "delete"
contentObj, found, err := ctrl.groupSnapshotContentStore.GetByKey(key)
if err != nil {
klog.V(2).Infof("error getting group snapshot content %q from cache: %v", key, err)
return nil
}
if !found {
// The controller has already processed the delete event and
// deleted the group snapshot content from its cache
klog.V(2).Infof("deletion of group snapshot content %q was already processed", key)
return nil
}
content, ok := contentObj.(*crdv1alpha1.VolumeGroupSnapshotContent)
if !ok {
klog.Errorf("expected group snapshot content, got %+v", content)
return nil
}
ctrl.deleteGroupSnapshotContent(content)
return nil
}
// updateGroupSnapshotContent runs in worker thread and handles "groupsnapshotcontent added",
// "groupsnapshotcontent updated" and "periodic sync" events.
func (ctrl *csiSnapshotCommonController) updateGroupSnapshotContent(content *crdv1alpha1.VolumeGroupSnapshotContent) error {
// Store the new group snapshot content version in the cache and do not process
// it if this is an old version.
new, err := ctrl.storeGroupSnapshotContentUpdate(content)
if err != nil {
klog.Errorf("%v", err)
}
if !new {
return nil
}
err = ctrl.syncGroupSnapshotContent(content)
if err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
klog.V(3).Infof("could not sync group snapshot content %q: %+v", content.Name, err)
} else {
klog.Errorf("could not sync group snapshot content %q: %+v", content.Name, err)
}
return err
}
return nil
}
// deleteGroupSnapshotContent runs in worker thread and handles "groupsnapshotcontent deleted" event.
func (ctrl *csiSnapshotCommonController) deleteGroupSnapshotContent(content *crdv1alpha1.VolumeGroupSnapshotContent) {
_ = ctrl.contentStore.Delete(content)
klog.V(4).Infof("group snapshot content %q deleted", content.Name)
groupSnapshotName := utils.GroupSnapshotRefKey(&content.Spec.VolumeGroupSnapshotRef)
if groupSnapshotName == "" {
klog.V(5).Infof("deleteGroupContent[%q]: group snapshot content not bound", content.Name)
return
}
// sync the group snapshot when its group snapshot content is deleted. Explicitly
// sync'ing the group snapshot here in response to group snapshot content deletion
// prevents the group snapshot from waiting until the next sync period for its release.
klog.V(5).Infof("deleteGroupContent[%q]: scheduling sync of group snapshot %s", content.Name, groupSnapshotName)
ctrl.groupSnapshotQueue.Add(groupSnapshotName)
}

View File

@@ -0,0 +1,85 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package group_snapshotter
import (
"context"
"github.com/container-storage-interface/spec/lib/go/csi"
csirpc "github.com/kubernetes-csi/csi-lib-utils/rpc"
"google.golang.org/grpc"
klog "k8s.io/klog/v2"
"time"
)
// GroupSnapshotter implements CreateGroupSnapshot/DeleteGroupSnapshot operations against a CSI driver.
type GroupSnapshotter interface {
// CreateGroupSnapshot creates a group snapshot for multiple volumes
CreateGroupSnapshot(ctx context.Context, groupSnapshotName string, volumeIDs []string, parameters map[string]string, snapshotterCredentials map[string]string) (driverName string, groupSnapshotId string, snapshots []*csi.Snapshot, timestamp time.Time, readyToUse bool, err error)
// DeleteGroupSnapshot deletes a group snapshot of multiple volumes
DeleteGroupSnapshot(ctx context.Context, groupSnapshotID string, snapshotterCredentials map[string]string) (err error)
// GetGroupSnapshotStatus returns if a group snapshot is ready to use, its creation time, etc
GetGroupSnapshotStatus(ctx context.Context, groupSnapshotID string, snapshotterListCredentials map[string]string) (bool, time.Time, error)
}
type groupSnapshot struct {
conn *grpc.ClientConn
}
func NewGroupSnapshotter(conn *grpc.ClientConn) GroupSnapshotter {
return &groupSnapshot{
conn: conn,
}
}
func (gs *groupSnapshot) CreateGroupSnapshot(ctx context.Context, groupSnapshotName string, volumeIDs []string, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, []*csi.Snapshot, time.Time, bool, error) {
klog.V(5).Infof("CSI CreateGroupSnapshot: %s", groupSnapshotName)
client := csi.NewGroupControllerClient(gs.conn)
driverName, err := csirpc.GetDriverName(ctx, gs.conn)
if err != nil {
return "", "", nil, time.Time{}, false, err
}
req := csi.CreateVolumeGroupSnapshotRequest{
Name: groupSnapshotName,
SourceVolumeIds: volumeIDs,
Secrets: snapshotterCredentials,
Parameters: parameters,
}
rsp, err := client.CreateVolumeGroupSnapshot(ctx, &req)
if err != nil {
return "", "", nil, time.Time{}, false, err
}
klog.V(5).Infof("CSI CreateGroupSnapshot: %s driver name [%s] group snapshot ID [%s] time stamp [%v] snapshots [%v] readyToUse [%v]", groupSnapshotName, driverName, rsp.GroupSnapshot.GroupSnapshotId, rsp.GroupSnapshot.CreationTime, rsp.GroupSnapshot.Snapshots, rsp.GroupSnapshot.ReadyToUse)
creationTime := rsp.GroupSnapshot.CreationTime.AsTime()
return driverName, rsp.GroupSnapshot.GroupSnapshotId, rsp.GroupSnapshot.Snapshots, creationTime, rsp.GroupSnapshot.ReadyToUse, nil
}
func (gs *groupSnapshot) DeleteGroupSnapshot(ctx context.Context, groupSnapshotID string, snapshotterCredentials map[string]string) error {
// TODO: Implement DeleteGroupSnapshot
return nil
}
func (gs *groupSnapshot) GetGroupSnapshotStatus(ctx context.Context, groupSnapshotID string, snapshotterListCredentials map[string]string) (bool, time.Time, error) {
// TODO: Implement GetGroupSnapshotStatus
return true, time.Now(), nil
}

View File

@@ -19,6 +19,9 @@ package sidecar_controller
import (
"context"
"fmt"
"github.com/container-storage-interface/spec/lib/go/csi"
crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumegroupsnapshot/v1alpha1"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/group_snapshotter"
"strings"
"time"
@@ -31,28 +34,39 @@ type Handler interface {
CreateSnapshot(content *crdv1.VolumeSnapshotContent, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error)
DeleteSnapshot(content *crdv1.VolumeSnapshotContent, snapshotterCredentials map[string]string) error
GetSnapshotStatus(content *crdv1.VolumeSnapshotContent, snapshotterListCredentials map[string]string) (bool, time.Time, int64, error)
CreateGroupSnapshot(content *crdv1alpha1.VolumeGroupSnapshotContent, volumeIDs []string, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, []*csi.Snapshot, time.Time, bool, error)
GetGroupSnapshotStatus(content *crdv1alpha1.VolumeGroupSnapshotContent, snapshotterListCredentials map[string]string) (bool, time.Time, error)
}
// csiHandler is a handler that calls CSI to create/delete volume snapshot.
type csiHandler struct {
snapshotter snapshotter.Snapshotter
timeout time.Duration
snapshotNamePrefix string
snapshotNameUUIDLength int
snapshotter snapshotter.Snapshotter
groupSnapshotter group_snapshotter.GroupSnapshotter
timeout time.Duration
snapshotNamePrefix string
snapshotNameUUIDLength int
groupSnapshotNamePrefix string
groupSnapshotNameUUIDLength int
}
// NewCSIHandler returns a handler which includes the csi connection and Snapshot name details
func NewCSIHandler(
snapshotter snapshotter.Snapshotter,
groupSnapshotter group_snapshotter.GroupSnapshotter,
timeout time.Duration,
snapshotNamePrefix string,
snapshotNameUUIDLength int,
groupSnapshotNamePrefix string,
groupSnapshotNameUUIDLength int,
) Handler {
return &csiHandler{
snapshotter: snapshotter,
timeout: timeout,
snapshotNamePrefix: snapshotNamePrefix,
snapshotNameUUIDLength: snapshotNameUUIDLength,
snapshotter: snapshotter,
groupSnapshotter: groupSnapshotter,
timeout: timeout,
snapshotNamePrefix: snapshotNamePrefix,
snapshotNameUUIDLength: snapshotNameUUIDLength,
groupSnapshotNamePrefix: groupSnapshotNamePrefix,
groupSnapshotNameUUIDLength: groupSnapshotNameUUIDLength,
}
}
@@ -131,3 +145,55 @@ func makeSnapshotName(prefix, snapshotUID string, snapshotNameUUIDLength int) (s
}
return fmt.Sprintf("%s-%s", prefix, strings.Replace(snapshotUID, "-", "", -1)[0:snapshotNameUUIDLength]), nil
}
func (handler *csiHandler) CreateGroupSnapshot(content *crdv1alpha1.VolumeGroupSnapshotContent, volumeIDs []string, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, []*csi.Snapshot, time.Time, bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
defer cancel()
if content.Spec.VolumeGroupSnapshotRef.UID == "" {
return "", "", nil, time.Time{}, false, fmt.Errorf("cannot create group snapshot. Group snapshot content %s not bound to a group snapshot", content.Name)
}
if len(volumeIDs) == 0 {
return "", "", nil, time.Time{}, false, fmt.Errorf("cannot create group snapshot. PVCs to be snapshotted not found in group snapshot content %s", content.Name)
}
groupSnapshotName, err := handler.makeGroupSnapshotName(string(content.Spec.VolumeGroupSnapshotRef.UID))
if err != nil {
return "", "", nil, time.Time{}, false, err
}
return handler.groupSnapshotter.CreateGroupSnapshot(ctx, groupSnapshotName, volumeIDs, parameters, snapshotterCredentials)
}
func (handler *csiHandler) GetGroupSnapshotStatus(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent, snapshotterListCredentials map[string]string) (bool, time.Time, error) {
ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
defer cancel()
var groupSnapshotHandle string
var err error
if groupSnapshotContent.Status != nil && groupSnapshotContent.Status.VolumeGroupSnapshotHandle != nil {
groupSnapshotHandle = *groupSnapshotContent.Status.VolumeGroupSnapshotHandle
} else if groupSnapshotContent.Spec.Source.VolumeGroupSnapshotHandle != nil {
groupSnapshotHandle = *groupSnapshotContent.Spec.Source.VolumeGroupSnapshotHandle
} else {
return false, time.Time{}, fmt.Errorf("failed to list group snapshot for group snapshot content %s: groupSnapshotHandle is missing", groupSnapshotContent.Name)
}
csiSnapshotStatus, timestamp, err := handler.groupSnapshotter.GetGroupSnapshotStatus(ctx, groupSnapshotHandle, snapshotterListCredentials)
if err != nil {
return false, time.Time{}, fmt.Errorf("failed to list group snapshot for group snapshot content %s: %q", groupSnapshotContent.Name, err)
}
return csiSnapshotStatus, timestamp, nil
}
func (handler *csiHandler) makeGroupSnapshotName(groupSnapshotUID string) (string, error) {
if len(groupSnapshotUID) == 0 {
return "", fmt.Errorf("group snapshot object is missing UID")
}
if handler.groupSnapshotNameUUIDLength == -1 {
return fmt.Sprintf("%s-%s", handler.groupSnapshotNamePrefix, groupSnapshotUID), nil
}
return fmt.Sprintf("%s-%s", handler.groupSnapshotNamePrefix, strings.Replace(groupSnapshotUID, "-", "", -1)[0:handler.groupSnapshotNameUUIDLength]), nil
}

View File

@@ -566,12 +566,19 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
informerFactory.Snapshot().V1().VolumeSnapshotContents(),
informerFactory.Snapshot().V1().VolumeSnapshotClasses(),
fakeSnapshot,
nil, // TODO: Replace with fake group snapshotter
5*time.Millisecond,
60*time.Second,
"snapshot",
-1,
"groupsnapshot",
-1,
true,
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
false,
informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotContents(),
informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotClasses(),
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
)
ctrl.eventRecorder = record.NewFakeRecorder(1000)

View File

@@ -0,0 +1,688 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sidecar_controller
import (
"context"
"crypto/sha256"
"fmt"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
klog "k8s.io/klog/v2"
crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumegroupsnapshot/v1alpha1"
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/utils"
)
func (ctrl *csiSnapshotSideCarController) storeGroupSnapshotContentUpdate(groupSnapshotContent interface{}) (bool, error) {
return utils.StoreObjectUpdate(ctrl.groupSnapshotContentStore, groupSnapshotContent, "groupsnapshotcontent")
}
// enqueueGroupSnapshotContentWork adds group snapshot content to given work queue.
func (ctrl *csiSnapshotSideCarController) enqueueGroupSnapshotContentWork(obj interface{}) {
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
if groupSnapshotContent, ok := obj.(*crdv1alpha1.VolumeGroupSnapshotContent); ok {
objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(groupSnapshotContent)
if err != nil {
klog.Errorf("failed to get key from object: %v, %v", err, groupSnapshotContent)
return
}
klog.V(5).Infof("enqueued %q for sync", objName)
ctrl.groupSnapshotContentQueue.Add(objName)
}
}
// groupSnapshotContentWorker processes items from groupSnapshotContentQueue.
// It must run only once, syncGroupSnapshotContent is not assured to be reentrant.
func (ctrl *csiSnapshotSideCarController) groupSnapshotContentWorker() {
keyObj, quit := ctrl.groupSnapshotContentQueue.Get()
if quit {
return
}
defer ctrl.groupSnapshotContentQueue.Done(keyObj)
if err := ctrl.syncGroupSnapshotContentByKey(keyObj.(string)); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
ctrl.groupSnapshotContentQueue.AddRateLimited(keyObj)
klog.V(4).Infof("Failed to sync group snapshot content %q, will retry again: %v", keyObj.(string), err)
return
}
// Finally, if no error occurs we forget this item so it does not
// get queued again until another change happens.
ctrl.groupSnapshotContentQueue.Forget(keyObj)
return
}
func (ctrl *csiSnapshotSideCarController) syncGroupSnapshotContentByKey(key string) error {
klog.V(5).Infof("syncGroupSnapshotContentByKey[%s]", key)
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.V(4).Infof("error getting name of groupSnapshotContent %q from informer: %v", key, err)
return nil
}
groupSnapshotContent, err := ctrl.groupSnapshotContentLister.Get(name)
// The group snapshot content still exists in informer cache, the event must
// have been add/update/sync
if err == nil {
if ctrl.isDriverMatch(groupSnapshotContent) {
err = ctrl.updateGroupSnapshotContentInInformerCache(groupSnapshotContent)
}
if err != nil {
// If error occurs we add this item back to the queue
return err
}
return nil
}
if !errors.IsNotFound(err) {
klog.V(2).Infof("error getting group snapshot content %q from informer: %v", key, err)
return nil
}
// The groupSnapshotContent is not in informer cache, the event must have been
// "delete"
groupSnapshotContentObj, found, err := ctrl.groupSnapshotContentStore.GetByKey(key)
if err != nil {
klog.V(2).Infof("error getting group snapshot content %q from cache: %v", key, err)
return nil
}
if !found {
// The controller has already processed the delete event and
// deleted the group snapshot content from its cache
klog.V(2).Infof("deletion of group snapshot content %q was already processed", key)
return nil
}
groupSnapshotContent, ok := groupSnapshotContentObj.(*crdv1alpha1.VolumeGroupSnapshotContent)
if !ok {
klog.Errorf("expected group snapshot content, got %+v", groupSnapshotContent)
return nil
}
ctrl.deleteGroupSnapshotContentInCacheStore(groupSnapshotContent)
return nil
}
// updateGroupSnapshotContentInInformerCache runs in worker thread and handles
// "group snapshot content added", "group snapshot content updated" and "periodic
// sync" events.
func (ctrl *csiSnapshotSideCarController) updateGroupSnapshotContentInInformerCache(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) error {
// Store the new group snapshot content version in the cache and do not process
// it if this is an old version.
new, err := ctrl.storeGroupSnapshotContentUpdate(groupSnapshotContent)
if err != nil {
klog.Errorf("%v", err)
}
if !new {
return nil
}
err = ctrl.syncGroupSnapshotContent(groupSnapshotContent)
if err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
klog.V(3).Infof("could not sync group snapshot content %q: %+v", groupSnapshotContent.Name, err)
} else {
klog.Errorf("could not sync group snapshot content %q: %+v", groupSnapshotContent.Name, err)
}
return err
}
return nil
}
// deleteGroupSnapshotContentInCacheStore runs in worker thread and handles "group
// snapshot content deleted" event.
func (ctrl *csiSnapshotSideCarController) deleteGroupSnapshotContentInCacheStore(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) {
_ = ctrl.groupSnapshotContentStore.Delete(groupSnapshotContent)
klog.V(4).Infof("group snapshot content %q deleted", groupSnapshotContent.Name)
}
// syncGroupSnapshotContent deals with one key off the queue. It returns false when it's time to quit.
func (ctrl *csiSnapshotSideCarController) syncGroupSnapshotContent(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) error {
klog.V(5).Infof("synchronizing VolumeGroupSnapshotContent[%s]", groupSnapshotContent.Name)
/*
TODO: Check if the group snapshot content should be deleted
*/
if len(groupSnapshotContent.Spec.Source.PersistentVolumeNames) != 0 && groupSnapshotContent.Status == nil {
klog.V(5).Infof("syncGroupSnapshotContent: Call CreateGroupSnapshot for group snapshot content %s", groupSnapshotContent.Name)
return ctrl.createGroupSnapshot(groupSnapshotContent)
}
// Skip checkandUpdateGroupSnapshotContentStatus() if ReadyToUse is already
// true. We don't want to keep calling CreateGroupSnapshot CSI methods over
// and over again for performance reasons.
var err error
if groupSnapshotContent.Status != nil && groupSnapshotContent.Status.ReadyToUse != nil && *groupSnapshotContent.Status.ReadyToUse == true {
// Try to remove AnnVolumeGroupSnapshotBeingCreated if it is not removed yet for some reason
_, err = ctrl.removeAnnVolumeGroupSnapshotBeingCreated(groupSnapshotContent)
return err
}
return ctrl.checkandUpdateGroupSnapshotContentStatus(groupSnapshotContent)
}
// createGroupSnapshot starts new asynchronous operation to create group snapshot
func (ctrl *csiSnapshotSideCarController) createGroupSnapshot(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) error {
klog.V(5).Infof("createGroupSnapshot for group snapshot content [%s]: started", groupSnapshotContent.Name)
groupSnapshotContentObj, err := ctrl.createGroupSnapshotWrapper(groupSnapshotContent)
if err != nil {
ctrl.updateGroupSnapshotContentErrorStatusWithEvent(groupSnapshotContentObj, v1.EventTypeWarning, "GroupSnapshotCreationFailed", fmt.Sprintf("Failed to create group snapshot: %v", err))
klog.Errorf("createGroupSnapshot for groupSnapshotContent [%s]: error occurred in createGroupSnapshotWrapper: %v", groupSnapshotContent.Name, err)
return err
}
_, updateErr := ctrl.storeGroupSnapshotContentUpdate(groupSnapshotContentObj)
if updateErr != nil {
// We will get a "group snapshot update" event soon, this is not a big error
klog.V(4).Infof("createGroupSnapshot for groupSnapshotContent [%s]: cannot update internal groupSnapshotContent cache: %v", groupSnapshotContent.Name, updateErr)
}
return nil
}
// This is a wrapper function for the group snapshot creation process.
func (ctrl *csiSnapshotSideCarController) createGroupSnapshotWrapper(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
klog.Infof("createGroupSnapshotWrapper: Creating group snapshot for group snapshot content %s through the plugin ...", groupSnapshotContent.Name)
class, snapshotterCredentials, err := ctrl.getCSIGroupSnapshotInput(groupSnapshotContent)
if err != nil {
return groupSnapshotContent, fmt.Errorf("failed to get input parameters to create group snapshot for group snapshot content %s: %q", groupSnapshotContent.Name, err)
}
// NOTE(xyang): handle create timeout
// Add an annotation to indicate the group snapshot creation request has been
// sent to the storage system and the controller is waiting for a response.
// The annotation will be removed after the storage system has responded with
// success or permanent failure. If the request times out, annotation will
// remain on the groupSnapshotContent to avoid potential leaking of a group snapshot resource on
// the storage system.
groupSnapshotContent, err = ctrl.setAnnVolumeGroupSnapshotBeingCreated(groupSnapshotContent)
if err != nil {
return groupSnapshotContent, fmt.Errorf("failed to add VolumeGroupSnapshotBeingCreated annotation on the group snapshot content %s: %q", groupSnapshotContent.Name, err)
}
parameters, err := utils.RemovePrefixedParameters(class.Parameters)
if err != nil {
return groupSnapshotContent, fmt.Errorf("failed to remove CSI Parameters of prefixed keys: %v", err)
}
if ctrl.extraCreateMetadata {
parameters[utils.PrefixedVolumeGroupSnapshotNameKey] = groupSnapshotContent.Spec.VolumeGroupSnapshotRef.Name
parameters[utils.PrefixedVolumeGroupSnapshotNamespaceKey] = groupSnapshotContent.Spec.VolumeGroupSnapshotRef.Namespace
parameters[utils.PrefixedVolumeGroupSnapshotContentNameKey] = groupSnapshotContent.Name
}
volumeIDs, uuidMap, err := ctrl.getGroupSnapshotVolumeIDs(groupSnapshotContent)
driverName, groupSnapshotID, snapshots, creationTime, readyToUse, err := ctrl.handler.CreateGroupSnapshot(groupSnapshotContent, volumeIDs, parameters, snapshotterCredentials)
if err != nil {
// NOTE(xyang): handle create timeout
// If it is a final error, remove annotation to indicate
// storage system has responded with an error
klog.Infof("createGroupSnapshotWrapper: CreateGroupSnapshot for groupSnapshotContent %s returned error: %v", groupSnapshotContent.Name, err)
if isCSIFinalError(err) {
var removeAnnotationErr error
if groupSnapshotContent, removeAnnotationErr = ctrl.removeAnnVolumeGroupSnapshotBeingCreated(groupSnapshotContent); removeAnnotationErr != nil {
return groupSnapshotContent, fmt.Errorf("failed to remove VolumeGroupSnapshotBeingCreated annotation from the group snapshot content %s: %s", groupSnapshotContent.Name, removeAnnotationErr)
}
}
return groupSnapshotContent, fmt.Errorf("failed to take group snapshot of the volumes %s: %q", groupSnapshotContent.Spec.Source.PersistentVolumeNames, err)
}
klog.V(5).Infof("Created group snapshot: driver %s, groupSnapshotId %s, creationTime %v, readyToUse %t", driverName, groupSnapshotID, creationTime, readyToUse)
if creationTime.IsZero() {
creationTime = time.Now()
}
// Create individual snapshots and snapshot contents
var snapshotContentNames []string
for _, snapshot := range snapshots {
uuid, ok := uuidMap[snapshot.SourceVolumeId]
if !ok {
continue
}
volumeSnapshotContentName := GetSnapshotContentNameForVolumeGroupSnapshotContent(string(groupSnapshotContent.UID), uuid)
volumeSnapshotName := GetSnapshotNameForVolumeGroupSnapshotContent(string(groupSnapshotContent.UID), uuid)
volumeSnapshotNamespace := groupSnapshotContent.Spec.VolumeGroupSnapshotRef.Namespace
volumeSnapshotContent := &crdv1.VolumeSnapshotContent{
ObjectMeta: metav1.ObjectMeta{
Name: volumeSnapshotContentName,
},
Spec: crdv1.VolumeSnapshotContentSpec{
VolumeSnapshotRef: v1.ObjectReference{
Kind: "VolumeSnapshots",
Name: volumeSnapshotName,
Namespace: volumeSnapshotNamespace,
},
DeletionPolicy: groupSnapshotContent.Spec.DeletionPolicy,
Driver: groupSnapshotContent.Spec.Driver,
Source: crdv1.VolumeSnapshotContentSource{
SnapshotHandle: &snapshot.SnapshotId,
},
// TODO: Populate this field when volume mode conversion is enabled by default
SourceVolumeMode: nil,
},
Status: &crdv1.VolumeSnapshotContentStatus{
VolumeGroupSnapshotContentName: &groupSnapshotContent.Name,
},
}
label := make(map[string]string)
label["volumeGroupSnapshotName"] = groupSnapshotContent.Spec.VolumeGroupSnapshotRef.Name
name := "f"
volumeSnapshot := &crdv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: volumeSnapshotName,
Namespace: volumeSnapshotNamespace,
Labels: label,
},
Spec: crdv1.VolumeSnapshotSpec{
Source: crdv1.VolumeSnapshotSource{
VolumeSnapshotContentName: &volumeSnapshotContentName,
},
},
}
vsc, err := ctrl.clientset.SnapshotV1().VolumeSnapshotContents().Create(context.TODO(), volumeSnapshotContent, metav1.CreateOptions{})
if err != nil {
return groupSnapshotContent, err
}
snapshotContentNames = append(snapshotContentNames, vsc.Name)
klog.Infof("making snapshot %v %s %s", volumeSnapshot.Status, *volumeSnapshot.Status.VolumeGroupSnapshotName, name)
_, err = ctrl.clientset.SnapshotV1().VolumeSnapshots(volumeSnapshotNamespace).Create(context.TODO(), volumeSnapshot, metav1.CreateOptions{})
if err != nil {
return groupSnapshotContent, err
}
// klog.Infof("raunak made snapshot 1 %v", spew.Sdump(sn))
// sn.Status = &crdv1.VolumeSnapshotStatus{
// VolumeGroupSnapshotName: &name,
// }
// sn, err = ctrl.clientset.SnapshotV1().VolumeSnapshots(volumeSnapshotNamespace).UpdateStatus(context.TODO(), sn, metav1.UpdateOptions{})
// if err != nil {
// klog.Infof("failed 2")
// return groupSnapshotContent, err
// }
// klog.Infof("made snapshot 2 %v", spew.Sdump(sn))
}
klog.Infof("raunak 2")
newGroupSnapshotContent, err := ctrl.updateGroupSnapshotContentStatus(groupSnapshotContent, groupSnapshotID, readyToUse, creationTime.UnixNano(), snapshotContentNames)
if err != nil {
klog.Errorf("error updating status for volume group snapshot content %s: %v.", groupSnapshotContent.Name, err)
return groupSnapshotContent, fmt.Errorf("error updating status for volume group snapshot content %s: %v", groupSnapshotContent.Name, err)
}
groupSnapshotContent = newGroupSnapshotContent
// NOTE(xyang): handle create timeout
// Remove annotation to indicate storage system has successfully
// cut the group snapshot
groupSnapshotContent, err = ctrl.removeAnnVolumeGroupSnapshotBeingCreated(groupSnapshotContent)
if err != nil {
return groupSnapshotContent, fmt.Errorf("failed to remove VolumeGroupSnapshotBeingCreated annotation on the groupSnapshotContent %s: %q", groupSnapshotContent.Name, err)
}
return groupSnapshotContent, nil
}
func (ctrl *csiSnapshotSideCarController) getCSIGroupSnapshotInput(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotClass, map[string]string, error) {
className := groupSnapshotContent.Spec.VolumeGroupSnapshotClassName
klog.V(5).Infof("getCSIGroupSnapshotInput for group snapshot content [%s]", groupSnapshotContent.Name)
var class *crdv1alpha1.VolumeGroupSnapshotClass
var err error
if className != nil {
class, err = ctrl.getGroupSnapshotClass(*className)
if err != nil {
klog.Errorf("getCSISnapshotInput failed to getClassFromVolumeGroupSnapshot %s", err)
return nil, nil, err
}
} else {
// If dynamic provisioning, return failure if no group snapshot class
if len(groupSnapshotContent.Spec.Source.PersistentVolumeNames) != 0 {
klog.Errorf("failed to getCSISnapshotInput %s without a group snapshot class", groupSnapshotContent.Name)
return nil, nil, fmt.Errorf("failed to take group snapshot %s without a group snapshot class", groupSnapshotContent.Name)
}
// For pre-provisioned group snapshot, group snapshot class is not required
klog.V(5).Infof("getCSISnapshotInput for groupSnapshotContent [%s]: no VolumeGroupSnapshotClassName provided for pre-provisioned group snapshot", groupSnapshotContent.Name)
}
// TODO: Resolve snapshotting secret credentials.
return class, nil, nil
}
// getGroupSnapshotClass is a helper function to get group snapshot class from the class name.
func (ctrl *csiSnapshotSideCarController) getGroupSnapshotClass(className string) (*crdv1alpha1.VolumeGroupSnapshotClass, error) {
klog.V(5).Infof("getGroupSnapshotClass: VolumeGroupSnapshotClassName [%s]", className)
class, err := ctrl.groupSnapshotClassLister.Get(className)
if err != nil {
klog.Errorf("failed to retrieve group snapshot class %s from the informer: %q", className, err)
return nil, err
}
return class, nil
}
// setAnnVolumeGroupSnapshotBeingCreated sets VolumeGroupSnapshotBeingCreated annotation
// on VolumeGroupSnapshotContent
// If set, it indicates group snapshot is being created
func (ctrl *csiSnapshotSideCarController) setAnnVolumeGroupSnapshotBeingCreated(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
if metav1.HasAnnotation(groupSnapshotContent.ObjectMeta, utils.AnnVolumeGroupSnapshotBeingCreated) {
// the annotation already exists, return directly
return groupSnapshotContent, nil
}
// Set AnnVolumeGroupSnapshotBeingCreated
// Combine existing annotations with the new annotations.
// If there are no existing annotations, we create a new map.
klog.V(5).Infof("setAnnVolumeGroupSnapshotBeingCreated: set annotation [%s:yes] on groupSnapshotContent [%s].", utils.AnnVolumeGroupSnapshotBeingCreated, groupSnapshotContent.Name)
patchedAnnotations := make(map[string]string)
for k, v := range groupSnapshotContent.GetAnnotations() {
patchedAnnotations[k] = v
}
patchedAnnotations[utils.AnnVolumeGroupSnapshotBeingCreated] = "yes"
var patches []utils.PatchOp
patches = append(patches, utils.PatchOp{
Op: "replace",
Path: "/metadata/annotations",
Value: patchedAnnotations,
})
patchedGroupSnapshotContent, err := utils.PatchVolumeGroupSnapshotContent(groupSnapshotContent, patches, ctrl.clientset)
if err != nil {
return groupSnapshotContent, newControllerUpdateError(groupSnapshotContent.Name, err.Error())
}
// update groupSnapshotContent if update is successful
groupSnapshotContent = patchedGroupSnapshotContent
_, err = ctrl.storeContentUpdate(groupSnapshotContent)
if err != nil {
klog.V(4).Infof("setAnnVolumeGroupSnapshotBeingCreated for groupSnapshotContent [%s]: cannot update internal cache %v", groupSnapshotContent.Name, err)
}
klog.V(5).Infof("setAnnVolumeGroupSnapshotBeingCreated: volume group snapshot content %+v", groupSnapshotContent)
return groupSnapshotContent, nil
}
func (ctrl *csiSnapshotSideCarController) getGroupSnapshotVolumeIDs(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) ([]string, map[string]string, error) {
// TODO: Get add PV lister
var volumeIDs []string
uuidMap := make(map[string]string)
for _, pvName := range groupSnapshotContent.Spec.Source.PersistentVolumeNames {
pv, err := ctrl.client.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
if pv.Spec.CSI != nil && pv.Spec.CSI.VolumeHandle != "" {
volumeIDs = append(volumeIDs, pv.Spec.CSI.VolumeHandle)
if pv.Spec.ClaimRef != nil {
uuidMap[pv.Spec.CSI.VolumeHandle] = string(pv.Spec.ClaimRef.UID)
}
}
}
return volumeIDs, uuidMap, nil
}
// removeAnnVolumeGroupSnapshotBeingCreated removes the VolumeGroupSnapshotBeingCreated
// annotation from a groupSnapshotContent if there exists one.
func (ctrl csiSnapshotSideCarController) removeAnnVolumeGroupSnapshotBeingCreated(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
if !metav1.HasAnnotation(groupSnapshotContent.ObjectMeta, utils.AnnVolumeGroupSnapshotBeingCreated) {
// the annotation does not exist, return directly
return groupSnapshotContent, nil
}
groupSnapshotContentClone := groupSnapshotContent.DeepCopy()
delete(groupSnapshotContentClone.ObjectMeta.Annotations, utils.AnnVolumeGroupSnapshotBeingCreated)
updatedContent, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshotContents().Update(context.TODO(), groupSnapshotContentClone, metav1.UpdateOptions{})
if err != nil {
return groupSnapshotContent, newControllerUpdateError(groupSnapshotContent.Name, err.Error())
}
klog.V(5).Infof("Removed VolumeGroupSnapshotBeingCreated annotation from volume group snapshot content %s", groupSnapshotContent.Name)
_, err = ctrl.storeContentUpdate(updatedContent)
if err != nil {
klog.Errorf("failed to update groupSnapshotContent store %v", err)
}
return updatedContent, nil
}
func (ctrl *csiSnapshotSideCarController) updateGroupSnapshotContentStatus(
groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent,
groupSnapshotHandle string,
readyToUse bool,
createdAt int64,
snapshotContentNames []string) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
klog.V(5).Infof("updateSnapshotContentStatus: updating VolumeGroupSnapshotContent [%s], groupSnapshotHandle %s, readyToUse %v, createdAt %v", groupSnapshotContent.Name, groupSnapshotHandle, readyToUse, createdAt)
groupSnapshotContentObj, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshotContents().Get(context.TODO(), groupSnapshotContent.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error get group snapshot content %s from api server: %v", groupSnapshotContent.Name, err)
}
var newStatus *crdv1alpha1.VolumeGroupSnapshotContentStatus
updated := false
if groupSnapshotContentObj.Status == nil {
newStatus = &crdv1alpha1.VolumeGroupSnapshotContentStatus{
VolumeGroupSnapshotHandle: &groupSnapshotHandle,
ReadyToUse: &readyToUse,
CreationTime: &createdAt,
}
for _, name := range snapshotContentNames {
newStatus.VolumeSnapshotContentRefList = append(newStatus.VolumeSnapshotContentRefList, v1.ObjectReference{
Kind: "VolumeSnapshotContent",
Name: name,
})
}
updated = true
} else {
newStatus = groupSnapshotContentObj.Status.DeepCopy()
if newStatus.VolumeGroupSnapshotHandle == nil {
newStatus.VolumeGroupSnapshotHandle = &groupSnapshotHandle
updated = true
}
if newStatus.ReadyToUse == nil || *newStatus.ReadyToUse != readyToUse {
newStatus.ReadyToUse = &readyToUse
updated = true
if readyToUse && newStatus.Error != nil {
newStatus.Error = nil
}
}
if newStatus.CreationTime == nil {
newStatus.CreationTime = &createdAt
updated = true
}
if len(newStatus.VolumeSnapshotContentRefList) == 0 {
for _, name := range snapshotContentNames {
newStatus.VolumeSnapshotContentRefList = append(newStatus.VolumeSnapshotContentRefList, v1.ObjectReference{
Kind: "VolumeSnapshotContent",
Name: name,
})
}
updated = true
}
}
if updated {
groupSnapshotContentClone := groupSnapshotContentObj.DeepCopy()
groupSnapshotContentClone.Status = newStatus
newContent, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshotContents().UpdateStatus(context.TODO(), groupSnapshotContentClone, metav1.UpdateOptions{})
if err != nil {
return groupSnapshotContentObj, newControllerUpdateError(groupSnapshotContent.Name, err.Error())
}
return newContent, nil
}
return groupSnapshotContentObj, nil
}
// updateContentStatusWithEvent saves new groupSnapshotContent.Status to API server
// and emits given event on the groupSnapshotContent. It saves the status and emits
// the event only when the status has actually changed from the version saved in API server.
// Parameters:
//
// * groupSnapshotContent - group snapshot content to update
// * eventtype, reason, message - event to send, see EventRecorder.Event()
func (ctrl *csiSnapshotSideCarController) updateGroupSnapshotContentErrorStatusWithEvent(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent, eventtype, reason, message string) error {
klog.V(5).Infof("updateGroupSnapshotContentStatusWithEvent[%s]", groupSnapshotContent.Name)
if groupSnapshotContent.Status != nil && groupSnapshotContent.Status.Error != nil && *groupSnapshotContent.Status.Error.Message == message {
klog.V(4).Infof("updateGroupSnapshotContentStatusWithEvent[%s]: the same error %v is already set", groupSnapshotContent.Name, groupSnapshotContent.Status.Error)
return nil
}
var patches []utils.PatchOp
ready := false
groupSnapshotContentStatusError := &crdv1.VolumeSnapshotError{
Time: &metav1.Time{
Time: time.Now(),
},
Message: &message,
}
if groupSnapshotContent.Status == nil {
// Initialize status if nil
patches = append(patches, utils.PatchOp{
Op: "replace",
Path: "/status",
Value: &crdv1alpha1.VolumeGroupSnapshotContentStatus{
ReadyToUse: &ready,
Error: groupSnapshotContentStatusError,
},
})
} else {
// Patch status if non-nil
patches = append(patches, utils.PatchOp{
Op: "replace",
Path: "/status/error",
Value: groupSnapshotContentStatusError,
})
patches = append(patches, utils.PatchOp{
Op: "replace",
Path: "/status/readyToUse",
Value: &ready,
})
}
newContent, err := utils.PatchVolumeGroupSnapshotContent(groupSnapshotContent, patches, ctrl.clientset, "status")
// Emit the event even if the status update fails so that user can see the error
ctrl.eventRecorder.Event(newContent, eventtype, reason, message)
if err != nil {
klog.V(4).Infof("updating VolumeGroupSnapshotContent[%s] error status failed %v", groupSnapshotContent.Name, err)
return err
}
_, err = ctrl.storeGroupSnapshotContentUpdate(newContent)
if err != nil {
klog.V(4).Infof("updating VolumeGroupSnapshotContent[%s] error status: cannot update internal cache %v", groupSnapshotContent.Name, err)
return err
}
return nil
}
// GetSnapshotNameForVolumeGroupSnapshotContent returns a unique snapshot name for a VolumeGroupSnapshotContent.
func GetSnapshotNameForVolumeGroupSnapshotContent(groupSnapshotContentUUID, pvUUID string) string {
return fmt.Sprintf("snapshot-%x-%s", sha256.Sum256([]byte(groupSnapshotContentUUID+pvUUID)), time.Now().Format("2006-01-02-3.4.5"))
}
// GetSnapshotContentNameForVolumeGroupSnapshotContent returns a unique content name for the
// passed in VolumeGroupSnapshotContent.
func GetSnapshotContentNameForVolumeGroupSnapshotContent(groupSnapshotContentUUID, pvUUID string) string {
return fmt.Sprintf("snapcontent-%x-%s", sha256.Sum256([]byte(groupSnapshotContentUUID+pvUUID)), time.Now().Format("2006-01-02-3.4.5"))
}
func (ctrl *csiSnapshotSideCarController) checkandUpdateGroupSnapshotContentStatus(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) error {
klog.V(5).Infof("checkandUpdateGroupSnapshotContentStatus[%s] started", groupSnapshotContent.Name)
groupSnapshotContentObj, err := ctrl.checkandUpdateGroupSnapshotContentStatusOperation(groupSnapshotContent)
if err != nil {
ctrl.updateGroupSnapshotContentErrorStatusWithEvent(groupSnapshotContentObj, v1.EventTypeWarning, "GroupSnapshotContentCheckandUpdateFailed", fmt.Sprintf("Failed to check and update group snapshot content: %v", err))
klog.Errorf("checkandUpdateGroupSnapshotContentStatus [%s]: error occurred %v", groupSnapshotContent.Name, err)
return err
}
_, updateErr := ctrl.storeGroupSnapshotContentUpdate(groupSnapshotContentObj)
if updateErr != nil {
// We will get a "group snapshot update" event soon, this is not a big error
klog.V(4).Infof("checkandUpdateGroupSnapshotContentStatus [%s]: cannot update internal cache: %v", groupSnapshotContent.Name, updateErr)
}
return nil
}
func (ctrl *csiSnapshotSideCarController) checkandUpdateGroupSnapshotContentStatusOperation(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
var err error
var creationTime time.Time
readyToUse := false
var driverName string
var groupSnapshotID string
var snapshotterListCredentials map[string]string
if groupSnapshotContent.Spec.Source.VolumeGroupSnapshotHandle != nil {
klog.V(5).Infof("checkandUpdateGroupSnapshotContentStatusOperation: call GetGroupSnapshotStatus for group snapshot which is pre-bound to group snapshot content [%s]", groupSnapshotContent.Name)
if groupSnapshotContent.Spec.VolumeGroupSnapshotClassName != nil {
class, err := ctrl.getGroupSnapshotClass(*groupSnapshotContent.Spec.VolumeGroupSnapshotClassName)
if err != nil {
klog.Errorf("Failed to get group snapshot class %s for group snapshot content %s: %v", *groupSnapshotContent.Spec.VolumeGroupSnapshotClassName, groupSnapshotContent.Name, err)
return groupSnapshotContent, fmt.Errorf("failed to get group snapshot class %s for group snapshot content %s: %v", *groupSnapshotContent.Spec.VolumeGroupSnapshotClassName, groupSnapshotContent.Name, err)
}
snapshotterListSecretRef, err := utils.GetSecretReference(utils.SnapshotterListSecretParams, class.Parameters, groupSnapshotContent.GetObjectMeta().GetName(), nil)
if err != nil {
klog.Errorf("Failed to get secret reference for group snapshot content %s: %v", groupSnapshotContent.Name, err)
return groupSnapshotContent, fmt.Errorf("failed to get secret reference for group snapshot content %s: %v", groupSnapshotContent.Name, err)
}
snapshotterListCredentials, err = utils.GetCredentials(ctrl.client, snapshotterListSecretRef)
if err != nil {
// Continue with deletion, as the secret may have already been deleted.
klog.Errorf("Failed to get credentials for group snapshot content %s: %v", groupSnapshotContent.Name, err)
return groupSnapshotContent, fmt.Errorf("failed to get credentials for group snapshot content %s: %v", groupSnapshotContent.Name, err)
}
}
readyToUse, creationTime, err = ctrl.handler.GetGroupSnapshotStatus(groupSnapshotContent, snapshotterListCredentials)
if err != nil {
klog.Errorf("checkandUpdateGroupSnapshotContentStatusOperation: failed to call get group snapshot status to check whether group snapshot is ready to use %q", err)
return groupSnapshotContent, err
}
driverName = groupSnapshotContent.Spec.Driver
groupSnapshotID = *groupSnapshotContent.Spec.Source.VolumeGroupSnapshotHandle
klog.V(5).Infof("checkandUpdateGroupSnapshotContentStatusOperation: driver %s, groupSnapshotId %s, creationTime %v, size %d, readyToUse %t", driverName, groupSnapshotID, creationTime, readyToUse)
if creationTime.IsZero() {
creationTime = time.Now()
}
// TODO: Get a reference to snapshot contents for this volume group snapshot
updatedContent, err := ctrl.updateGroupSnapshotContentStatus(groupSnapshotContent, groupSnapshotID, readyToUse, creationTime.UnixNano(), []string{})
if err != nil {
return groupSnapshotContent, err
}
return updatedContent, nil
}
return ctrl.createGroupSnapshotWrapper(groupSnapshotContent)
}

View File

@@ -18,15 +18,9 @@ package sidecar_controller
import (
"fmt"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/group_snapshotter"
"time"
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
clientset "github.com/kubernetes-csi/external-snapshotter/client/v6/clientset/versioned"
storageinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumesnapshot/v1"
storagelisters "github.com/kubernetes-csi/external-snapshotter/client/v6/listers/volumesnapshot/v1"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/snapshotter"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/utils"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
@@ -38,6 +32,16 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
klog "k8s.io/klog/v2"
crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumegroupsnapshot/v1alpha1"
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
clientset "github.com/kubernetes-csi/external-snapshotter/client/v6/clientset/versioned"
groupsnapshotinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumegroupsnapshot/v1alpha1"
snapshotinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumesnapshot/v1"
groupsnapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v6/listers/volumegroupsnapshot/v1alpha1"
snapshotlisters "github.com/kubernetes-csi/external-snapshotter/client/v6/listers/volumesnapshot/v1"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/snapshotter"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/utils"
)
type csiSnapshotSideCarController struct {
@@ -48,9 +52,9 @@ type csiSnapshotSideCarController struct {
contentQueue workqueue.RateLimitingInterface
extraCreateMetadata bool
contentLister storagelisters.VolumeSnapshotContentLister
contentLister snapshotlisters.VolumeSnapshotContentLister
contentListerSynced cache.InformerSynced
classLister storagelisters.VolumeSnapshotClassLister
classLister snapshotlisters.VolumeSnapshotClassLister
classListerSynced cache.InformerSynced
contentStore cache.Store
@@ -58,6 +62,14 @@ type csiSnapshotSideCarController struct {
handler Handler
resyncPeriod time.Duration
enableVolumeGroupSnapshots bool
groupSnapshotContentQueue workqueue.RateLimitingInterface
groupSnapshotContentLister groupsnapshotlisters.VolumeGroupSnapshotContentLister
groupSnapshotContentListerSynced cache.InformerSynced
groupSnapshotClassLister groupsnapshotlisters.VolumeGroupSnapshotClassLister
groupSnapshotClassListerSynced cache.InformerSynced
groupSnapshotContentStore cache.Store
}
// NewCSISnapshotSideCarController returns a new *csiSnapshotSideCarController
@@ -65,15 +77,22 @@ func NewCSISnapshotSideCarController(
clientset clientset.Interface,
client kubernetes.Interface,
driverName string,
volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer,
volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer,
volumeSnapshotContentInformer snapshotinformers.VolumeSnapshotContentInformer,
volumeSnapshotClassInformer snapshotinformers.VolumeSnapshotClassInformer,
snapshotter snapshotter.Snapshotter,
groupSnapshotter group_snapshotter.GroupSnapshotter,
timeout time.Duration,
resyncPeriod time.Duration,
snapshotNamePrefix string,
snapshotNameUUIDLength int,
groupSnapshotNamePrefix string,
groupSnapshotNameUUIDLength int,
extraCreateMetadata bool,
contentRateLimiter workqueue.RateLimiter,
enableVolumeGroupSnapshots bool,
volumeGroupSnapshotContentInformer groupsnapshotinformers.VolumeGroupSnapshotContentInformer,
volumeGroupSnapshotClassInformer groupsnapshotinformers.VolumeGroupSnapshotClassInformer,
groupSnapshotContentRateLimiter workqueue.RateLimiter,
) *csiSnapshotSideCarController {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
@@ -86,7 +105,7 @@ func NewCSISnapshotSideCarController(
client: client,
driverName: driverName,
eventRecorder: eventRecorder,
handler: NewCSIHandler(snapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength),
handler: NewCSIHandler(snapshotter, groupSnapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength, groupSnapshotNamePrefix, groupSnapshotNameUUIDLength),
resyncPeriod: resyncPeriod,
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
contentQueue: workqueue.NewNamedRateLimitingQueue(contentRateLimiter, "csi-snapshotter-content"),
@@ -124,6 +143,33 @@ func NewCSISnapshotSideCarController(
ctrl.classLister = volumeSnapshotClassInformer.Lister()
ctrl.classListerSynced = volumeSnapshotClassInformer.Informer().HasSynced
ctrl.enableVolumeGroupSnapshots = enableVolumeGroupSnapshots
if enableVolumeGroupSnapshots {
ctrl.groupSnapshotContentStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ctrl.groupSnapshotContentQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotContentRateLimiter, "csi-snapshotter-groupsnapshotcontent")
volumeGroupSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotContentWork(obj) },
UpdateFunc: func(oldObj, newObj interface{}) {
/*
TODO: Determine if we need to skip requeueing in case of CSI driver failure.
*/
ctrl.enqueueGroupSnapshotContentWork(newObj)
},
DeleteFunc: func(obj interface{}) { ctrl.enqueueGroupSnapshotContentWork(obj) },
},
ctrl.resyncPeriod,
)
ctrl.groupSnapshotContentLister = volumeGroupSnapshotContentInformer.Lister()
ctrl.groupSnapshotContentListerSynced = volumeGroupSnapshotContentInformer.Informer().HasSynced
ctrl.groupSnapshotClassLister = volumeGroupSnapshotClassInformer.Lister()
ctrl.groupSnapshotClassListerSynced = volumeGroupSnapshotClassInformer.Informer().HasSynced
}
return ctrl
}
@@ -133,15 +179,23 @@ func (ctrl *csiSnapshotSideCarController) Run(workers int, stopCh <-chan struct{
klog.Infof("Starting CSI snapshotter")
defer klog.Infof("Shutting CSI snapshotter")
if !cache.WaitForCacheSync(stopCh, ctrl.contentListerSynced, ctrl.classListerSynced) {
informersSynced := []cache.InformerSynced{ctrl.contentListerSynced, ctrl.classListerSynced}
if ctrl.enableVolumeGroupSnapshots {
informersSynced = append(informersSynced, []cache.InformerSynced{ctrl.groupSnapshotContentListerSynced, ctrl.groupSnapshotClassListerSynced}...)
}
if !cache.WaitForCacheSync(stopCh, informersSynced...) {
klog.Errorf("Cannot sync caches")
return
}
ctrl.initializeCaches(ctrl.contentLister)
ctrl.initializeCaches()
for i := 0; i < workers; i++ {
go wait.Until(ctrl.contentWorker, 0, stopCh)
if ctrl.enableVolumeGroupSnapshots {
go wait.Until(ctrl.groupSnapshotContentWorker, 0, stopCh)
}
}
<-stopCh
@@ -240,25 +294,48 @@ func (ctrl *csiSnapshotSideCarController) syncContentByKey(key string) error {
return nil
}
// verify whether the driver specified in VolumeSnapshotContent matches the controller's driver name
func (ctrl *csiSnapshotSideCarController) isDriverMatch(content *crdv1.VolumeSnapshotContent) bool {
if content.Spec.Source.VolumeHandle == nil && content.Spec.Source.SnapshotHandle == nil {
// Skip this snapshot content if it does not have a valid source
return false
}
if content.Spec.Driver != ctrl.driverName {
// Skip this snapshot content if the driver does not match
return false
}
snapshotClassName := content.Spec.VolumeSnapshotClassName
if snapshotClassName != nil {
if snapshotClass, err := ctrl.classLister.Get(*snapshotClassName); err == nil {
if snapshotClass.Driver != ctrl.driverName {
return false
// isDriverMatch verifies whether the driver specified in VolumeSnapshotContent
// or VolumeGroupSnapshotContent matches the controller's driver name
func (ctrl *csiSnapshotSideCarController) isDriverMatch(object interface{}) bool {
if content, ok := object.(*crdv1.VolumeSnapshotContent); ok {
if content.Spec.Source.VolumeHandle == nil && content.Spec.Source.SnapshotHandle == nil {
// Skip this snapshot content if it does not have a valid source
return false
}
if content.Spec.Driver != ctrl.driverName {
// Skip this snapshot content if the driver does not match
return false
}
snapshotClassName := content.Spec.VolumeSnapshotClassName
if snapshotClassName != nil {
if snapshotClass, err := ctrl.classLister.Get(*snapshotClassName); err == nil {
if snapshotClass.Driver != ctrl.driverName {
return false
}
}
}
return true
}
return true
if content, ok := object.(*crdv1alpha1.VolumeGroupSnapshotContent); ok {
if content.Spec.Source.VolumeGroupSnapshotHandle == nil && len(content.Spec.Source.PersistentVolumeNames) == 0 {
// Skip this group snapshot content if it does not have a valid source
return false
}
if content.Spec.Driver != ctrl.driverName {
// Skip this group snapshot content if the driver does not match
return false
}
groupSnapshotClassName := content.Spec.VolumeGroupSnapshotClassName
if groupSnapshotClassName != nil {
if groupSnapshotClass, err := ctrl.groupSnapshotClassLister.Get(*groupSnapshotClassName); err == nil {
if groupSnapshotClass.Driver != ctrl.driverName {
return false
}
}
}
return true
}
return false
}
// updateContentInInformerCache runs in worker thread and handles "content added",
@@ -296,8 +373,8 @@ func (ctrl *csiSnapshotSideCarController) deleteContentInCacheStore(content *crd
// initializeCaches fills all controller caches with initial data from etcd in
// order to have the caches already filled when first addSnapshot/addContent to
// perform initial synchronization of the controller.
func (ctrl *csiSnapshotSideCarController) initializeCaches(contentLister storagelisters.VolumeSnapshotContentLister) {
contentList, err := contentLister.List(labels.Everything())
func (ctrl *csiSnapshotSideCarController) initializeCaches() {
contentList, err := ctrl.contentLister.List(labels.Everything())
if err != nil {
klog.Errorf("CSISnapshotController can't initialize caches: %v", err)
return
@@ -311,5 +388,19 @@ func (ctrl *csiSnapshotSideCarController) initializeCaches(contentLister storage
}
}
if ctrl.enableVolumeGroupSnapshots {
groupSnapshotContentList, err := ctrl.groupSnapshotContentLister.List(labels.Everything())
if err != nil {
klog.Errorf("CSISnapshotController can't initialize caches: %v", err)
return
}
for _, groupSnapshotContent := range groupSnapshotContentList {
groupSnapshotContentClone := groupSnapshotContent.DeepCopy()
if _, err = ctrl.storeGroupSnapshotContentUpdate(groupSnapshotContentClone); err != nil {
klog.Errorf("error updating volume group snapshot content cache: %v", err)
}
}
}
klog.V(4).Infof("controller initialized")
}

View File

@@ -3,6 +3,7 @@ package utils
import (
"context"
"encoding/json"
crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumegroupsnapshot/v1alpha1"
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
clientset "github.com/kubernetes-csi/external-snapshotter/client/v6/clientset/versioned"
@@ -56,3 +57,23 @@ func PatchVolumeSnapshot(
return newSnapshot, nil
}
// PatchVolumeGroupSnapshotContent patches a volume group snapshot content object
func PatchVolumeGroupSnapshotContent(
existingGroupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent,
patch []PatchOp,
client clientset.Interface,
subresources ...string,
) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
data, err := json.Marshal(patch)
if nil != err {
return existingGroupSnapshotContent, err
}
newGroupSnapshotContent, err := client.GroupsnapshotV1alpha1().VolumeGroupSnapshotContents().Patch(context.TODO(), existingGroupSnapshotContent.Name, types.JSONPatchType, data, metav1.PatchOptions{}, subresources...)
if err != nil {
return existingGroupSnapshotContent, err
}
return newGroupSnapshotContent, nil
}

View File

@@ -24,7 +24,6 @@ import (
"strings"
"time"
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -33,6 +32,9 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
klog "k8s.io/klog/v2"
crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumegroupsnapshot/v1alpha1"
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
)
var keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
@@ -62,6 +64,10 @@ const (
PrefixedVolumeSnapshotNamespaceKey = csiParameterPrefix + "volumesnapshot/namespace" // Prefixed VolumeSnapshot namespace key
PrefixedVolumeSnapshotContentNameKey = csiParameterPrefix + "volumesnapshotcontent/name" // Prefixed VolumeSnapshotContent name key
PrefixedVolumeGroupSnapshotNameKey = csiParameterPrefix + "volumegroupsnapshot/name" // Prefixed VolumeGroupSnapshot name key
PrefixedVolumeGroupSnapshotNamespaceKey = csiParameterPrefix + "volumegroupsnapshot/namespace" // Prefixed VolumeGroupSnapshot namespace key
PrefixedVolumeGroupSnapshotContentNameKey = csiParameterPrefix + "volumegroupsnapshotcontent/name" // Prefixed VolumeGroupSnapshotContent name key
// Name of finalizer on VolumeSnapshotContents that are bound by VolumeSnapshots
VolumeSnapshotContentFinalizer = "snapshot.storage.kubernetes.io/volumesnapshotcontent-bound-protection"
// Name of finalizer on VolumeSnapshot that is being used as a source to create a PVC
@@ -94,6 +100,19 @@ const (
// snapshots.
AnnVolumeSnapshotBeingCreated = "snapshot.storage.kubernetes.io/volumesnapshot-being-created"
// AnnVolumeGroupSnapshotBeingCreated annotation applies to VolumeGroupSnapshotContents.
// If it is set, it indicates that the csi-snapshotter
// sidecar has sent the create group snapshot request to the storage system and
// is waiting for a response of success or failure.
// This annotation will be removed once the driver's CreateGroupSnapshot
// CSI function returns success or a final error (determined by isFinalError()).
// If the create group snapshot request fails with a non-final error such as timeout,
// retry will happen and the annotation will remain.
// This only applies to dynamic provisioning of group snapshots because
// the create group snapshot CSI method will not be called for pre-provisioned
// group snapshots.
AnnVolumeGroupSnapshotBeingCreated = "groupsnapshot.storage.kubernetes.io/volumegroupsnapshot-being-created"
// Annotation for secret name and namespace will be added to the content
// and used at snapshot content deletion time.
AnnDeletionSecretRefName = "snapshot.storage.kubernetes.io/deletion-secret-name"
@@ -475,3 +494,52 @@ func IsSnapshotReady(snapshot *crdv1.VolumeSnapshot) bool {
func IsSnapshotCreated(snapshot *crdv1.VolumeSnapshot) bool {
return snapshot.Status != nil && snapshot.Status.CreationTime != nil
}
func GroupSnapshotKey(vgs *crdv1alpha1.VolumeGroupSnapshot) string {
return fmt.Sprintf("%s/%s", vgs.Namespace, vgs.Name)
}
func GroupSnapshotRefKey(vgsref *v1.ObjectReference) string {
return fmt.Sprintf("%s/%s", vgsref.Namespace, vgsref.Name)
}
func IsGroupSnapshotReady(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) bool {
if groupSnapshot.Status == nil || groupSnapshot.Status.ReadyToUse == nil || *groupSnapshot.Status.ReadyToUse == false {
return false
}
return true
}
func IsBoundVolumeGroupSnapshotContentNameSet(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) bool {
if groupSnapshot.Status == nil || groupSnapshot.Status.BoundVolumeGroupSnapshotContentName == nil || *groupSnapshot.Status.BoundVolumeGroupSnapshotContentName == "" {
return false
}
return true
}
func IsVolumeSnapshotRefListSet(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) bool {
if groupSnapshot.Status == nil || len(groupSnapshot.Status.VolumeSnapshotRefList) == 0 {
return false
}
return true
}
func IsVolumeGroupSnapshotRefSet(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot, content *crdv1alpha1.VolumeGroupSnapshotContent) bool {
if content.Spec.VolumeGroupSnapshotRef.Name == groupSnapshot.Name &&
content.Spec.VolumeGroupSnapshotRef.Namespace == groupSnapshot.Namespace &&
content.Spec.VolumeGroupSnapshotRef.UID == groupSnapshot.UID {
return true
}
return false
}
// IsGroupSnapshotCreated indicates that the group snapshot has been cut on a storage system
func IsGroupSnapshotCreated(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) bool {
return groupSnapshot.Status != nil && groupSnapshot.Status.CreationTime != nil
}
// GetDynamicSnapshotContentNameFoGrouprSnapshot returns a unique content name for the
// passed in VolumeGroupSnapshot to dynamically provision a group snapshot.
func GetDynamicSnapshotContentNameForGroupSnapshot(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot) string {
return "groupsnapcontent-" + string(groupSnapshot.UID)
}

File diff suppressed because it is too large Load Diff

View File

@@ -326,7 +326,7 @@ type VolumeGroupSnapshotContentStatus struct {
// for this group snapshot.
// The maximum number of allowed snapshots in the group is 100.
// +optional
VolumeSnapshotContentRefList []core_v1.ObjectReference `json:"volumeSnapshotRefList,omitempty" protobuf:"bytes,5,opt,name=volumeSnapshotRefList"`
VolumeSnapshotContentRefList []core_v1.ObjectReference `json:"volumeSnapshotContentRefList,omitempty" protobuf:"bytes,5,opt,name=volumeSnapshotContentRefList"`
}
// VolumeGroupSnapshotContentSource represents the CSI source of a group snapshot.

2
vendor/modules.txt vendored
View File

@@ -7,7 +7,7 @@ github.com/blang/semver/v4
# github.com/cespare/xxhash/v2 v2.1.2
## explicit; go 1.11
github.com/cespare/xxhash/v2
# github.com/container-storage-interface/spec v1.7.0
# github.com/container-storage-interface/spec v1.8.0
## explicit; go 1.18
github.com/container-storage-interface/spec/lib/go/csi
# github.com/davecgh/go-spew v1.1.1