Added create group volume snapshot functionality

This commit is contained in:
Raunak Pradip Shah
2023-04-01 14:10:29 +05:30
parent 299862c783
commit 871312988b
13 changed files with 1904 additions and 334 deletions

View File

@@ -20,6 +20,7 @@ import (
"context"
"flag"
"fmt"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/group_snapshotter"
"net/http"
"os"
"os/signal"
@@ -223,6 +224,11 @@ func main() {
klog.V(2).Infof("Start NewCSISnapshotSideCarController with snapshotter [%s] kubeconfig [%s] csiTimeout [%+v] csiAddress [%s] resyncPeriod [%+v] snapshotNamePrefix [%s] snapshotNameUUIDLength [%d]", driverName, *kubeconfig, *csiTimeout, *csiAddress, *resyncPeriod, *snapshotNamePrefix, snapshotNameUUIDLength)
snapShotter := snapshotter.NewSnapshotter(csiConn)
var groupSnapshotter group_snapshotter.GroupSnapshotter
if *enableVolumeGroupSnapshots {
groupSnapshotter = group_snapshotter.NewGroupSnapshotter(csiConn)
}
ctrl := controller.NewCSISnapshotSideCarController(
snapClient,
kubeClient,
@@ -230,6 +236,7 @@ func main() {
snapshotContentfactory.Snapshot().V1().VolumeSnapshotContents(),
factory.Snapshot().V1().VolumeSnapshotClasses(),
snapShotter,
groupSnapshotter,
*csiTimeout,
*resyncPeriod,
*snapshotNamePrefix,

2
go.mod
View File

@@ -3,7 +3,7 @@ module github.com/kubernetes-csi/external-snapshotter/v6
go 1.20
require (
github.com/container-storage-interface/spec v1.7.0
github.com/container-storage-interface/spec v1.8.0
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/fsnotify/fsnotify v1.6.0
github.com/golang/mock v1.6.0

4
go.sum
View File

@@ -60,8 +60,8 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/container-storage-interface/spec v1.3.0/go.mod h1:6URME8mwIBbpVyZV93Ce5St17xBiQJQY67NDsuohiy4=
github.com/container-storage-interface/spec v1.7.0 h1:gW8eyFQUZWWrMWa8p1seJ28gwDoN5CVJ4uAbQ+Hdycw=
github.com/container-storage-interface/spec v1.7.0/go.mod h1:JYuzLqr9VVNoDJl44xp/8fmCOvWPDKzuGTwCoklhuqk=
github.com/container-storage-interface/spec v1.8.0 h1:D0vhF3PLIZwlwZEf2eNbpujGCNwspwTYf2idJRJx4xI=
github.com/container-storage-interface/spec v1.8.0/go.mod h1:ROLik+GhPslwwWRNFF1KasPzroNARibH2rfz1rkg4H0=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@@ -220,7 +220,7 @@ func (ctrl *csiSnapshotCommonController) getClaimsFromVolumeGroupSnapshot(groupS
labelSelector := groupSnapshot.Spec.Source.Selector
// Get PVC that has group snapshot label applied.
pvcList, err := ctrl.client.CoreV1().PersistentVolumeClaims(groupSnapshot.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector.String()})
pvcList, err := ctrl.client.CoreV1().PersistentVolumeClaims(groupSnapshot.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: labels.Set(labelSelector.MatchLabels).String()})
if err != nil {
return nil, fmt.Errorf("failed to list PVCs with label selector %s: %q", labelSelector.String(), err)
}
@@ -647,7 +647,7 @@ func (ctrl *csiSnapshotCommonController) getDynamicallyProvisionedGroupContentFr
return nil, nil
}
// check whether the content represents a dynamically provisioned snapshot
if content.Spec.Source.VolumeGroupSnapshotHandle == nil {
if content.Spec.Source.VolumeGroupSnapshotHandle != nil {
ctrl.updateGroupSnapshotErrorStatusWithEvent(groupSnapshot, true, v1.EventTypeWarning, "GroupSnapshotContentMismatch", "VolumeGroupSnapshotContent "+contentName+" is pre-provisioned while expecting a dynamically provisioned one")
klog.V(4).Infof("sync group snapshot[%s]: group snapshot content %s is pre-provisioned while expecting a dynamically provisioned one", utils.GroupSnapshotKey(groupSnapshot), contentName)
return nil, fmt.Errorf("group snapshot %s expects a dynamically provisioned VolumeGroupSnapshotContent %s but gets a pre-provisioned one", utils.GroupSnapshotKey(groupSnapshot), contentName)

View File

@@ -165,6 +165,9 @@ func NewCSISnapshotCommonController(
ctrl.enableVolumeGroupSnapshots = enableVolumeGroupSnapshots
if enableVolumeGroupSnapshots {
ctrl.groupSnapshotStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ctrl.groupSnapshotContentStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ctrl.groupSnapshotQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotRateLimiter, "snapshot-controller-group-snapshot")
ctrl.groupSnapshotContentQueue = workqueue.NewNamedRateLimitingQueue(groupSnapshotContentRateLimiter, "snapshot-controller-group-content")

View File

@@ -0,0 +1,89 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package group_snapshotter
import (
"context"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/protobuf/ptypes"
csirpc "github.com/kubernetes-csi/csi-lib-utils/rpc"
"google.golang.org/grpc"
klog "k8s.io/klog/v2"
"time"
)
// GroupSnapshotter implements CreateGroupSnapshot/DeleteGroupSnapshot operations against a remote CSI driver.
type GroupSnapshotter interface {
// CreateGroupSnapshot creates a snapshot for a volume
CreateGroupSnapshot(ctx context.Context, groupSnapshotName string, volumeIDs []string, parameters map[string]string, snapshotterCredentials map[string]string) (driverName string, snapshotId string, snapshots []*csi.Snapshot, timestamp time.Time, readyToUse bool, err error)
// DeleteGroupSnapshot deletes a snapshot from a volume
DeleteGroupSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) (err error)
// GetGroupSnapshotStatus returns if a snapshot is ready to use, creation time, and restore size.
GetGroupSnapshotStatus(ctx context.Context, snapshotID string, snapshotterListCredentials map[string]string) (bool, time.Time, error)
}
type groupSnapshot struct {
conn *grpc.ClientConn
}
func NewGroupSnapshotter(conn *grpc.ClientConn) GroupSnapshotter {
return &groupSnapshot{
conn: conn,
}
}
func (gs *groupSnapshot) CreateGroupSnapshot(ctx context.Context, groupSnapshotName string, volumeIDs []string, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, []*csi.Snapshot, time.Time, bool, error) {
klog.V(5).Infof("CSI CreateSnapshot: %s", groupSnapshotName)
client := csi.NewGroupControllerClient(gs.conn)
driverName, err := csirpc.GetDriverName(ctx, gs.conn)
if err != nil {
return "", "", nil, time.Time{}, false, err
}
req := csi.CreateVolumeGroupSnapshotRequest{
Name: groupSnapshotName,
SourceVolumeIds: volumeIDs,
Secrets: snapshotterCredentials,
Parameters: parameters,
}
rsp, err := client.CreateVolumeGroupSnapshot(ctx, &req)
if err != nil {
return "", "", nil, time.Time{}, false, err
}
klog.V(5).Infof("CSI CreateSnapshot: %s driver name [%s] snapshot ID [%s] time stamp [%v] snapshots [%v] readyToUse [%v]", groupSnapshotName, driverName, rsp.GroupSnapshot.GroupSnapshotId, rsp.GroupSnapshot.CreationTime, rsp.GroupSnapshot.Snapshots, rsp.GroupSnapshot.ReadyToUse)
creationTime, err := ptypes.Timestamp(rsp.GroupSnapshot.CreationTime)
if err != nil {
return "", "", nil, time.Time{}, false, err
}
return driverName, rsp.GroupSnapshot.GroupSnapshotId, rsp.GroupSnapshot.Snapshots, creationTime, rsp.GroupSnapshot.ReadyToUse, nil
}
func (gs *groupSnapshot) DeleteGroupSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) error {
// TODO: Implement DeleteGroupSnapshot
return nil
}
func (gs *groupSnapshot) GetGroupSnapshotStatus(ctx context.Context, snapshotID string, snapshotterListCredentials map[string]string) (bool, time.Time, error) {
// TODO: Implement GetGroupSnapshotStatus
return true, time.Now(), nil
}

View File

@@ -19,6 +19,9 @@ package sidecar_controller
import (
"context"
"fmt"
"github.com/container-storage-interface/spec/lib/go/csi"
crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumegroupsnapshot/v1alpha1"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/group_snapshotter"
"strings"
"time"
@@ -31,11 +34,14 @@ type Handler interface {
CreateSnapshot(content *crdv1.VolumeSnapshotContent, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error)
DeleteSnapshot(content *crdv1.VolumeSnapshotContent, snapshotterCredentials map[string]string) error
GetSnapshotStatus(content *crdv1.VolumeSnapshotContent, snapshotterListCredentials map[string]string) (bool, time.Time, int64, error)
CreateGroupSnapshot(content *crdv1alpha1.VolumeGroupSnapshotContent, volumeIDs []string, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, []*csi.Snapshot, time.Time, bool, error)
GetGroupSnapshotStatus(content *crdv1alpha1.VolumeGroupSnapshotContent, snapshotterListCredentials map[string]string) (bool, time.Time, error)
}
// csiHandler is a handler that calls CSI to create/delete volume snapshot.
type csiHandler struct {
snapshotter snapshotter.Snapshotter
groupSnapshotter group_snapshotter.GroupSnapshotter
timeout time.Duration
snapshotNamePrefix string
snapshotNameUUIDLength int
@@ -44,12 +50,14 @@ type csiHandler struct {
// NewCSIHandler returns a handler which includes the csi connection and Snapshot name details
func NewCSIHandler(
snapshotter snapshotter.Snapshotter,
groupSnapshotter group_snapshotter.GroupSnapshotter,
timeout time.Duration,
snapshotNamePrefix string,
snapshotNameUUIDLength int,
) Handler {
return &csiHandler{
snapshotter: snapshotter,
groupSnapshotter: groupSnapshotter,
timeout: timeout,
snapshotNamePrefix: snapshotNamePrefix,
snapshotNameUUIDLength: snapshotNameUUIDLength,
@@ -131,3 +139,51 @@ func makeSnapshotName(prefix, snapshotUID string, snapshotNameUUIDLength int) (s
}
return fmt.Sprintf("%s-%s", prefix, strings.Replace(snapshotUID, "-", "", -1)[0:snapshotNameUUIDLength]), nil
}
func (handler *csiHandler) CreateGroupSnapshot(content *crdv1alpha1.VolumeGroupSnapshotContent, volumeIDs []string, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, []*csi.Snapshot, time.Time, bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
defer cancel()
if content.Spec.VolumeGroupSnapshotRef.UID == "" {
return "", "", nil, time.Time{}, false, fmt.Errorf("cannot create group snapshot. Group snapshot content %s not bound to a group snapshot", content.Name)
}
if len(volumeIDs) == 0 {
return "", "", nil, time.Time{}, false, fmt.Errorf("cannot create group snapshot. PVCs to be snapshotted not found in group snapshot content %s", content.Name)
}
snapshotName, err := makeGroupSnapshotName(handler.snapshotNamePrefix, string(content.Spec.VolumeGroupSnapshotRef.UID))
if err != nil {
return "", "", nil, time.Time{}, false, err
}
return handler.groupSnapshotter.CreateGroupSnapshot(ctx, snapshotName, volumeIDs, parameters, snapshotterCredentials)
}
func (handler *csiHandler) GetGroupSnapshotStatus(content *crdv1alpha1.VolumeGroupSnapshotContent, snapshotterListCredentials map[string]string) (bool, time.Time, error) {
ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
defer cancel()
var snapshotHandle string
var err error
if content.Status != nil && content.Status.VolumeGroupSnapshotHandle != nil {
snapshotHandle = *content.Status.VolumeGroupSnapshotHandle
} else if content.Spec.Source.VolumeGroupSnapshotHandle != nil {
snapshotHandle = *content.Spec.Source.VolumeGroupSnapshotHandle
} else {
return false, time.Time{}, fmt.Errorf("failed to list snapshot for content %s: snapshotHandle is missing", content.Name)
}
csiSnapshotStatus, timestamp, err := handler.groupSnapshotter.GetGroupSnapshotStatus(ctx, snapshotHandle, snapshotterListCredentials)
if err != nil {
return false, time.Time{}, fmt.Errorf("failed to list snapshot for content %s: %q", content.Name, err)
}
return csiSnapshotStatus, timestamp, nil
}
func makeGroupSnapshotName(prefix, groupSnapshotUID string) (string, error) {
if len(groupSnapshotUID) == 0 {
return "", fmt.Errorf("group snapshot object is missing UID")
}
return fmt.Sprintf("%s-%s", prefix, strings.Replace(groupSnapshotUID, "-", "", -1)), nil
}

View File

@@ -566,6 +566,7 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
informerFactory.Snapshot().V1().VolumeSnapshotContents(),
informerFactory.Snapshot().V1().VolumeSnapshotClasses(),
fakeSnapshot,
nil, // TODO: Replace with fake group snapshotter
5*time.Millisecond,
60*time.Second,
"snapshot",

View File

@@ -17,11 +17,19 @@ limitations under the License.
package sidecar_controller
import (
"context"
"crypto/sha256"
"fmt"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
klog "k8s.io/klog/v2"
crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumegroupsnapshot/v1alpha1"
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/utils"
)
@@ -159,12 +167,477 @@ func (ctrl *csiSnapshotSideCarController) syncGroupSnapshotContent(content *crdv
TODO: Check if the group snapshot content should be deleted
*/
/*
TODO: Check if a new group snapshot should be created by calling CreateGroupSnapshot
*/
if len(content.Spec.Source.PersistentVolumeNames) != 0 && content.Status == nil {
klog.V(5).Infof("syncContent: Call CreateGroupSnapshot for group snapshot content %s", content.Name)
return ctrl.createGroupSnapshot(content)
}
// Skip checkandUpdateGroupSnapshotContentStatus() if ReadyToUse is already
// true. We don't want to keep calling CreateGroupSnapshot CSI methods over
// and over again for performance reasons.
var err error
if content.Status != nil && content.Status.ReadyToUse != nil && *content.Status.ReadyToUse == true {
// Try to remove AnnVolumeGroupSnapshotBeingCreated if it is not removed yet for some reason
_, err = ctrl.removeAnnVolumeGroupSnapshotBeingCreated(content)
return err
}
return ctrl.checkandUpdateGroupSnapshotContentStatus(content)
}
// createGroupSnapshot starts new asynchronous operation to create group snapshot
func (ctrl *csiSnapshotSideCarController) createGroupSnapshot(content *crdv1alpha1.VolumeGroupSnapshotContent) error {
klog.V(5).Infof("createGroupSnapshot for group snapshot content [%s]: started", content.Name)
contentObj, err := ctrl.createGroupSnapshotWrapper(content)
if err != nil {
ctrl.updateGroupSnapshotContentErrorStatusWithEvent(contentObj, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create group snapshot: %v", err))
klog.Errorf("createSnapshot for content [%s]: error occurred in createSnapshotWrapper: %v", content.Name, err)
return err
}
_, updateErr := ctrl.storeGroupSnapshotContentUpdate(contentObj)
if updateErr != nil {
// We will get a "group snapshot update" event soon, this is not a big error
klog.V(4).Infof("createSnapshot for content [%s]: cannot update internal content cache: %v", content.Name, updateErr)
}
/*
TODO: Check and update group snapshot content status
*/
return nil
}
// This is a wrapper function for the group snapshot creation process.
func (ctrl *csiSnapshotSideCarController) createGroupSnapshotWrapper(content *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
klog.Infof("createGroupSnapshotWrapper: Creating group snapshot for group snapshot content %s through the plugin ...", content.Name)
class, snapshotterCredentials, err := ctrl.getCSIGroupSnapshotInput(content)
if err != nil {
return content, fmt.Errorf("failed to get input parameters to create group snapshot for content %s: %q", content.Name, err)
}
// NOTE(xyang): handle create timeout
// Add an annotation to indicate the group snapshot creation request has been
// sent to the storage system and the controller is waiting for a response.
// The annotation will be removed after the storage system has responded with
// success or permanent failure. If the request times out, annotation will
// remain on the content to avoid potential leaking of a group snapshot resource on
// the storage system.
content, err = ctrl.setAnnVolumeGroupSnapshotBeingCreated(content)
if err != nil {
return content, fmt.Errorf("failed to add VolumeGroupSnapshotBeingCreated annotation on the content %s: %q", content.Name, err)
}
parameters, err := utils.RemovePrefixedParameters(class.Parameters)
if err != nil {
return content, fmt.Errorf("failed to remove CSI Parameters of prefixed keys: %v", err)
}
if ctrl.extraCreateMetadata {
parameters[utils.PrefixedVolumeGroupSnapshotNameKey] = content.Spec.VolumeGroupSnapshotRef.Name
parameters[utils.PrefixedVolumeGroupSnapshotNamespaceKey] = content.Spec.VolumeGroupSnapshotRef.Namespace
parameters[utils.PrefixedVolumeGroupSnapshotContentNameKey] = content.Name
}
volumeIDs, err := ctrl.getGroupSnapshotVolumeIDs(content)
driverName, groupSnapshotID, snapshots, creationTime, readyToUse, err := ctrl.handler.CreateGroupSnapshot(content, volumeIDs, parameters, snapshotterCredentials)
if err != nil {
// NOTE(xyang): handle create timeout
// If it is a final error, remove annotation to indicate
// storage system has responded with an error
klog.Infof("createSnapshotWrapper: CreateSnapshot for content %s returned error: %v", content.Name, err)
if isCSIFinalError(err) {
var removeAnnotationErr error
if content, removeAnnotationErr = ctrl.removeAnnVolumeGroupSnapshotBeingCreated(content); removeAnnotationErr != nil {
return content, fmt.Errorf("failed to remove VolumeGroupSnapshotBeingCreated annotation from the content %s: %s", content.Name, removeAnnotationErr)
}
}
return content, fmt.Errorf("failed to take group snapshot of the volumes %s: %q", content.Spec.Source.PersistentVolumeNames, err)
}
klog.V(5).Infof("Created group snapshot: driver %s, groupSnapshotId %s, creationTime %v, readyToUse %t", driverName, groupSnapshotID, creationTime, readyToUse)
if creationTime.IsZero() {
creationTime = time.Now()
}
// Create individual snapshots and snapshot contents
for _, snapshot := range snapshots {
volumeSnapshotContentName := GetSnapshotContentNameForVolumeGroupSnapshotContent(content)
volumeSnapshotName := GetSnapshotNameForVolumeGroupSnapshotContent(content)
volumeSnapshotNamespace := content.Spec.VolumeGroupSnapshotRef.Namespace
volumeSnapshotContent := &crdv1.VolumeSnapshotContent{
ObjectMeta: metav1.ObjectMeta{
Name: volumeSnapshotContentName,
},
Spec: crdv1.VolumeSnapshotContentSpec{
VolumeSnapshotRef: v1.ObjectReference{
Kind: "VolumeSnapshots",
Name: volumeSnapshotName,
Namespace: volumeSnapshotNamespace,
},
DeletionPolicy: content.Spec.DeletionPolicy,
Driver: content.Spec.Driver,
Source: crdv1.VolumeSnapshotContentSource{
SnapshotHandle: &snapshot.SnapshotId,
},
SourceVolumeMode: nil,
},
}
volumeSnapshot := &crdv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: volumeSnapshotName,
Namespace: volumeSnapshotNamespace,
},
Spec: crdv1.VolumeSnapshotSpec{
Source: crdv1.VolumeSnapshotSource{
VolumeSnapshotContentName: &volumeSnapshotContentName,
},
},
}
_, err = ctrl.clientset.SnapshotV1().VolumeSnapshotContents().Create(context.TODO(), volumeSnapshotContent, metav1.CreateOptions{})
if err != nil {
return content, err
}
_, err = ctrl.clientset.SnapshotV1().VolumeSnapshots(volumeSnapshotNamespace).Create(context.TODO(), volumeSnapshot, metav1.CreateOptions{})
if err != nil {
return content, err
}
}
newContent, err := ctrl.updateGroupSnapshotContentStatus(content, groupSnapshotID, readyToUse, creationTime.UnixNano())
if err != nil {
klog.Errorf("error updating status for volume group snapshot content %s: %v.", content.Name, err)
return content, fmt.Errorf("error updating status for volume group snapshot content %s: %v", content.Name, err)
}
content = newContent
// NOTE(xyang): handle create timeout
// Remove annotation to indicate storage system has successfully
// cut the group snapshot
content, err = ctrl.removeAnnVolumeGroupSnapshotBeingCreated(content)
if err != nil {
return content, fmt.Errorf("failed to remove VolumeGroupSnapshotBeingCreated annotation on the content %s: %q", content.Name, err)
}
return content, nil
}
func (ctrl *csiSnapshotSideCarController) getCSIGroupSnapshotInput(content *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotClass, map[string]string, error) {
className := content.Spec.VolumeGroupSnapshotClassName
klog.V(5).Infof("getCSIGroupSnapshotInput for group snapshot content [%s]", content.Name)
var class *crdv1alpha1.VolumeGroupSnapshotClass
var err error
if className != nil {
class, err = ctrl.getGroupSnapshotClass(*className)
if err != nil {
klog.Errorf("getCSISnapshotInput failed to getClassFromVolumeGroupSnapshot %s", err)
return nil, nil, err
}
} else {
// If dynamic provisioning, return failure if no group snapshot class
if len(content.Spec.Source.PersistentVolumeNames) != 0 {
klog.Errorf("failed to getCSISnapshotInput %s without a group snapshot class", content.Name)
return nil, nil, fmt.Errorf("failed to take group snapshot %s without a group snapshot class", content.Name)
}
// For pre-provisioned group snapshot, group snapshot class is not required
klog.V(5).Infof("getCSISnapshotInput for content [%s]: no VolumeGroupSnapshotClassName provided for pre-provisioned group snapshot", content.Name)
}
// TODO: Resolve snapshotting secret credentials.
return class, nil, nil
}
// getGroupSnapshotClass is a helper function to get group snapshot class from the class name.
func (ctrl *csiSnapshotSideCarController) getGroupSnapshotClass(className string) (*crdv1alpha1.VolumeGroupSnapshotClass, error) {
klog.V(5).Infof("getGroupSnapshotClass: VolumeGroupSnapshotClassName [%s]", className)
class, err := ctrl.groupSnapshotClassLister.Get(className)
if err != nil {
klog.Errorf("failed to retrieve group snapshot class %s from the informer: %q", className, err)
return nil, err
}
return class, nil
}
// setAnnVolumeGroupSnapshotBeingCreated sets VolumeGroupSnapshotBeingCreated annotation
// on VolumeGroupSnapshotContent
// If set, it indicates group snapshot is being created
func (ctrl *csiSnapshotSideCarController) setAnnVolumeGroupSnapshotBeingCreated(content *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
if metav1.HasAnnotation(content.ObjectMeta, utils.AnnVolumeGroupSnapshotBeingCreated) {
// the annotation already exists, return directly
return content, nil
}
// Set AnnVolumeGroupSnapshotBeingCreated
// Combine existing annotations with the new annotations.
// If there are no existing annotations, we create a new map.
klog.V(5).Infof("setAnnVolumeGroupSnapshotBeingCreated: set annotation [%s:yes] on content [%s].", utils.AnnVolumeGroupSnapshotBeingCreated, content.Name)
patchedAnnotations := make(map[string]string)
for k, v := range content.GetAnnotations() {
patchedAnnotations[k] = v
}
patchedAnnotations[utils.AnnVolumeGroupSnapshotBeingCreated] = "yes"
var patches []utils.PatchOp
patches = append(patches, utils.PatchOp{
Op: "replace",
Path: "/metadata/annotations",
Value: patchedAnnotations,
})
patchedContent, err := utils.PatchVolumeGroupSnapshotContent(content, patches, ctrl.clientset)
if err != nil {
return content, newControllerUpdateError(content.Name, err.Error())
}
// update content if update is successful
content = patchedContent
_, err = ctrl.storeContentUpdate(content)
if err != nil {
klog.V(4).Infof("setAnnVolumeGroupSnapshotBeingCreated for content [%s]: cannot update internal cache %v", content.Name, err)
}
klog.V(5).Infof("setAnnVolumeGroupSnapshotBeingCreated: volume group snapshot content %+v", content)
return content, nil
}
func (ctrl *csiSnapshotSideCarController) getGroupSnapshotVolumeIDs(content *crdv1alpha1.VolumeGroupSnapshotContent) ([]string, error) {
// TODO: Get add PV lister
var volumeIDs []string
for _, pvName := range content.Spec.Source.PersistentVolumeNames {
pv, err := ctrl.client.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
if err != nil {
return nil, err
}
if pv.Spec.CSI != nil && pv.Spec.CSI.VolumeHandle != "" {
volumeIDs = append(volumeIDs, pv.Spec.CSI.VolumeHandle)
}
}
return volumeIDs, nil
}
// removeAnnVolumeGroupSnapshotBeingCreated removes the VolumeGroupSnapshotBeingCreated
// annotation from a content if there exists one.
func (ctrl csiSnapshotSideCarController) removeAnnVolumeGroupSnapshotBeingCreated(content *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
if !metav1.HasAnnotation(content.ObjectMeta, utils.AnnVolumeGroupSnapshotBeingCreated) {
// the annotation does not exist, return directly
return content, nil
}
contentClone := content.DeepCopy()
delete(contentClone.ObjectMeta.Annotations, utils.AnnVolumeGroupSnapshotBeingCreated)
updatedContent, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshotContents().Update(context.TODO(), contentClone, metav1.UpdateOptions{})
if err != nil {
return content, newControllerUpdateError(content.Name, err.Error())
}
klog.V(5).Infof("Removed VolumeGroupSnapshotBeingCreated annotation from volume group snapshot content %s", content.Name)
_, err = ctrl.storeContentUpdate(updatedContent)
if err != nil {
klog.Errorf("failed to update content store %v", err)
}
return updatedContent, nil
}
func (ctrl *csiSnapshotSideCarController) updateGroupSnapshotContentStatus(
content *crdv1alpha1.VolumeGroupSnapshotContent,
groupSnapshotHandle string,
readyToUse bool,
createdAt int64) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
klog.V(5).Infof("updateSnapshotContentStatus: updating VolumeGroupSnapshotContent [%s], groupSnapshotHandle %s, readyToUse %v, createdAt %v", content.Name, groupSnapshotHandle, readyToUse, createdAt)
contentObj, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshotContents().Get(context.TODO(), content.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error get group snapshot content %s from api server: %v", content.Name, err)
}
var newStatus *crdv1alpha1.VolumeGroupSnapshotContentStatus
updated := false
if contentObj.Status == nil {
newStatus = &crdv1alpha1.VolumeGroupSnapshotContentStatus{
VolumeGroupSnapshotHandle: &groupSnapshotHandle,
ReadyToUse: &readyToUse,
CreationTime: &createdAt,
}
updated = true
} else {
newStatus = contentObj.Status.DeepCopy()
if newStatus.VolumeGroupSnapshotHandle == nil {
newStatus.VolumeGroupSnapshotHandle = &groupSnapshotHandle
updated = true
}
if newStatus.ReadyToUse == nil || *newStatus.ReadyToUse != readyToUse {
newStatus.ReadyToUse = &readyToUse
updated = true
if readyToUse && newStatus.Error != nil {
newStatus.Error = nil
}
}
if newStatus.CreationTime == nil {
newStatus.CreationTime = &createdAt
updated = true
}
}
if updated {
contentClone := contentObj.DeepCopy()
contentClone.Status = newStatus
newContent, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshotContents().UpdateStatus(context.TODO(), contentClone, metav1.UpdateOptions{})
if err != nil {
return contentObj, newControllerUpdateError(content.Name, err.Error())
}
return newContent, nil
}
return contentObj, nil
}
// updateContentStatusWithEvent saves new content.Status to API server and emits
// given event on the content. It saves the status and emits the event only when
// the status has actually changed from the version saved in API server.
// Parameters:
//
// * content - content to update
// * eventtype, reason, message - event to send, see EventRecorder.Event()
func (ctrl *csiSnapshotSideCarController) updateGroupSnapshotContentErrorStatusWithEvent(content *crdv1alpha1.VolumeGroupSnapshotContent, eventtype, reason, message string) error {
klog.V(5).Infof("updateContentStatusWithEvent[%s]", content.Name)
if content.Status != nil && content.Status.Error != nil && *content.Status.Error.Message == message {
klog.V(4).Infof("updateContentStatusWithEvent[%s]: the same error %v is already set", content.Name, content.Status.Error)
return nil
}
var patches []utils.PatchOp
ready := false
contentStatusError := &crdv1.VolumeSnapshotError{
Time: &metav1.Time{
Time: time.Now(),
},
Message: &message,
}
if content.Status == nil {
// Initialize status if nil
patches = append(patches, utils.PatchOp{
Op: "replace",
Path: "/status",
Value: &crdv1alpha1.VolumeGroupSnapshotContentStatus{
ReadyToUse: &ready,
Error: contentStatusError,
},
})
} else {
// Patch status if non-nil
patches = append(patches, utils.PatchOp{
Op: "replace",
Path: "/status/error",
Value: contentStatusError,
})
patches = append(patches, utils.PatchOp{
Op: "replace",
Path: "/status/readyToUse",
Value: &ready,
})
}
newContent, err := utils.PatchVolumeGroupSnapshotContent(content, patches, ctrl.clientset, "status")
// Emit the event even if the status update fails so that user can see the error
ctrl.eventRecorder.Event(newContent, eventtype, reason, message)
if err != nil {
klog.V(4).Infof("updating VolumeGroupSnapshotContent[%s] error status failed %v", content.Name, err)
return err
}
_, err = ctrl.storeGroupSnapshotContentUpdate(newContent)
if err != nil {
klog.V(4).Infof("updating VolumeGroupSnapshotContent[%s] error status: cannot update internal cache %v", content.Name, err)
return err
}
return nil
}
// GetSnapshotNameForVolumeGroupSnapshotContent returns a unique snapshot name for a VolumeGroupSnapshotContent.
func GetSnapshotNameForVolumeGroupSnapshotContent(content *crdv1alpha1.VolumeGroupSnapshotContent) string {
return fmt.Sprintf("groupsnapshot-%x-%d", sha256.Sum256([]byte(content.UID)), time.Duration(time.Now().UnixNano())/time.Millisecond)
}
// GetSnapshotContentNameForVolumeGroupSnapshotContent returns a unique content name for the
// passed in VolumeGroupSnapshotContent.
func GetSnapshotContentNameForVolumeGroupSnapshotContent(content *crdv1alpha1.VolumeGroupSnapshotContent) string {
return fmt.Sprintf("groupsnapcontent-%x-%d", sha256.Sum256([]byte(content.UID)), time.Duration(time.Now().UnixNano())/time.Millisecond)
}
func (ctrl *csiSnapshotSideCarController) checkandUpdateGroupSnapshotContentStatus(content *crdv1alpha1.VolumeGroupSnapshotContent) error {
klog.V(5).Infof("checkandUpdateGroupSnapshotContentStatus[%s] started", content.Name)
contentObj, err := ctrl.checkandUpdateGroupSnapshotContentStatusOperation(content)
if err != nil {
ctrl.updateGroupSnapshotContentErrorStatusWithEvent(contentObj, v1.EventTypeWarning, "GroupSnapshotContentCheckandUpdateFailed", fmt.Sprintf("Failed to check and update group snapshot content: %v", err))
klog.Errorf("checkandUpdateGroupSnapshotContentStatus [%s]: error occurred %v", content.Name, err)
return err
}
_, updateErr := ctrl.storeGroupSnapshotContentUpdate(contentObj)
if updateErr != nil {
// We will get a "group snapshot update" event soon, this is not a big error
klog.V(4).Infof("checkandUpdateGroupSnapshotContentStatus [%s]: cannot update internal cache: %v", content.Name, updateErr)
}
return nil
}
func (ctrl *csiSnapshotSideCarController) checkandUpdateGroupSnapshotContentStatusOperation(content *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
var err error
var creationTime time.Time
readyToUse := false
var driverName string
var groupSnapshotID string
var snapshotterListCredentials map[string]string
if content.Spec.Source.VolumeGroupSnapshotHandle != nil {
klog.V(5).Infof("checkandUpdateGroupSnapshotContentStatusOperation: call GetSnapshotStatus for group snapshot which is pre-bound to content [%s]", content.Name)
if content.Spec.VolumeGroupSnapshotClassName != nil {
class, err := ctrl.getGroupSnapshotClass(*content.Spec.VolumeGroupSnapshotClassName)
if err != nil {
klog.Errorf("Failed to get group snapshot class %s for group snapshot content %s: %v", *content.Spec.VolumeGroupSnapshotClassName, content.Name, err)
return content, fmt.Errorf("failed to get group snapshot class %s for group snapshot content %s: %v", *content.Spec.VolumeGroupSnapshotClassName, content.Name, err)
}
snapshotterListSecretRef, err := utils.GetSecretReference(utils.SnapshotterListSecretParams, class.Parameters, content.GetObjectMeta().GetName(), nil)
if err != nil {
klog.Errorf("Failed to get secret reference for group snapshot content %s: %v", content.Name, err)
return content, fmt.Errorf("failed to get secret reference for group snapshot content %s: %v", content.Name, err)
}
snapshotterListCredentials, err = utils.GetCredentials(ctrl.client, snapshotterListSecretRef)
if err != nil {
// Continue with deletion, as the secret may have already been deleted.
klog.Errorf("Failed to get credentials for group snapshot content %s: %v", content.Name, err)
return content, fmt.Errorf("failed to get credentials for group snapshot content %s: %v", content.Name, err)
}
}
readyToUse, creationTime, err = ctrl.handler.GetGroupSnapshotStatus(content, snapshotterListCredentials)
if err != nil {
klog.Errorf("checkandUpdateGroupSnapshotContentStatusOperation: failed to call get group snapshot status to check whether group snapshot is ready to use %q", err)
return content, err
}
driverName = content.Spec.Driver
groupSnapshotID = *content.Spec.Source.VolumeGroupSnapshotHandle
klog.V(5).Infof("checkandUpdateGroupSnapshotContentStatusOperation: driver %s, groupSnapshotId %s, creationTime %v, size %d, readyToUse %t", driverName, groupSnapshotID, creationTime, readyToUse)
if creationTime.IsZero() {
creationTime = time.Now()
}
updatedContent, err := ctrl.updateGroupSnapshotContentStatus(content, groupSnapshotID, readyToUse, creationTime.UnixNano())
if err != nil {
return content, err
}
return updatedContent, nil
}
return ctrl.createGroupSnapshotWrapper(content)
}

View File

@@ -18,6 +18,7 @@ package sidecar_controller
import (
"fmt"
"github.com/kubernetes-csi/external-snapshotter/v6/pkg/group_snapshotter"
"time"
v1 "k8s.io/api/core/v1"
@@ -79,6 +80,7 @@ func NewCSISnapshotSideCarController(
volumeSnapshotContentInformer snapshotinformers.VolumeSnapshotContentInformer,
volumeSnapshotClassInformer snapshotinformers.VolumeSnapshotClassInformer,
snapshotter snapshotter.Snapshotter,
groupSnapshotter group_snapshotter.GroupSnapshotter,
timeout time.Duration,
resyncPeriod time.Duration,
snapshotNamePrefix string,
@@ -101,7 +103,7 @@ func NewCSISnapshotSideCarController(
client: client,
driverName: driverName,
eventRecorder: eventRecorder,
handler: NewCSIHandler(snapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength),
handler: NewCSIHandler(snapshotter, groupSnapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength),
resyncPeriod: resyncPeriod,
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
contentQueue: workqueue.NewNamedRateLimitingQueue(contentRateLimiter, "csi-snapshotter-content"),
@@ -159,7 +161,7 @@ func NewCSISnapshotSideCarController(
)
ctrl.groupSnapshotContentLister = volumeGroupSnapshotContentInformer.Lister()
ctrl.groupSnapshotClassListerSynced = volumeGroupSnapshotContentInformer.Informer().HasSynced
ctrl.groupSnapshotContentListerSynced = volumeGroupSnapshotContentInformer.Informer().HasSynced
ctrl.groupSnapshotClassLister = volumeGroupSnapshotClassInformer.Lister()
ctrl.groupSnapshotClassListerSynced = volumeGroupSnapshotClassInformer.Informer().HasSynced

View File

@@ -64,6 +64,10 @@ const (
PrefixedVolumeSnapshotNamespaceKey = csiParameterPrefix + "volumesnapshot/namespace" // Prefixed VolumeSnapshot namespace key
PrefixedVolumeSnapshotContentNameKey = csiParameterPrefix + "volumesnapshotcontent/name" // Prefixed VolumeSnapshotContent name key
PrefixedVolumeGroupSnapshotNameKey = csiParameterPrefix + "volumegroupsnapshot/name" // Prefixed VolumeGroupSnapshot name key
PrefixedVolumeGroupSnapshotNamespaceKey = csiParameterPrefix + "volumegroupsnapshot/namespace" // Prefixed VolumeGroupSnapshot namespace key
PrefixedVolumeGroupSnapshotContentNameKey = csiParameterPrefix + "volumegroupsnapshotcontent/name" // Prefixed VolumeGroupSnapshotContent name key
// Name of finalizer on VolumeSnapshotContents that are bound by VolumeSnapshots
VolumeSnapshotContentFinalizer = "snapshot.storage.kubernetes.io/volumesnapshotcontent-bound-protection"
// Name of finalizer on VolumeSnapshot that is being used as a source to create a PVC
@@ -96,6 +100,19 @@ const (
// snapshots.
AnnVolumeSnapshotBeingCreated = "snapshot.storage.kubernetes.io/volumesnapshot-being-created"
// AnnVolumeGroupSnapshotBeingCreated annotation applies to VolumeGroupSnapshotContents.
// If it is set, it indicates that the csi-snapshotter
// sidecar has sent the create group snapshot request to the storage system and
// is waiting for a response of success or failure.
// This annotation will be removed once the driver's CreateGroupSnapshot
// CSI function returns success or a final error (determined by isFinalError()).
// If the create group snapshot request fails with a non-final error such as timeout,
// retry will happen and the annotation will remain.
// This only applies to dynamic provisioning of group snapshots because
// the create group snapshot CSI method will not be called for pre-provisioned
// group snapshots.
AnnVolumeGroupSnapshotBeingCreated = "snapshot.storage.kubernetes.io/volumegroupsnapshot-being-created"
// Annotation for secret name and namespace will be added to the content
// and used at snapshot content deletion time.
AnnDeletionSecretRefName = "snapshot.storage.kubernetes.io/deletion-secret-name"

File diff suppressed because it is too large Load Diff

2
vendor/modules.txt vendored
View File

@@ -7,7 +7,7 @@ github.com/blang/semver/v4
# github.com/cespare/xxhash/v2 v2.1.2
## explicit; go 1.11
github.com/cespare/xxhash/v2
# github.com/container-storage-interface/spec v1.7.0
# github.com/container-storage-interface/spec v1.8.0
## explicit; go 1.18
github.com/container-storage-interface/spec/lib/go/csi
# github.com/davecgh/go-spew v1.1.1