Update snapshotter to use CSI spec 1.0. This breaks the snapshot handling of error and uploading status
This commit is contained in:
@@ -25,6 +25,8 @@ import (
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"github.com/golang/glog"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/golang/protobuf/ptypes/timestamp"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"k8s.io/api/core/v1"
|
||||
@@ -46,13 +48,13 @@ type CSIConnection interface {
|
||||
SupportsControllerListSnapshots(ctx context.Context) (bool, error)
|
||||
|
||||
// CreateSnapshot creates a snapshot for a volume
|
||||
CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (driverName string, snapshotId string, timestamp int64, size int64, status *csi.SnapshotStatus, err error)
|
||||
CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (driverName string, snapshotId string, timestamp int64, size int64, readyToUse bool, err error)
|
||||
|
||||
// DeleteSnapshot deletes a snapshot from a volume
|
||||
DeleteSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) (err error)
|
||||
|
||||
// GetSnapshotStatus returns a snapshot's status, creation time, and restore size.
|
||||
GetSnapshotStatus(ctx context.Context, snapshotID string) (*csi.SnapshotStatus, int64, int64, error)
|
||||
// GetSnapshotStatus returns if a snapshot is ready to use, creation time, and restore size.
|
||||
GetSnapshotStatus(ctx context.Context, snapshotID string) (bool, int64, int64, error)
|
||||
|
||||
// Probe checks that the CSI driver is ready to process requests
|
||||
Probe(ctx context.Context) error
|
||||
@@ -188,41 +190,45 @@ func (c *csiConnection) SupportsControllerListSnapshots(ctx context.Context) (bo
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (c *csiConnection) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, int64, int64, *csi.SnapshotStatus, error) {
|
||||
func (c *csiConnection) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, int64, int64, bool, error) {
|
||||
glog.V(5).Infof("CSI CreateSnapshot: %s", snapshotName)
|
||||
if volume.Spec.CSI == nil {
|
||||
return "", "", 0, 0, nil, fmt.Errorf("CSIPersistentVolumeSource not defined in spec")
|
||||
return "", "", 0, 0, false, fmt.Errorf("CSIPersistentVolumeSource not defined in spec")
|
||||
}
|
||||
|
||||
client := csi.NewControllerClient(c.conn)
|
||||
|
||||
driverName, err := c.GetDriverName(ctx)
|
||||
if err != nil {
|
||||
return "", "", 0, 0, nil, err
|
||||
return "", "", 0, 0, false, err
|
||||
}
|
||||
|
||||
req := csi.CreateSnapshotRequest{
|
||||
SourceVolumeId: volume.Spec.CSI.VolumeHandle,
|
||||
Name: snapshotName,
|
||||
Parameters: parameters,
|
||||
CreateSnapshotSecrets: snapshotterCredentials,
|
||||
SourceVolumeId: volume.Spec.CSI.VolumeHandle,
|
||||
Name: snapshotName,
|
||||
Parameters: parameters,
|
||||
Secrets: snapshotterCredentials,
|
||||
}
|
||||
|
||||
rsp, err := client.CreateSnapshot(ctx, &req)
|
||||
if err != nil {
|
||||
return "", "", 0, 0, nil, err
|
||||
return "", "", 0, 0, false, err
|
||||
}
|
||||
|
||||
glog.V(5).Infof("CSI CreateSnapshot: %s driver name [%s] snapshot ID [%s] time stamp [%d] size [%d] status [%s]", snapshotName, driverName, rsp.Snapshot.Id, rsp.Snapshot.CreatedAt, rsp.Snapshot.SizeBytes, *rsp.Snapshot.Status)
|
||||
return driverName, rsp.Snapshot.Id, rsp.Snapshot.CreatedAt, rsp.Snapshot.SizeBytes, rsp.Snapshot.Status, nil
|
||||
glog.V(5).Infof("CSI CreateSnapshot: %s driver name [%s] snapshot ID [%s] time stamp [%d] size [%d] readyToUse [%v]", snapshotName, driverName, rsp.Snapshot.SnapshotId, rsp.Snapshot.CreationTime, rsp.Snapshot.SizeBytes, rsp.Snapshot.ReadyToUse)
|
||||
creationTime, err := timestampToUnixTime(rsp.Snapshot.CreationTime)
|
||||
if err != nil {
|
||||
return "", "", 0, 0, false, err
|
||||
}
|
||||
return driverName, rsp.Snapshot.SnapshotId, creationTime, rsp.Snapshot.SizeBytes, rsp.Snapshot.ReadyToUse, nil
|
||||
}
|
||||
|
||||
func (c *csiConnection) DeleteSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) (err error) {
|
||||
client := csi.NewControllerClient(c.conn)
|
||||
|
||||
req := csi.DeleteSnapshotRequest{
|
||||
SnapshotId: snapshotID,
|
||||
DeleteSnapshotSecrets: snapshotterCredentials,
|
||||
SnapshotId: snapshotID,
|
||||
Secrets: snapshotterCredentials,
|
||||
}
|
||||
|
||||
if _, err := client.DeleteSnapshot(ctx, &req); err != nil {
|
||||
@@ -232,7 +238,7 @@ func (c *csiConnection) DeleteSnapshot(ctx context.Context, snapshotID string, s
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *csiConnection) GetSnapshotStatus(ctx context.Context, snapshotID string) (*csi.SnapshotStatus, int64, int64, error) {
|
||||
func (c *csiConnection) GetSnapshotStatus(ctx context.Context, snapshotID string) (bool, int64, int64, error) {
|
||||
client := csi.NewControllerClient(c.conn)
|
||||
|
||||
req := csi.ListSnapshotsRequest{
|
||||
@@ -241,14 +247,18 @@ func (c *csiConnection) GetSnapshotStatus(ctx context.Context, snapshotID string
|
||||
|
||||
rsp, err := client.ListSnapshots(ctx, &req)
|
||||
if err != nil {
|
||||
return nil, 0, 0, err
|
||||
return false, 0, 0, err
|
||||
}
|
||||
|
||||
if rsp.Entries == nil || len(rsp.Entries) == 0 {
|
||||
return nil, 0, 0, fmt.Errorf("can not find snapshot for snapshotID %s", snapshotID)
|
||||
return false, 0, 0, fmt.Errorf("can not find snapshot for snapshotID %s", snapshotID)
|
||||
}
|
||||
|
||||
return rsp.Entries[0].Snapshot.Status, rsp.Entries[0].Snapshot.CreatedAt, rsp.Entries[0].Snapshot.SizeBytes, nil
|
||||
creationTime, err := timestampToUnixTime(rsp.Entries[0].Snapshot.CreationTime)
|
||||
if err != nil {
|
||||
return false, 0, 0, err
|
||||
}
|
||||
return rsp.Entries[0].Snapshot.ReadyToUse, creationTime, rsp.Entries[0].Snapshot.SizeBytes, nil
|
||||
}
|
||||
|
||||
func (c *csiConnection) Close() error {
|
||||
@@ -262,3 +272,13 @@ func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grp
|
||||
glog.V(5).Infof("GRPC error: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
func timestampToUnixTime(t *timestamp.Timestamp) (int64, error) {
|
||||
time, err := ptypes.Timestamp(t)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
// TODO: clean this up, we probably don't need this translation layer
|
||||
// and can just use time.Time
|
||||
return time.UnixNano(), nil
|
||||
}
|
||||
|
@@ -21,10 +21,10 @@ import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/kubernetes-csi/csi-test/driver"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
@@ -378,7 +378,11 @@ func TestSupportsControllerListSnapshots(t *testing.T) {
|
||||
func TestCreateSnapshot(t *testing.T) {
|
||||
defaultName := "snapshot-test"
|
||||
defaultID := "testid"
|
||||
createTime := time.Now().UnixNano()
|
||||
createTimestamp := ptypes.TimestampNow()
|
||||
createTime, err := ptypes.Timestamp(createTimestamp)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to convert timestamp to time: %v", err)
|
||||
}
|
||||
|
||||
createSecrets := map[string]string{"foo": "bar"}
|
||||
defaultParameter := map[string]string{
|
||||
@@ -401,21 +405,18 @@ func TestCreateSnapshot(t *testing.T) {
|
||||
}
|
||||
|
||||
secretsRequest := &csi.CreateSnapshotRequest{
|
||||
Name: defaultName,
|
||||
SourceVolumeId: csiVolume.Spec.CSI.VolumeHandle,
|
||||
CreateSnapshotSecrets: createSecrets,
|
||||
Name: defaultName,
|
||||
SourceVolumeId: csiVolume.Spec.CSI.VolumeHandle,
|
||||
Secrets: createSecrets,
|
||||
}
|
||||
|
||||
defaultResponse := &csi.CreateSnapshotResponse{
|
||||
Snapshot: &csi.Snapshot{
|
||||
Id: defaultID,
|
||||
SnapshotId: defaultID,
|
||||
SizeBytes: 1000,
|
||||
SourceVolumeId: csiVolume.Spec.CSI.VolumeHandle,
|
||||
CreatedAt: createTime,
|
||||
Status: &csi.SnapshotStatus{
|
||||
Type: csi.SnapshotStatus_READY,
|
||||
Details: "success",
|
||||
},
|
||||
CreationTime: createTimestamp,
|
||||
ReadyToUse: true,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -432,18 +433,15 @@ func TestCreateSnapshot(t *testing.T) {
|
||||
snapshotId string
|
||||
timestamp int64
|
||||
size int64
|
||||
status *csi.SnapshotStatus
|
||||
readyToUse bool
|
||||
}
|
||||
|
||||
result := &snapshotResult{
|
||||
size: 1000,
|
||||
driverName: driverName,
|
||||
snapshotId: defaultID,
|
||||
timestamp: createTime,
|
||||
status: &csi.SnapshotStatus{
|
||||
Type: csi.SnapshotStatus_READY,
|
||||
Details: "success",
|
||||
},
|
||||
timestamp: createTime.UnixNano(),
|
||||
readyToUse: true,
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
@@ -537,7 +535,7 @@ func TestCreateSnapshot(t *testing.T) {
|
||||
controllerServer.EXPECT().CreateSnapshot(gomock.Any(), in).Return(out, injectedErr).Times(1)
|
||||
}
|
||||
|
||||
driverName, snapshotId, timestamp, size, status, err := csiConn.CreateSnapshot(context.Background(), test.snapshotName, test.volume, test.parameters, test.secrets)
|
||||
driverName, snapshotId, timestamp, size, readyToUse, err := csiConn.CreateSnapshot(context.Background(), test.snapshotName, test.volume, test.parameters, test.secrets)
|
||||
if test.expectError && err == nil {
|
||||
t.Errorf("test %q: Expected error, got none", test.name)
|
||||
}
|
||||
@@ -561,8 +559,8 @@ func TestCreateSnapshot(t *testing.T) {
|
||||
t.Errorf("test %q: expected size: %v, got: %v", test.name, test.expectResult.size, size)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(status, test.expectResult.status) {
|
||||
t.Errorf("test %q: expected status: %v, got: %v", test.name, test.expectResult.status, status)
|
||||
if !reflect.DeepEqual(readyToUse, test.expectResult.readyToUse) {
|
||||
t.Errorf("test %q: expected readyToUse: %v, got: %v", test.name, test.expectResult.readyToUse, readyToUse)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -577,8 +575,8 @@ func TestDeleteSnapshot(t *testing.T) {
|
||||
}
|
||||
|
||||
secretsRequest := &csi.DeleteSnapshotRequest{
|
||||
SnapshotId: defaultID,
|
||||
DeleteSnapshotSecrets: secrets,
|
||||
SnapshotId: defaultID,
|
||||
Secrets: secrets,
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
@@ -657,8 +655,12 @@ func TestDeleteSnapshot(t *testing.T) {
|
||||
|
||||
func TestGetSnapshotStatus(t *testing.T) {
|
||||
defaultID := "testid"
|
||||
createdAt := time.Now().UnixNano()
|
||||
size := int64(1000)
|
||||
createTimestamp := ptypes.TimestampNow()
|
||||
createTime, err := ptypes.Timestamp(createTimestamp)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to convert timestamp to time: %v", err)
|
||||
}
|
||||
|
||||
defaultRequest := &csi.ListSnapshotsRequest{
|
||||
SnapshotId: defaultID,
|
||||
@@ -668,14 +670,11 @@ func TestGetSnapshotStatus(t *testing.T) {
|
||||
Entries: []*csi.ListSnapshotsResponse_Entry{
|
||||
{
|
||||
Snapshot: &csi.Snapshot{
|
||||
Id: defaultID,
|
||||
SnapshotId: defaultID,
|
||||
SizeBytes: size,
|
||||
SourceVolumeId: "volumeid",
|
||||
CreatedAt: createdAt,
|
||||
Status: &csi.SnapshotStatus{
|
||||
Type: csi.SnapshotStatus_READY,
|
||||
Details: "success",
|
||||
},
|
||||
CreationTime: createTimestamp,
|
||||
ReadyToUse: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -688,21 +687,18 @@ func TestGetSnapshotStatus(t *testing.T) {
|
||||
output *csi.ListSnapshotsResponse
|
||||
injectError codes.Code
|
||||
expectError bool
|
||||
expectStatus *csi.SnapshotStatus
|
||||
expectReady bool
|
||||
expectCreateAt int64
|
||||
expectSize int64
|
||||
}{
|
||||
{
|
||||
name: "success",
|
||||
snapshotID: defaultID,
|
||||
input: defaultRequest,
|
||||
output: defaultResponse,
|
||||
expectError: false,
|
||||
expectStatus: &csi.SnapshotStatus{
|
||||
Type: csi.SnapshotStatus_READY,
|
||||
Details: "success",
|
||||
},
|
||||
expectCreateAt: createdAt,
|
||||
name: "success",
|
||||
snapshotID: defaultID,
|
||||
input: defaultRequest,
|
||||
output: defaultResponse,
|
||||
expectError: false,
|
||||
expectReady: true,
|
||||
expectCreateAt: createTime.UnixNano(),
|
||||
expectSize: size,
|
||||
},
|
||||
{
|
||||
@@ -744,15 +740,15 @@ func TestGetSnapshotStatus(t *testing.T) {
|
||||
controllerServer.EXPECT().ListSnapshots(gomock.Any(), in).Return(out, injectedErr).Times(1)
|
||||
}
|
||||
|
||||
status, createTime, size, err := csiConn.GetSnapshotStatus(context.Background(), test.snapshotID)
|
||||
ready, createTime, size, err := csiConn.GetSnapshotStatus(context.Background(), test.snapshotID)
|
||||
if test.expectError && err == nil {
|
||||
t.Errorf("test %q: Expected error, got none", test.name)
|
||||
}
|
||||
if !test.expectError && err != nil {
|
||||
t.Errorf("test %q: got error: %v", test.name, err)
|
||||
}
|
||||
if test.expectStatus != nil && !reflect.DeepEqual(test.expectStatus, status) {
|
||||
t.Errorf("test %q: expected status: %v, got: %v", test.name, test.expectStatus, status)
|
||||
if test.expectReady != ready {
|
||||
t.Errorf("test %q: expected status: %v, got: %v", test.name, test.expectReady, ready)
|
||||
}
|
||||
if test.expectCreateAt != createTime {
|
||||
t.Errorf("test %q: expected createTime: %v, got: %v", test.name, test.expectCreateAt, createTime)
|
||||
|
@@ -22,7 +22,6 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
|
||||
"github.com/kubernetes-csi/external-snapshotter/pkg/connection"
|
||||
"k8s.io/api/core/v1"
|
||||
@@ -30,9 +29,9 @@ import (
|
||||
|
||||
// Handler is responsible for handling VolumeSnapshot events from informer.
|
||||
type Handler interface {
|
||||
CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, int64, int64, *csi.SnapshotStatus, error)
|
||||
CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, int64, int64, bool, error)
|
||||
DeleteSnapshot(content *crdv1.VolumeSnapshotContent, snapshotterCredentials map[string]string) error
|
||||
GetSnapshotStatus(content *crdv1.VolumeSnapshotContent) (*csi.SnapshotStatus, int64, int64, error)
|
||||
GetSnapshotStatus(content *crdv1.VolumeSnapshotContent) (bool, int64, int64, error)
|
||||
}
|
||||
|
||||
// csiHandler is a handler that calls CSI to create/delete volume snapshot.
|
||||
@@ -57,14 +56,14 @@ func NewCSIHandler(
|
||||
}
|
||||
}
|
||||
|
||||
func (handler *csiHandler) CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, int64, int64, *csi.SnapshotStatus, error) {
|
||||
func (handler *csiHandler) CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, int64, int64, bool, error) {
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
|
||||
defer cancel()
|
||||
|
||||
snapshotName, err := makeSnapshotName(handler.snapshotNamePrefix, string(snapshot.UID), handler.snapshotNameUUIDLength)
|
||||
if err != nil {
|
||||
return "", "", 0, 0, nil, err
|
||||
return "", "", 0, 0, false, err
|
||||
}
|
||||
return handler.csiConnection.CreateSnapshot(ctx, snapshotName, volume, parameters, snapshotterCredentials)
|
||||
}
|
||||
@@ -84,16 +83,16 @@ func (handler *csiHandler) DeleteSnapshot(content *crdv1.VolumeSnapshotContent,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (handler *csiHandler) GetSnapshotStatus(content *crdv1.VolumeSnapshotContent) (*csi.SnapshotStatus, int64, int64, error) {
|
||||
func (handler *csiHandler) GetSnapshotStatus(content *crdv1.VolumeSnapshotContent) (bool, int64, int64, error) {
|
||||
if content.Spec.CSI == nil {
|
||||
return nil, 0, 0, fmt.Errorf("CSISnapshot not defined in spec")
|
||||
return false, 0, 0, fmt.Errorf("CSISnapshot not defined in spec")
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
|
||||
defer cancel()
|
||||
|
||||
csiSnapshotStatus, timestamp, size, err := handler.csiConnection.GetSnapshotStatus(ctx, content.Spec.CSI.SnapshotHandle)
|
||||
if err != nil {
|
||||
return nil, 0, 0, fmt.Errorf("failed to list snapshot data %s: %q", content.Name, err)
|
||||
return false, 0, 0, fmt.Errorf("failed to list snapshot data %s: %q", content.Name, err)
|
||||
}
|
||||
return csiSnapshotStatus, timestamp, size, nil
|
||||
|
||||
|
@@ -21,7 +21,6 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"github.com/golang/glog"
|
||||
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
|
||||
"k8s.io/api/core/v1"
|
||||
@@ -667,8 +666,8 @@ func (ctrl *csiSnapshotController) updateSnapshotContentSize(content *crdv1.Volu
|
||||
}
|
||||
|
||||
// UpdateSnapshotStatus converts snapshot status to crdv1.VolumeSnapshotCondition
|
||||
func (ctrl *csiSnapshotController) updateSnapshotStatus(snapshot *crdv1.VolumeSnapshot, csistatus *csi.SnapshotStatus, createdAt, size int64, bound bool) (*crdv1.VolumeSnapshot, error) {
|
||||
glog.V(5).Infof("updating VolumeSnapshot[]%s, set status %v, timestamp %v", snapshotKey(snapshot), csistatus, createdAt)
|
||||
func (ctrl *csiSnapshotController) updateSnapshotStatus(snapshot *crdv1.VolumeSnapshot, readyToUse bool, createdAt, size int64, bound bool) (*crdv1.VolumeSnapshot, error) {
|
||||
glog.V(5).Infof("updating VolumeSnapshot[]%s, readyToUse %v, timestamp %v", snapshotKey(snapshot), readyToUse, createdAt)
|
||||
status := snapshot.Status
|
||||
change := false
|
||||
timeAt := &metav1.Time{
|
||||
@@ -676,6 +675,20 @@ func (ctrl *csiSnapshotController) updateSnapshotStatus(snapshot *crdv1.VolumeSn
|
||||
}
|
||||
|
||||
snapshotClone := snapshot.DeepCopy()
|
||||
if readyToUse {
|
||||
if bound {
|
||||
status.Ready = true
|
||||
// Remove the error if checking snapshot is already bound and ready
|
||||
status.Error = nil
|
||||
change = true
|
||||
}
|
||||
if status.CreationTime == nil {
|
||||
status.CreationTime = timeAt
|
||||
change = true
|
||||
}
|
||||
}
|
||||
|
||||
/* TODO FIXME
|
||||
switch csistatus.Type {
|
||||
case csi.SnapshotStatus_READY:
|
||||
if bound {
|
||||
@@ -704,6 +717,7 @@ func (ctrl *csiSnapshotController) updateSnapshotStatus(snapshot *crdv1.VolumeSn
|
||||
change = true
|
||||
}
|
||||
}
|
||||
*/
|
||||
if change {
|
||||
if size > 0 {
|
||||
status.RestoreSize = resource.NewQuantity(size, resource.BinarySI)
|
||||
|
Reference in New Issue
Block a user