Bumping k8s dependencies to 1.13

Cheng Xing
2018-11-16 14:08:25 -08:00
parent 305407125c
commit b4c0b68ec7
8002 changed files with 884099 additions and 276228 deletions

View File

@@ -14,22 +14,25 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi/labelmanager:go_default_library",
"//pkg/volume/csi/nodeinfomanager:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/listers/csi/v1alpha1:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)
@@ -44,23 +47,30 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/features:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi/fake:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned/fake:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/util/testing:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
@@ -76,7 +86,7 @@ filegroup(
srcs = [
":package-srcs",
"//pkg/volume/csi/fake:all-srcs",
"//pkg/volume/csi/labelmanager:all-srcs",
"//pkg/volume/csi/nodeinfomanager:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],

View File

@@ -56,6 +56,8 @@ type csiAttacher struct {
// volume.Attacher methods
var _ volume.Attacher = &csiAttacher{}
var _ volume.DeviceMounter = &csiAttacher{}
func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
if spec == nil {
glog.Error(log("attacher.Attach missing volume.Spec"))
@@ -68,6 +70,16 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string
return "", err
}
skip, err := c.plugin.skipAttach(csiSource.Driver)
if err != nil {
glog.Error(log("attacher.Attach failed to find if driver is attachable: %v", err))
return "", err
}
if skip {
glog.V(4).Infof(log("skipping attach for driver %s", csiSource.Driver))
return "", nil
}
node := string(nodeName)
pvName := spec.PersistentVolume.GetName()
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, node)
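Note: getAttachmentName is used throughout this file but is not part of this excerpt; a minimal sketch, under the assumption that it derives a deterministic VolumeAttachment name by hashing volume handle, driver, and node, so the attach and detach paths agree on the name without coordination:
func getAttachmentName(volName, csiDriverName, nodeName string) string {
    // Sketch only; assumes "crypto/sha256" and "fmt" are imported.
    result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName)))
    return fmt.Sprintf("csi-%x", result)
}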
@@ -118,6 +130,16 @@ func (c *csiAttacher) WaitForAttach(spec *volume.Spec, attachID string, pod *v1.
return "", err
}
skip, err := c.plugin.skipAttach(source.Driver)
if err != nil {
glog.Error(log("attacher.Attach failed to find if driver is attachable: %v", err))
return "", err
}
if skip {
glog.V(4).Infof(log("Driver is not attachable, skip waiting for attach"))
return "", nil
}
return c.waitForVolumeAttachment(source.VolumeHandle, attachID, timeout)
}
@@ -135,7 +157,7 @@ func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID str
attach, err := c.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
glog.Error(log("attacher.WaitForAttach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
return "", err
return "", fmt.Errorf("volume %v has GET error for volume attachment %v: %v", volumeHandle, attachID, err)
}
// if being deleted, fail fast
if attach.GetDeletionTimestamp() != nil {
@@ -219,11 +241,22 @@ func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.No
glog.Error(log("attacher.VolumesAreAttached failed: %v", err))
continue
}
skip, err := c.plugin.skipAttach(source.Driver)
if err != nil {
glog.Error(log("Failed to check CSIDriver for %s: %s", source.Driver, err))
} else {
if skip {
// This volume is not attachable, pretend it's attached
attached[spec] = true
continue
}
}
attachID := getAttachmentName(source.VolumeHandle, source.Driver, string(nodeName))
glog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID))
attach, err := c.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
attached[spec] = false
glog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err))
continue
}
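Note: skipAttach itself lives in csi_plugin.go, outside this excerpt; a minimal sketch of the check the calls above rely on, assuming it consults the CSIDriver object's AttachRequired field behind the CSIDriverRegistry feature gate:
func (p *csiPlugin) skipAttach(driver string) (bool, error) {
    // Sketch only; the real implementation may differ in detail.
    if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
        return false, nil
    }
    if p.csiDriverLister == nil {
        return false, errors.New("CSIDriver lister does not exist")
    }
    csiDriver, err := p.csiDriverLister.Get(driver)
    if err != nil {
        if apierrs.IsNotFound(err) {
            // Be conservative: attach when no CSIDriver object is found.
            return false, nil
        }
        return false, err
    }
    if csiDriver.Spec.AttachRequired != nil && !*csiDriver.Spec.AttachRequired {
        return true, nil
    }
    return false, nil
}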
@@ -245,9 +278,14 @@ func (c *csiAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
return deviceMountPath, nil
}
func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) (err error) {
glog.V(4).Infof(log("attacher.MountDevice(%s, %s)", devicePath, deviceMountPath))
if deviceMountPath == "" {
err = fmt.Errorf("attacher.MountDevice failed, deviceMountPath is empty")
return err
}
mounted, err := isDirMounted(c.plugin, deviceMountPath)
if err != nil {
glog.Error(log("attacher.MountDevice failed while checking mount status for dir [%s]", deviceMountPath))
@@ -269,6 +307,35 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
return err
}
// Store volume metadata for UnmountDevice. Keep it around even if the
// driver does not support NodeStage, UnmountDevice still needs it.
if err = os.MkdirAll(deviceMountPath, 0750); err != nil {
glog.Error(log("attacher.MountDevice failed to create dir %#v: %v", deviceMountPath, err))
return err
}
glog.V(4).Info(log("created target path successfully [%s]", deviceMountPath))
dataDir := filepath.Dir(deviceMountPath)
data := map[string]string{
volDataKey.volHandle: csiSource.VolumeHandle,
volDataKey.driverName: csiSource.Driver,
}
if err = saveVolumeData(dataDir, volDataFileName, data); err != nil {
glog.Error(log("failed to save volume info data: %v", err))
if cleanerr := os.RemoveAll(dataDir); cleanerr != nil {
glog.Error(log("failed to remove dir after error [%s]: %v", dataDir, cleanerr))
}
return err
}
defer func() {
if err != nil {
// clean up metadata
glog.Errorf(log("attacher.MountDevice failed: %v", err))
if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
glog.Error(log("attacher.MountDevice failed to remove mount dir after errir [%s]: %v", deviceMountPath, err))
}
}
}()
if c.csiClient == nil {
c.csiClient = newCsiDriverClient(csiSource.Driver)
}
@@ -279,51 +346,28 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
// Check whether "STAGE_UNSTAGE_VOLUME" is set
stageUnstageSet, err := hasStageUnstageCapability(ctx, csi)
if err != nil {
glog.Error(log("attacher.MountDevice failed to check STAGE_UNSTAGE_VOLUME: %v", err))
return err
}
if !stageUnstageSet {
glog.Infof(log("attacher.MountDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice..."))
// The deferred cleanup above does *not* remove the metadata file, and that's correct - UnmountDevice needs it there.
return nil
}
// Start MountDevice
if deviceMountPath == "" {
return fmt.Errorf("attacher.MountDevice failed, deviceMountPath is empty")
}
nodeName := string(c.plugin.host.GetNodeName())
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
attachment, err := c.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
glog.Error(log("attacher.MountDevice failed while getting volume attachment [id=%v]: %v", attachID, err))
return err
}
if attachment == nil {
glog.Error(log("unable to find VolumeAttachment [id=%s]", attachID))
return errors.New("no existing VolumeAttachment found")
}
publishVolumeInfo := attachment.Status.AttachmentMetadata
publishVolumeInfo, err := c.plugin.getPublishVolumeInfo(c.k8s, csiSource.VolumeHandle, csiSource.Driver, nodeName)
nodeStageSecrets := map[string]string{}
if csiSource.NodeStageSecretRef != nil {
nodeStageSecrets, err = getCredentialsFromSecret(c.k8s, csiSource.NodeStageSecretRef)
if err != nil {
return fmt.Errorf("fetching NodeStageSecretRef %s/%s failed: %v",
err = fmt.Errorf("fetching NodeStageSecretRef %s/%s failed: %v",
csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err)
return err
}
}
// create target_dir before call to NodeStageVolume
if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
glog.Error(log("attacher.MountDevice failed to create dir %#v: %v", deviceMountPath, err))
return err
}
glog.V(4).Info(log("created target path successfully [%s]", deviceMountPath))
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
accessMode := v1.ReadWriteOnce
if spec.PersistentVolume.Spec.AccessModes != nil {
@@ -331,10 +375,6 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
}
fsType := csiSource.FSType
if len(fsType) == 0 {
fsType = defaultFSType
}
err = csi.NodeStageVolume(ctx,
csiSource.VolumeHandle,
publishVolumeInfo,
@@ -345,10 +385,6 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
csiSource.VolumeAttributes)
if err != nil {
glog.Errorf(log("attacher.MountDevice failed: %v", err))
if removeMountDirErr := removeMountDir(c.plugin, deviceMountPath); removeMountDirErr != nil {
glog.Error(log("attacher.MountDevice failed to remove mount dir after a NodeStageVolume() error [%s]: %v", deviceMountPath, removeMountDirErr))
}
return err
}
@@ -358,6 +394,8 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
var _ volume.Detacher = &csiAttacher{}
var _ volume.DeviceUnmounter = &csiAttacher{}
func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
// volumeName in format driverName<SEP>volumeHandle generated by plugin.GetVolumeName()
if volumeName == "" {
@@ -461,10 +499,21 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
glog.V(4).Info(log("attacher.UnmountDevice(%s)", deviceMountPath))
// Setup
driverName, volID, err := getDriverAndVolNameFromDeviceMountPath(c.k8s, deviceMountPath)
if err != nil {
glog.Errorf(log("attacher.UnmountDevice failed to get driver and volume name from device mount path: %v", err))
return err
var driverName, volID string
dataDir := filepath.Dir(deviceMountPath)
data, err := loadVolumeData(dataDir, volDataFileName)
if err == nil {
driverName = data[volDataKey.driverName]
volID = data[volDataKey.volHandle]
} else {
glog.Error(log("UnmountDevice failed to load volume data file [%s]: %v", dataDir, err))
// The volume might have been mounted by an old CSI volume plugin. Fall back to the old behavior: read the PV from the API server
driverName, volID, err = getDriverAndVolNameFromDeviceMountPath(c.k8s, deviceMountPath)
if err != nil {
glog.Errorf(log("attacher.UnmountDevice failed to get driver and volume name from device mount path: %v", err))
return err
}
}
if c.csiClient == nil {
@@ -482,6 +531,11 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
}
if !stageUnstageSet {
glog.Infof(log("attacher.UnmountDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping UnmountDevice..."))
// Just delete the global directory + json file
if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
return fmt.Errorf("failed to clean up gloubal mount %s: %s", dataDir, err)
}
return nil
}
@@ -495,6 +549,11 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
return err
}
// Delete the global directory + json file
if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
return fmt.Errorf("failed to clean up gloubal mount %s: %s", dataDir, err)
}
glog.V(4).Infof(log("attacher.UnmountDevice successfully requested NodeStageVolume [%s]", deviceMountPath))
return nil
}
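Note: saveVolumeData and loadVolumeData are defined elsewhere in this package (csi_util.go in this tree); a minimal sketch of the JSON round-trip the new MountDevice/UnmountDevice logic depends on, under the assumption that the helpers persist a flat string map:
// Sketch only; the real helpers may differ in error wrapping and permissions.
// (imports assumed: "encoding/json", "fmt", "os", "path/filepath")
func saveVolumeData(dir, fileName string, data map[string]string) error {
    dataFilePath := filepath.Join(dir, fileName)
    file, err := os.Create(dataFilePath)
    if err != nil {
        return fmt.Errorf("failed to create volume data file %s: %v", dataFilePath, err)
    }
    defer file.Close()
    return json.NewEncoder(file).Encode(data)
}
func loadVolumeData(dir, fileName string) (map[string]string, error) {
    dataFilePath := filepath.Join(dir, fileName)
    file, err := os.Open(dataFilePath)
    if err != nil {
        return nil, fmt.Errorf("failed to open volume data file %s: %v", dataFilePath, err)
    }
    defer file.Close()
    data := map[string]string{}
    if err := json.NewDecoder(file).Decode(&data); err != nil {
        return nil, fmt.Errorf("failed to parse volume data file %s: %v", dataFilePath, err)
    }
    return data, nil
}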

View File

@@ -18,24 +18,37 @@ package csi
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/golang/glog"
storage "k8s.io/api/storage/v1beta1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
clientset "k8s.io/client-go/kubernetes"
fakeclient "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
utiltesting "k8s.io/client-go/util/testing"
fakecsi "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)
var (
bFalse = false
bTrue = true
)
func makeTestAttachment(attachID, nodeName, pvName string) *storage.VolumeAttachment {
return &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
@@ -56,6 +69,40 @@ func makeTestAttachment(attachID, nodeName, pvName string) *storage.VolumeAttach
}
}
func markVolumeAttached(t *testing.T, client clientset.Interface, watch *watch.RaceFreeFakeWatcher, attachID string, status storage.VolumeAttachmentStatus) {
ticker := time.NewTicker(10 * time.Millisecond)
var attach *storage.VolumeAttachment
var err error
defer ticker.Stop()
// wait for attachment to be saved
for i := 0; i < 100; i++ {
attach, err = client.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
if apierrs.IsNotFound(err) {
<-ticker.C
continue
}
t.Error(err)
}
if attach != nil {
glog.Infof("stopping wait")
break
}
}
glog.Infof("stopped wait")
if attach == nil {
t.Logf("attachment not found for id:%v", attachID)
} else {
attach.Status = status
_, err := client.StorageV1beta1().VolumeAttachments().Update(attach)
if err != nil {
t.Error(err)
}
watch.Modify(attach)
}
}
func TestAttacherAttach(t *testing.T) {
testCases := []struct {
@@ -119,8 +166,7 @@ func TestAttacherAttach(t *testing.T) {
// attacher loop
for i, tc := range testCases {
t.Logf("test case: %s", tc.name)
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t)
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
@@ -145,42 +191,144 @@ func TestAttacherAttach(t *testing.T) {
}
}(tc.attachID, tc.nodeName, tc.shouldFail)
// update attachment to avoid long waitForAttachment
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
// wait for attachment to be saved
var attach *storage.VolumeAttachment
for i := 0; i < 100; i++ {
attach, err = csiAttacher.k8s.StorageV1beta1().VolumeAttachments().Get(tc.attachID, meta.GetOptions{})
if err != nil {
if apierrs.IsNotFound(err) {
<-ticker.C
continue
}
t.Error(err)
var status storage.VolumeAttachmentStatus
if tc.injectAttacherError {
status.Attached = false
status.AttachError = &storage.VolumeError{
Message: "attacher error",
}
if attach != nil {
break
}
}
if attach == nil {
t.Logf("attachment not found for id:%v", tc.attachID)
} else {
if tc.injectAttacherError {
attach.Status.Attached = false
attach.Status.AttachError = &storage.VolumeError{
Message: "attacher error",
}
} else {
attach.Status.Attached = true
}
_, err = csiAttacher.k8s.StorageV1beta1().VolumeAttachments().Update(attach)
if err != nil {
t.Error(err)
}
fakeWatcher.Modify(attach)
status.Attached = true
}
markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, tc.attachID, status)
}
}
func TestAttacherWithCSIDriver(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIDriverRegistry, true)()
tests := []struct {
name string
driver string
expectVolumeAttachment bool
}{
{
name: "CSIDriver not attachable",
driver: "not-attachable",
expectVolumeAttachment: false,
},
{
name: "CSIDriver is attachable",
driver: "attachable",
expectVolumeAttachment: true,
},
{
name: "CSIDriver.AttachRequired not set -> failure",
driver: "nil",
expectVolumeAttachment: true,
},
{
name: "CSIDriver does not exist not set -> failure",
driver: "unknown",
expectVolumeAttachment: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeCSIClient := fakecsi.NewSimpleClientset(
getCSIDriver("not-attachable", nil, &bFalse),
getCSIDriver("attachable", nil, &bTrue),
getCSIDriver("nil", nil, nil),
)
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, fakeCSIClient)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, test.driver, "test-vol"), false)
expectedAttachID := getAttachmentName("test-vol", test.driver, "node")
status := storage.VolumeAttachmentStatus{
Attached: true,
}
if test.expectVolumeAttachment {
go markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, expectedAttachID, status)
}
attachID, err := csiAttacher.Attach(spec, types.NodeName("node"))
if err != nil {
t.Errorf("Attach() failed: %s", err)
}
if test.expectVolumeAttachment && attachID == "" {
t.Errorf("Epected attachID, got nothing")
}
if !test.expectVolumeAttachment && attachID != "" {
t.Errorf("Epected empty attachID, got %q", attachID)
}
})
}
}
func TestAttacherWaitForVolumeAttachmentWithCSIDriver(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIDriverRegistry, true)()
// In order to detect if the volume plugin would skip WaitForAttach for non-attachable drivers,
// we do not instantiate any VolumeAttachment. So if the plugin does not skip attach, WaitForVolumeAttachment
// will return an error that volume attachment was not found.
tests := []struct {
name string
driver string
expectError bool
}{
{
name: "CSIDriver not attachable -> success",
driver: "not-attachable",
expectError: false,
},
{
name: "CSIDriver is attachable -> failure",
driver: "attachable",
expectError: true,
},
{
name: "CSIDriver.AttachRequired not set -> failure",
driver: "nil",
expectError: true,
},
{
name: "CSIDriver does not exist not set -> failure",
driver: "unknown",
expectError: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeCSIClient := fakecsi.NewSimpleClientset(
getCSIDriver("not-attachable", nil, &bFalse),
getCSIDriver("attachable", nil, &bTrue),
getCSIDriver("nil", nil, nil),
)
plug, tmpDir := newTestPlugin(t, nil, fakeCSIClient)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, test.driver, "test-vol"), false)
_, err = csiAttacher.WaitForAttach(spec, "", nil, time.Second)
if err != nil && !test.expectError {
t.Errorf("Unexpected error: %s", err)
}
if err == nil && test.expectError {
t.Errorf("Expected error, got none")
}
})
}
}
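Note: getCSIDriver is used by the tests above but is not part of this excerpt; a plausible sketch, assuming it builds a csi-api v1alpha1 CSIDriver object (csiapi aliasing k8s.io/csi-api/pkg/apis/csi/v1alpha1):
func getCSIDriver(name string, podInfoMountVersion *string, attachable *bool) *csiapi.CSIDriver {
    return &csiapi.CSIDriver{
        ObjectMeta: meta.ObjectMeta{
            Name: name,
        },
        Spec: csiapi.CSIDriverSpec{
            PodInfoOnMountVersion: podInfoMountVersion,
            AttachRequired:        attachable,
        },
    }
}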
@@ -236,7 +384,7 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) {
}
for i, tc := range testCases {
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t)
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
@@ -286,7 +434,7 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) {
}
func TestAttacherVolumesAreAttached(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
@@ -373,7 +521,7 @@ func TestAttacherDetach(t *testing.T) {
for _, tc := range testCases {
t.Logf("running test: %v", tc.name)
plug, fakeWatcher, tmpDir, client := newTestWatchPlugin(t)
plug, fakeWatcher, tmpDir, client := newTestWatchPlugin(t, nil)
defer os.RemoveAll(tmpDir)
if tc.reactor != nil {
client.PrependReactor("*", "*", tc.reactor)
@@ -422,7 +570,7 @@ func TestAttacherDetach(t *testing.T) {
func TestAttacherGetDeviceMountPath(t *testing.T) {
// Setup
// Create a new attacher
plug, _, tmpDir, _ := newTestWatchPlugin(t)
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil)
defer os.RemoveAll(tmpDir)
attacher, err0 := plug.NewAttacher()
if err0 != nil {
@@ -522,10 +670,6 @@ func TestAttacherMountDevice(t *testing.T) {
deviceMountPath: "path2",
stageUnstageSet: false,
},
{
testName: "stage_unstage not set no vars should not fail",
stageUnstageSet: false,
},
}
for _, tc := range testCases {
@@ -535,7 +679,7 @@ func TestAttacherMountDevice(t *testing.T) {
// Setup
// Create a new attacher
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t)
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
defer os.RemoveAll(tmpDir)
attacher, err0 := plug.NewAttacher()
if err0 != nil {
@@ -544,6 +688,10 @@ func TestAttacherMountDevice(t *testing.T) {
csiAttacher := attacher.(*csiAttacher)
csiAttacher.csiClient = setupClient(t, tc.stageUnstageSet)
if tc.deviceMountPath != "" {
tc.deviceMountPath = filepath.Join(tmpDir, tc.deviceMountPath)
}
nodeName := string(csiAttacher.plugin.host.GetNodeName())
// Create spec
@@ -588,12 +736,12 @@ func TestAttacherMountDevice(t *testing.T) {
t.Errorf("got wrong number of staged volumes, expecting %v got: %v", numStaged, len(staged))
}
if tc.stageUnstageSet {
gotPath, ok := staged[tc.volName]
vol, ok := staged[tc.volName]
if !ok {
t.Errorf("could not find staged volume: %s", tc.volName)
}
if gotPath != tc.deviceMountPath {
t.Errorf("expected mount path: %s. got: %s", tc.deviceMountPath, gotPath)
if vol.Path != tc.deviceMountPath {
t.Errorf("expected mount path: %s. got: %s", tc.deviceMountPath, vol.Path)
}
}
}
@@ -604,51 +752,56 @@ func TestAttacherUnmountDevice(t *testing.T) {
testName string
volID string
deviceMountPath string
jsonFile string
createPV bool
stageUnstageSet bool
shouldFail bool
}{
{
testName: "normal",
testName: "normal, json file exists",
volID: "project/zone/test-vol1",
deviceMountPath: "/tmp/csi-test049507108/plugins/csi/pv/test-pv-name/globalmount",
deviceMountPath: "plugins/csi/pv/test-pv-name/globalmount",
jsonFile: `{"driverName": "csi", "volumeHandle":"project/zone/test-vol1"}`,
createPV: false,
stageUnstageSet: true,
},
{
testName: "no volID",
testName: "normal, json file doesn't exist -> use PV",
volID: "project/zone/test-vol1",
deviceMountPath: "plugins/csi/pv/test-pv-name/globalmount",
jsonFile: "",
createPV: true,
stageUnstageSet: true,
},
{
testName: "invalid json -> use PV",
volID: "project/zone/test-vol1",
deviceMountPath: "plugins/csi/pv/test-pv-name/globalmount",
jsonFile: `{"driverName"}}`,
createPV: true,
stageUnstageSet: true,
},
{
testName: "no json, no PV.volID",
volID: "",
deviceMountPath: "/tmp/csi-test049507108/plugins/csi/pv/test-pv-name/globalmount",
stageUnstageSet: true,
deviceMountPath: "plugins/csi/pv/test-pv-name/globalmount",
jsonFile: "",
createPV: true,
shouldFail: true,
},
{
testName: "no device mount path",
testName: "no json, no PV",
volID: "project/zone/test-vol1",
deviceMountPath: "",
deviceMountPath: "plugins/csi/pv/test-pv-name/globalmount",
jsonFile: "",
createPV: false,
stageUnstageSet: true,
shouldFail: true,
},
{
testName: "missing part of device mount path",
volID: "project/zone/test-vol1",
deviceMountPath: "/tmp/csi-test049507108/plugins/csi/pv/test-pv-name/globalmount",
stageUnstageSet: true,
shouldFail: true,
},
{
testName: "test volume name mismatch",
volID: "project/zone/test-vol1",
deviceMountPath: "/tmp/csi-test049507108/plugins/csi/pv/test-pv-name/globalmount",
stageUnstageSet: true,
shouldFail: true,
},
{
testName: "stage_unstage not set",
volID: "project/zone/test-vol1",
deviceMountPath: "/tmp/csi-test049507108/plugins/csi/pv/test-pv-name/globalmount",
stageUnstageSet: false,
},
{
testName: "stage_unstage not set no vars should not fail",
deviceMountPath: "plugins/csi/pv/test-pv-name/globalmount",
jsonFile: `{"driverName":"test-driver","volumeHandle":"test-vol1"}`,
stageUnstageSet: false,
},
}
@@ -657,7 +810,7 @@ func TestAttacherUnmountDevice(t *testing.T) {
t.Logf("Running test case: %s", tc.testName)
// Setup
// Create a new attacher
plug, _, tmpDir, _ := newTestWatchPlugin(t)
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil)
defer os.RemoveAll(tmpDir)
attacher, err0 := plug.NewAttacher()
if err0 != nil {
@@ -666,29 +819,45 @@ func TestAttacherUnmountDevice(t *testing.T) {
csiAttacher := attacher.(*csiAttacher)
csiAttacher.csiClient = setupClient(t, tc.stageUnstageSet)
if tc.deviceMountPath != "" {
tc.deviceMountPath = filepath.Join(tmpDir, tc.deviceMountPath)
}
// Add the volume to NodeStagedVolumes
cdc := csiAttacher.csiClient.(*fakeCsiDriverClient)
cdc.nodeClient.AddNodeStagedVolume(tc.volID, tc.deviceMountPath)
cdc.nodeClient.AddNodeStagedVolume(tc.volID, tc.deviceMountPath, nil)
// Make the PV for this object
// Make JSON for this object
if tc.deviceMountPath != "" {
if err := os.MkdirAll(tc.deviceMountPath, 0755); err != nil {
t.Fatalf("error creating directory %s: %s", tc.deviceMountPath, err)
}
}
dir := filepath.Dir(tc.deviceMountPath)
// dir is now {tmpDir}/plugins/csi/pv/{pvName}; its base name is the PV name
pvName := filepath.Base(dir)
pv := makeTestPV(pvName, 5, "csi", tc.volID)
_, err := csiAttacher.k8s.CoreV1().PersistentVolumes().Create(pv)
if err != nil && !tc.shouldFail {
t.Fatalf("Failed to create PV: %v", err)
if tc.jsonFile != "" {
dataPath := filepath.Join(dir, volDataFileName)
if err := ioutil.WriteFile(dataPath, []byte(tc.jsonFile), 0644); err != nil {
t.Fatalf("error creating %s: %s", dataPath, err)
}
}
if tc.createPV {
// Make the PV for this object
pvName := filepath.Base(dir)
pv := makeTestPV(pvName, 5, "csi", tc.volID)
_, err := csiAttacher.k8s.CoreV1().PersistentVolumes().Create(pv)
if err != nil && !tc.shouldFail {
t.Fatalf("Failed to create PV: %v", err)
}
}
// Run
err = csiAttacher.UnmountDevice(tc.deviceMountPath)
err := csiAttacher.UnmountDevice(tc.deviceMountPath)
// Verify
if err != nil {
if !tc.shouldFail {
t.Errorf("test should not fail, but error occurred: %v", err)
}
return
continue
}
if err == nil && tc.shouldFail {
t.Errorf("test should fail, but no error occurred")
@@ -711,11 +880,23 @@ func TestAttacherUnmountDevice(t *testing.T) {
t.Errorf("could not find expected staged volume: %s", tc.volID)
}
if tc.jsonFile != "" && !tc.shouldFail {
dataPath := filepath.Join(dir, volDataFileName)
if _, err := os.Stat(dataPath); !os.IsNotExist(err) {
if err != nil {
t.Errorf("error checking file %s: %s", dataPath, err)
} else {
t.Errorf("json file %s should not exists, but it does", dataPath)
}
} else {
t.Logf("json file %s was correctly removed", dataPath)
}
}
}
}
// create a plugin mgr to load plugins and setup a fake client
func newTestWatchPlugin(t *testing.T) (*csiPlugin, *watch.RaceFreeFakeWatcher, string, *fakeclient.Clientset) {
func newTestWatchPlugin(t *testing.T, csiClient *fakecsi.Clientset) (*csiPlugin, *watch.RaceFreeFakeWatcher, string, *fakeclient.Clientset) {
tmpDir, err := utiltesting.MkTmpdir("csi-test")
if err != nil {
t.Fatalf("can't create temp dir: %v", err)
@@ -725,10 +906,15 @@ func newTestWatchPlugin(t *testing.T) (*csiPlugin, *watch.RaceFreeFakeWatcher, s
fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatcher, nil))
fakeClient.Fake.WatchReactionChain = fakeClient.Fake.WatchReactionChain[:1]
host := volumetest.NewFakeVolumeHost(
if csiClient == nil {
csiClient = fakecsi.NewSimpleClientset()
}
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
csiClient,
nil,
"node",
)
plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
@@ -743,5 +929,12 @@ func newTestWatchPlugin(t *testing.T) (*csiPlugin, *watch.RaceFreeFakeWatcher, s
t.Fatalf("cannot assert plugin to be type csiPlugin")
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
// Wait until the informer in CSI volume plugin has all CSIDrivers.
wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
return csiPlug.csiDriverInformer.Informer().HasSynced(), nil
})
}
return csiPlug, fakeWatcher, tmpDir, fakeClient
}

View File

@@ -47,19 +47,20 @@ type csiBlockMapper struct {
var _ volume.BlockVolumeMapper = &csiBlockMapper{}
// GetGlobalMapPath returns a path (on the node) where the devicePath will be symlinked to
// Example: plugins/kubernetes.io/csi/volumeDevices/{volumeID}
// GetGlobalMapPath returns a path (on the node) to a device file which will be symlinked to
// Example: plugins/kubernetes.io/csi/volumeDevices/{volumeID}/dev
func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) {
dir := getVolumeDevicePluginDir(spec.Name(), m.plugin.host)
glog.V(4).Infof(log("blockMapper.GetGlobalMapPath = %s", dir))
return dir, nil
}
// GetPodDeviceMapPath returns pod's device map path and volume name
// path: pods/{podUid}/volumeDevices/kubernetes.io~csi/, {volumeID}
// GetPodDeviceMapPath returns pod's device file which will be mapped to a volume
// returns: pods/{podUid}/volumeDevices/kubernetes.io~csi/{volumeID}/dev, {volumeID}
func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) {
path, specName := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, csiPluginName), m.specName
glog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath = %s", path))
path := filepath.Join(m.plugin.host.GetPodVolumeDeviceDir(m.podUID, csiPluginName), m.specName, "dev")
specName := m.specName
glog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, specName))
return path, specName
}
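For orientation, a runnable sketch of how the paths used by the block mapper compose; /var/lib/kubelet is only the default kubelet root and the sample names are placeholders (the real prefixes come from the plugin's VolumeHost):
package main

import (
    "fmt"
    "path/filepath"
)

func main() {
    root := "/var/lib/kubelet"
    specName, podUID := "test-pv", "pod-uid-123"

    globalMapPath := filepath.Join(root, "plugins/kubernetes.io/csi/volumeDevices", specName)
    stagedBlockFile := filepath.Join(globalMapPath, "file") // created in SetUpDevice

    podDeviceMapPath := filepath.Join(root, "pods", podUID,
        "volumeDevices/kubernetes.io~csi", specName, "dev") // returned by GetPodDeviceMapPath
    targetBlockFile := filepath.Join(podDeviceMapPath, "file") // created in MapDevice

    fmt.Println(stagedBlockFile)
    fmt.Println(targetBlockFile)
}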
@@ -87,6 +88,9 @@ func (m *csiBlockMapper) SetUpDevice() (string, error) {
return "", err
}
globalMapPathBlockFile := filepath.Join(globalMapPath, "file")
glog.V(4).Infof(log("blockMapper.SetupDevice global device map path file set [%s]", globalMapPathBlockFile))
csi := m.csiClient
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
@@ -128,13 +132,25 @@ func (m *csiBlockMapper) SetUpDevice() (string, error) {
}
}
// create globalMapPath before call to NodeStageVolume
// setup path globalMapPath and block file before call to NodeStageVolume
if err := os.MkdirAll(globalMapPath, 0750); err != nil {
glog.Error(log("blockMapper.SetupDevice failed to create dir %s: %v", globalMapPath, err))
return "", err
}
glog.V(4).Info(log("blockMapper.SetupDevice created global device map path successfully [%s]", globalMapPath))
// create block device file
blockFile, err := os.OpenFile(globalMapPathBlockFile, os.O_CREATE|os.O_RDWR, 0750)
if err != nil {
glog.Error(log("blockMapper.SetupDevice failed to create dir %s: %v", globalMapPathBlockFile, err))
return "", err
}
if err := blockFile.Close(); err != nil {
glog.Error(log("blockMapper.SetupDevice failed to close file %s: %v", globalMapPathBlockFile, err))
return "", err
}
glog.V(4).Info(log("blockMapper.SetupDevice created global map path block device file successfully [%s]", globalMapPathBlockFile))
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
accessMode := v1.ReadWriteOnce
if m.spec.PersistentVolume.Spec.AccessModes != nil {
@@ -144,7 +160,7 @@ func (m *csiBlockMapper) SetUpDevice() (string, error) {
err = csi.NodeStageVolume(ctx,
csiSource.VolumeHandle,
publishVolumeInfo,
globalMapPath,
globalMapPathBlockFile,
fsTypeBlockName,
accessMode,
nodeStageSecrets,
@@ -158,8 +174,8 @@ func (m *csiBlockMapper) SetUpDevice() (string, error) {
return "", err
}
glog.V(4).Infof(log("blockMapper.SetupDevice successfully requested NodeStageVolume [%s]", globalMapPath))
return globalMapPath, nil
glog.V(4).Infof(log("blockMapper.SetupDevice successfully requested NodeStageVolume [%s]", globalMapPathBlockFile))
return globalMapPathBlockFile, nil
}
func (m *csiBlockMapper) MapDevice(devicePath, globalMapPath, volumeMapPath, volumeMapName string, podUID types.UID) error {
@@ -176,16 +192,29 @@ func (m *csiBlockMapper) MapDevice(devicePath, globalMapPath, volumeMapPath, vol
csiSource, err := getCSISourceFromSpec(m.spec)
if err != nil {
glog.Error(log("blockMapper.Map failed to get CSI persistent source: %v", err))
glog.Error(log("blockMapper.MapDevice failed to get CSI persistent source: %v", err))
return err
}
dir := filepath.Join(volumeMapPath, volumeMapName)
csi := m.csiClient
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
globalMapPathBlockFile := devicePath
dir, _ := m.GetPodDeviceMapPath()
targetBlockFilePath := filepath.Join(dir, "file")
glog.V(4).Infof(log("blockMapper.MapDevice target volume map file path %s", targetBlockFilePath))
stageCapable, err := hasStageUnstageCapability(ctx, csi)
if err != nil {
glog.Error(log("blockMapper.MapDevice failed to check for STAGE_UNSTAGE_VOLUME capabilty: %v", err))
return err
}
if !stageCapable {
globalMapPathBlockFile = ""
}
nodeName := string(m.plugin.host.GetNodeName())
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
@@ -213,10 +242,22 @@ func (m *csiBlockMapper) MapDevice(devicePath, globalMapPath, volumeMapPath, vol
}
if err := os.MkdirAll(dir, 0750); err != nil {
glog.Error(log("blockMapper.MapDevice failed to create dir %#v: %v", dir, err))
glog.Error(log("blockMapper.MapDevice failed to create dir %s: %v", dir, err))
return err
}
glog.V(4).Info(log("blockMapper.MapDevice created NodePublish path [%s]", dir))
glog.V(4).Info(log("blockMapper.MapDevice created target volume map path successfully [%s]", dir))
// create target map volume block file
targetBlockFile, err := os.OpenFile(targetBlockFilePath, os.O_CREATE|os.O_RDWR, 0750)
if err != nil {
glog.Error(log("blockMapper.MapDevice failed to create file %s: %v", targetBlockFilePath, err))
return err
}
if err := targetBlockFile.Close(); err != nil {
glog.Error(log("blockMapper.MapDevice failed to close file %s: %v", targetBlockFilePath, err))
return err
}
glog.V(4).Info(log("blockMapper.MapDevice created target volume map file successfully [%s]", targetBlockFilePath))
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
accessMode := v1.ReadWriteOnce
@@ -228,8 +269,8 @@ func (m *csiBlockMapper) MapDevice(devicePath, globalMapPath, volumeMapPath, vol
ctx,
m.volumeID,
m.readOnly,
globalMapPath,
dir,
globalMapPathBlockFile,
targetBlockFilePath,
accessMode,
publishVolumeInfo,
csiSource.VolumeAttributes,
@@ -240,7 +281,7 @@ func (m *csiBlockMapper) MapDevice(devicePath, globalMapPath, volumeMapPath, vol
if err != nil {
glog.Errorf(log("blockMapper.MapDevice failed: %v", err))
if err := os.RemoveAll(dir); err != nil {
glog.Error(log("blockMapper.MapDevice failed to remove mount dir after a NodePublish() error [%s]: %v", dir, err))
glog.Error(log("blockMapper.MapDevice failed to remove mapped dir after a NodePublish() error [%s]: %v", dir, err))
}
return err
}

View File

@@ -31,7 +31,7 @@ import (
)
func TestBlockMapperGetGlobalMapPath(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
// TODO (vladimirvivien) specName with slashes will not work
@@ -77,13 +77,14 @@ func TestBlockMapperGetGlobalMapPath(t *testing.T) {
}
func TestBlockMapperSetupDevice(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
fakeClient := fakeclient.NewSimpleClientset()
host := volumetest.NewFakeVolumeHostWithNodeName(
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
nil,
nil,
"fakeNode",
)
plug.host = host
@@ -123,24 +124,29 @@ func TestBlockMapperSetupDevice(t *testing.T) {
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
}
if devicePath != globalMapPath {
if devicePath != filepath.Join(globalMapPath, "file") {
t.Fatalf("mapper.SetupDevice returned unexpected path %s instead of %v", devicePath, globalMapPath)
}
vols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
if vols[csiMapper.volumeID] != devicePath {
vol, ok := vols[csiMapper.volumeID]
if !ok {
t.Error("csi server may not have received NodePublishVolume call")
}
if vol.Path != devicePath {
t.Errorf("csi server expected device path %s, got %s", devicePath, vol.Path)
}
}
func TestBlockMapperMapDevice(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
fakeClient := fakeclient.NewSimpleClientset()
host := volumetest.NewFakeVolumeHostWithNodeName(
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
nil,
nil,
"fakeNode",
)
plug.host = host
@@ -186,28 +192,34 @@ func TestBlockMapperMapDevice(t *testing.T) {
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
}
if _, err := os.Stat(filepath.Join(volumeMapPath, volName)); err != nil {
podVolumeBlockFilePath := filepath.Join(volumeMapPath, "file")
if _, err := os.Stat(podVolumeBlockFilePath); err != nil {
if os.IsNotExist(err) {
t.Errorf("mapper.MapDevice failed, volume path not created: %s", volumeMapPath)
t.Errorf("mapper.MapDevice failed, volume path not created: %v", err)
} else {
t.Errorf("mapper.MapDevice failed: %v", err)
}
}
pubs := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
if pubs[csiMapper.volumeID] != volumeMapPath {
vol, ok := pubs[csiMapper.volumeID]
if !ok {
t.Error("csi server may not have received NodePublishVolume call")
}
if vol.Path != podVolumeBlockFilePath {
t.Errorf("csi server expected path %s, got %s", podVolumeBlockFilePath, vol.Path)
}
}
func TestBlockMapperTearDownDevice(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
fakeClient := fakeclient.NewSimpleClientset()
host := volumetest.NewFakeVolumeHostWithNodeName(
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
nil,
nil,
"fakeNode",
)
plug.host = host

View File

@@ -32,6 +32,11 @@ import (
)
type csiClient interface {
NodeGetInfo(ctx context.Context) (
nodeID string,
maxVolumePerNode int64,
accessibleTopology *csipb.Topology,
err error)
NodePublishVolume(
ctx context.Context,
volumeid string,
@@ -75,6 +80,24 @@ func newCsiDriverClient(driverName string) *csiDriverClient {
return c
}
func (c *csiDriverClient) NodeGetInfo(ctx context.Context) (
nodeID string,
maxVolumePerNode int64,
accessibleTopology *csipb.Topology,
err error) {
glog.V(4).Info(log("calling NodeGetInfo rpc"))
conn, err := newGrpcConn(c.driverName)
if err != nil {
return "", 0, nil, err
}
defer conn.Close()
nodeClient := csipb.NewNodeClient(conn)
res, err := nodeClient.NodeGetInfo(ctx, &csipb.NodeGetInfoRequest{})
if err != nil {
return "", 0, nil, err
}
return res.GetNodeId(), res.GetMaxVolumesPerNode(), res.GetAccessibleTopology(), nil
}
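For context, NodeGetInfo is the RPC the new nodeinfomanager dependency (see the BUILD changes above) uses to discover node identity and topology; a hedged usage sketch using only names from this file (the driver name is a placeholder):
func exampleNodeGetInfo() {
    // Usage sketch only; newCsiDriverClient and csiTimeout exist in this package.
    csi := newCsiDriverClient("com.example.csi-driver")
    ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
    defer cancel()
    nodeID, maxVolumes, topology, err := csi.NodeGetInfo(ctx)
    if err != nil {
        glog.Errorf("NodeGetInfo failed: %v", err)
        return
    }
    glog.V(4).Infof("node %q supports up to %d CSI volumes, topology: %v", nodeID, maxVolumes, topology)
}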
func (c *csiDriverClient) NodePublishVolume(
ctx context.Context,
volID string,

View File

@@ -24,6 +24,7 @@ import (
csipb "github.com/container-storage-interface/spec/lib/go/csi/v0"
api "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/volume/csi/fake"
"reflect"
)
type fakeCsiDriverClient struct {
@@ -38,6 +39,15 @@ func newFakeCsiDriverClient(t *testing.T, stagingCapable bool) *fakeCsiDriverCli
}
}
func (c *fakeCsiDriverClient) NodeGetInfo(ctx context.Context) (
nodeID string,
maxVolumePerNode int64,
accessibleTopology *csipb.Topology,
err error) {
resp, err := c.nodeClient.NodeGetInfo(ctx, &csipb.NodeGetInfoRequest{})
return resp.GetNodeId(), resp.GetMaxVolumesPerNode(), resp.GetAccessibleTopology(), err
}
func (c *fakeCsiDriverClient) NodePublishVolume(
ctx context.Context,
volID string,
@@ -141,6 +151,60 @@ func setupClient(t *testing.T, stageUnstageSet bool) csiClient {
return newFakeCsiDriverClient(t, stageUnstageSet)
}
func TestClientNodeGetInfo(t *testing.T) {
testCases := []struct {
name string
expectedNodeID string
expectedMaxVolumePerNode int64
expectedAccessibleTopology *csipb.Topology
mustFail bool
err error
}{
{
name: "test ok",
expectedNodeID: "node1",
expectedMaxVolumePerNode: 16,
expectedAccessibleTopology: &csipb.Topology{
Segments: map[string]string{"com.example.csi-topology/zone": "zone1"},
},
},
{name: "grpc error", mustFail: true, err: errors.New("grpc error")},
}
client := setupClient(t, false /* stageUnstageSet */)
for _, tc := range testCases {
t.Logf("test case: %s", tc.name)
client.(*fakeCsiDriverClient).nodeClient.SetNextError(tc.err)
client.(*fakeCsiDriverClient).nodeClient.SetNodeGetInfoResp(&csipb.NodeGetInfoResponse{
NodeId: tc.expectedNodeID,
MaxVolumesPerNode: tc.expectedMaxVolumePerNode,
AccessibleTopology: tc.expectedAccessibleTopology,
})
nodeID, maxVolumePerNode, accessibleTopology, err := client.NodeGetInfo(context.Background())
if tc.mustFail && err == nil {
t.Error("expected an error but got none")
}
if !tc.mustFail && err != nil {
t.Errorf("expected no errors but got: %v", err)
}
if nodeID != tc.expectedNodeID {
t.Errorf("expected nodeID: %v; got: %v", tc.expectedNodeID, nodeID)
}
if maxVolumePerNode != tc.expectedMaxVolumePerNode {
t.Errorf("expected maxVolumePerNode: %v; got: %v", tc.expectedMaxVolumePerNode, maxVolumePerNode)
}
if !reflect.DeepEqual(accessibleTopology, tc.expectedAccessibleTopology) {
t.Errorf("expected accessibleTopology: %v; got: %v", *tc.expectedAccessibleTopology, *accessibleTopology)
}
}
}
func TestClientNodePublishVolume(t *testing.T) {
testCases := []struct {
name string

View File

@@ -26,16 +26,16 @@ import (
"github.com/golang/glog"
api "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/features"
kstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
)
const defaultFSType = "ext4"
//TODO (vladimirvivien) move this in a central loc later
var (
volDataKey = struct {
@@ -51,6 +51,7 @@ var (
"nodeName",
"attachmentID",
}
currentPodInfoMountVersion = "v1"
)
type csiMountMgr struct {
@@ -115,9 +116,6 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
}
csi := c.csiClient
nodeName := string(c.plugin.host.GetNodeName())
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
@@ -136,20 +134,13 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
return err
}
}
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
if c.volumeInfo == nil {
attachment, err := c.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
nodeName := string(c.plugin.host.GetNodeName())
c.volumeInfo, err = c.plugin.getPublishVolumeInfo(c.k8s, c.volumeID, c.driverName, nodeName)
if err != nil {
glog.Error(log("mounter.SetupAt failed while getting volume attachment [id=%v]: %v", attachID, err))
return err
}
if attachment == nil {
glog.Error(log("unable to find VolumeAttachment [id=%s]", attachID))
return errors.New("no existing VolumeAttachment found")
}
c.volumeInfo = attachment.Status.AttachmentMetadata
}
attribs := csiSource.VolumeAttributes
@@ -176,10 +167,23 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
accessMode = c.spec.PersistentVolume.Spec.AccessModes[0]
}
fsType := csiSource.FSType
if len(fsType) == 0 {
fsType = defaultFSType
// Inject pod information into volume_attributes
podAttrs, err := c.podAttributes()
if err != nil {
glog.Error(log("mouter.SetUpAt failed to assemble volume attributes: %v", err))
return err
}
if podAttrs != nil {
if attribs == nil {
attribs = podAttrs
} else {
for k, v := range podAttrs {
attribs[k] = v
}
}
}
fsType := csiSource.FSType
err = csi.NodePublishVolume(
ctx,
c.volumeID,
@@ -202,37 +206,63 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
}
// apply volume ownership
if !c.readOnly && fsGroup != nil {
err := volume.SetVolumeOwnership(c, fsGroup)
if err != nil {
// attempt to rollback mount.
glog.Error(log("mounter.SetupAt failed to set fsgroup volume ownership for [%s]: %v", c.volumeID, err))
glog.V(4).Info(log("mounter.SetupAt attempting to unpublish volume %s due to previous error", c.volumeID))
if unpubErr := csi.NodeUnpublishVolume(ctx, c.volumeID, dir); unpubErr != nil {
glog.Error(log(
"mounter.SetupAt failed to unpublish volume [%s]: %v (caused by previous NodePublish error: %v)",
c.volumeID, unpubErr, err,
))
return fmt.Errorf("%v (caused by %v)", unpubErr, err)
}
// The following logic is derived from https://github.com/kubernetes/kubernetes/issues/66323
// if fstype is "", then skip fsgroup (could be indication of non-block filesystem)
// if fstype is provided and pv.AccessMode == ReadWriteOnce, then apply fsgroup
if unmountErr := removeMountDir(c.plugin, dir); unmountErr != nil {
glog.Error(log(
"mounter.SetupAt failed to clean mount dir [%s]: %v (caused by previous NodePublish error: %v)",
dir, unmountErr, err,
))
return fmt.Errorf("%v (caused by %v)", unmountErr, err)
}
return err
err = c.applyFSGroup(fsType, fsGroup)
if err != nil {
// attempt to rollback mount.
fsGrpErr := fmt.Errorf("applyFSGroup failed for vol %s: %v", c.volumeID, err)
if unpubErr := csi.NodeUnpublishVolume(ctx, c.volumeID, dir); unpubErr != nil {
glog.Error(log("NodeUnpublishVolume failed for [%s]: %v", c.volumeID, unpubErr))
return fsGrpErr
}
glog.V(4).Info(log("mounter.SetupAt sets fsGroup to [%d] for %s", *fsGroup, c.volumeID))
if unmountErr := removeMountDir(c.plugin, dir); unmountErr != nil {
glog.Error(log("removeMountDir failed for [%s]: %v", dir, unmountErr))
return fsGrpErr
}
return fsGrpErr
}
glog.V(4).Infof(log("mounter.SetUp successfully requested NodePublish [%s]", dir))
return nil
}
func (c *csiMountMgr) podAttributes() (map[string]string, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
return nil, nil
}
if c.plugin.csiDriverLister == nil {
return nil, errors.New("CSIDriver lister does not exist")
}
csiDriver, err := c.plugin.csiDriverLister.Get(c.driverName)
if err != nil {
if apierrs.IsNotFound(err) {
glog.V(4).Infof(log("CSIDriver %q not found, not adding pod information", c.driverName))
return nil, nil
}
return nil, err
}
// if PodInfoOnMountVersion is not set or not v1 we do not set pod attributes
if csiDriver.Spec.PodInfoOnMountVersion == nil || *csiDriver.Spec.PodInfoOnMountVersion != currentPodInfoMountVersion {
glog.V(4).Infof(log("CSIDriver %q does not require pod information", c.driverName))
return nil, nil
}
attrs := map[string]string{
"csi.storage.k8s.io/pod.name": c.pod.Name,
"csi.storage.k8s.io/pod.namespace": c.pod.Namespace,
"csi.storage.k8s.io/pod.uid": string(c.pod.UID),
"csi.storage.k8s.io/serviceAccount.name": c.pod.Spec.ServiceAccountName,
}
glog.V(4).Infof(log("CSIDriver %q requires pod information", c.driverName))
return attrs, nil
}
func (c *csiMountMgr) GetAttributes() volume.Attributes {
mounter := c.plugin.host.GetMounter(c.plugin.GetPluginName())
path := c.GetPath()
@@ -293,6 +323,43 @@ func (c *csiMountMgr) TearDownAt(dir string) error {
return nil
}
// applyFSGroup applies the volume ownership; it derives its logic
// from https://github.com/kubernetes/kubernetes/issues/66323
// 1) if fstype is "", then skip fsgroup (could be indication of non-block filesystem)
// 2) if fstype is provided and pv.AccessMode == ReadWriteOnce and !c.spec.ReadOnly, then apply fsgroup
func (c *csiMountMgr) applyFSGroup(fsType string, fsGroup *int64) error {
if fsGroup != nil {
if fsType == "" {
glog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, fsType not provided"))
return nil
}
accessModes := c.spec.PersistentVolume.Spec.AccessModes
if c.spec.PersistentVolume.Spec.AccessModes == nil {
glog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, access modes not provided"))
return nil
}
if !hasReadWriteOnce(accessModes) {
glog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, only support ReadWriteOnce access mode"))
return nil
}
if c.readOnly {
glog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, volume is readOnly"))
return nil
}
err := volume.SetVolumeOwnership(c, fsGroup)
if err != nil {
return err
}
glog.V(4).Info(log("mounter.SetupAt fsGroup [%d] applied successfully to %s", *fsGroup, c.volumeID))
}
return nil
}
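Note: hasReadWriteOnce is referenced above but not included in this diff; a minimal sketch, assuming it simply scans the PV's access modes ("api" being the k8s.io/api/core/v1 alias used in this file):
func hasReadWriteOnce(modes []api.PersistentVolumeAccessMode) bool {
    for _, mode := range modes {
        if mode == api.ReadWriteOnce {
            return true
        }
    }
    return false
}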
// isDirMounted returns the !notMounted result from IsLikelyNotMountPoint check
func isDirMounted(plug *csiPlugin, dir string) (bool, error) {
mounter := plug.host.GetMounter(plug.GetPluginName())

View File

@@ -25,25 +25,35 @@ import (
"path"
"testing"
"reflect"
"github.com/golang/glog"
api "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
fakeclient "k8s.io/client-go/kubernetes/fake"
csiapi "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
fakecsi "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
)
var (
testDriver = "test-driver"
testVol = "vol-123"
testns = "test-ns"
testPodUID = types.UID("test-pod")
testDriver = "test-driver"
testVol = "vol-123"
testns = "test-ns"
testPod = "test-pod"
testPodUID = types.UID("test-pod")
testAccount = "test-service-account"
)
func TestMounterGetPath(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
// TODO (vladimirvivien) specName with slashes will not work
@@ -85,83 +95,299 @@ func TestMounterGetPath(t *testing.T) {
}
}
func MounterSetUpTests(t *testing.T, podInfoEnabled bool) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIDriverRegistry, podInfoEnabled)()
tests := []struct {
name string
driver string
attributes map[string]string
expectedAttributes map[string]string
}{
{
name: "no pod info",
driver: "no-info",
attributes: nil,
expectedAttributes: nil,
},
{
name: "no CSIDriver -> no pod info",
driver: "unknown-driver",
attributes: nil,
expectedAttributes: nil,
},
{
name: "CSIDriver with PodInfoRequiredOnMount=nil -> no pod info",
driver: "nil",
attributes: nil,
expectedAttributes: nil,
},
{
name: "no pod info -> keep existing attributes",
driver: "no-info",
attributes: map[string]string{"foo": "bar"},
expectedAttributes: map[string]string{"foo": "bar"},
},
{
name: "add pod info",
driver: "info",
attributes: nil,
expectedAttributes: map[string]string{"csi.storage.k8s.io/pod.uid": "test-pod", "csi.storage.k8s.io/serviceAccount.name": "test-service-account", "csi.storage.k8s.io/pod.name": "test-pod", "csi.storage.k8s.io/pod.namespace": "test-ns"},
},
{
name: "add pod info -> keep existing attributes",
driver: "info",
attributes: map[string]string{"foo": "bar"},
expectedAttributes: map[string]string{"foo": "bar", "csi.storage.k8s.io/pod.uid": "test-pod", "csi.storage.k8s.io/serviceAccount.name": "test-service-account", "csi.storage.k8s.io/pod.name": "test-pod", "csi.storage.k8s.io/pod.namespace": "test-ns"},
},
}
emptyPodMountInfoVersion := ""
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
glog.Infof("Starting test %s", test.name)
fakeClient := fakeclient.NewSimpleClientset()
fakeCSIClient := fakecsi.NewSimpleClientset(
getCSIDriver("no-info", &emptyPodMountInfoVersion, nil),
getCSIDriver("info", &currentPodInfoMountVersion, nil),
getCSIDriver("nil", nil, nil),
)
plug, tmpDir := newTestPlugin(t, fakeClient, fakeCSIClient)
defer os.RemoveAll(tmpDir)
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
// Wait until the informer in CSI volume plugin has all CSIDrivers.
wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
return plug.csiDriverInformer.Informer().HasSynced(), nil
})
}
pv := makeTestPV("test-pv", 10, test.driver, testVol)
pv.Spec.CSI.VolumeAttributes = test.attributes
pvName := pv.GetName()
mounter, err := plug.NewMounter(
volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly),
&api.Pod{
ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns, Name: testPod},
Spec: api.PodSpec{
ServiceAccountName: testAccount,
},
},
volume.VolumeOptions{},
)
if err != nil {
t.Fatalf("failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
csiMounter.csiClient = setupClient(t, true)
attachID := getAttachmentName(csiMounter.volumeID, csiMounter.driverName, string(plug.host.GetNodeName()))
attachment := &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
Name: attachID,
},
Spec: storage.VolumeAttachmentSpec{
NodeName: "test-node",
Attacher: csiPluginName,
Source: storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
},
},
Status: storage.VolumeAttachmentStatus{
Attached: false,
AttachError: nil,
DetachError: nil,
},
}
_, err = csiMounter.k8s.StorageV1beta1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to setup VolumeAttachment: %v", err)
}
// Mounter.SetUp()
fsGroup := int64(2000)
if err := csiMounter.SetUp(&fsGroup); err != nil {
t.Fatalf("mounter.Setup failed: %v", err)
}
// Test that the default file system type is not overridden
if len(csiMounter.spec.PersistentVolume.Spec.CSI.FSType) != 0 {
t.Errorf("default value of file system type was overridden by type %s", csiMounter.spec.PersistentVolume.Spec.CSI.FSType)
}
path := csiMounter.GetPath()
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
t.Errorf("SetUp() failed, volume path not created: %s", path)
} else {
t.Errorf("SetUp() failed: %v", err)
}
}
// ensure call went all the way
pubs := csiMounter.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
vol, ok := pubs[csiMounter.volumeID]
if !ok {
t.Error("csi server may not have received NodePublishVolume call")
}
if vol.Path != csiMounter.GetPath() {
t.Errorf("csi server expected path %s, got %s", csiMounter.GetPath(), vol.Path)
}
if podInfoEnabled {
if !reflect.DeepEqual(vol.Attributes, test.expectedAttributes) {
t.Errorf("csi server expected attributes %+v, got %+v", test.expectedAttributes, vol.Attributes)
}
} else {
// CSIPodInfo feature is disabled, we expect no modifications to attributes.
if !reflect.DeepEqual(vol.Attributes, test.attributes) {
t.Errorf("csi server expected attributes %+v, got %+v", test.attributes, vol.Attributes)
}
}
})
}
}
func TestMounterSetUp(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
t.Run("WithCSIPodInfo", func(t *testing.T) {
MounterSetUpTests(t, true)
})
t.Run("WithoutCSIPodInfo", func(t *testing.T) {
MounterSetUpTests(t, false)
})
}
func TestMounterSetUpWithFSGroup(t *testing.T) {
fakeClient := fakeclient.NewSimpleClientset()
plug, tmpDir := newTestPlugin(t, fakeClient, nil)
defer os.RemoveAll(tmpDir)
testCases := []struct {
name string
accessModes []api.PersistentVolumeAccessMode
readOnly bool
fsType string
setFsGroup bool
fsGroup int64
}{
{
name: "default fstype, with no fsgroup (should not apply fsgroup)",
accessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
},
readOnly: false,
fsType: "",
},
{
name: "default fstype with fsgroup (should not apply fsgroup)",
accessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
},
readOnly: false,
fsType: "",
setFsGroup: true,
fsGroup: 3000,
},
{
name: "fstype, fsgroup, RWM, ROM provided (should not apply fsgroup)",
accessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteMany,
api.ReadOnlyMany,
},
fsType: "ext4",
setFsGroup: true,
fsGroup: 3000,
},
{
name: "fstype, fsgroup, RWO, but readOnly (should not apply fsgroup)",
accessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
},
readOnly: true,
fsType: "ext4",
setFsGroup: true,
fsGroup: 3000,
},
{
name: "fstype, fsgroup, RWO provided (should apply fsgroup)",
accessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
},
fsType: "ext4",
setFsGroup: true,
fsGroup: 3000,
},
}
for i, tc := range testCases {
t.Logf("Running test %s", tc.name)
volName := fmt.Sprintf("test-vol-%d", i)
pv := makeTestPV("test-pv", 10, testDriver, volName)
pv.Spec.AccessModes = tc.accessModes
pvName := pv.GetName()
spec := volume.NewSpecFromPersistentVolume(pv, tc.readOnly)
if tc.fsType != "" {
spec.PersistentVolume.Spec.CSI.FSType = tc.fsType
}
mounter, err := plug.NewMounter(
spec,
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
volume.VolumeOptions{},
)
if err != nil {
t.Fatalf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
csiMounter.csiClient = setupClient(t, true)
attachID := getAttachmentName(csiMounter.volumeID, csiMounter.driverName, string(plug.host.GetNodeName()))
attachment := makeTestAttachment(attachID, "test-node", pvName)
_, err = csiMounter.k8s.StorageV1beta1().VolumeAttachments().Create(attachment)
if err != nil {
t.Errorf("failed to setup VolumeAttachment: %v", err)
continue
}
// Mounter.SetUp()
var fsGroupPtr *int64
if tc.setFsGroup {
fsGroup := tc.fsGroup
fsGroupPtr = &fsGroup
}
if err := csiMounter.SetUp(fsGroupPtr); err != nil {
t.Fatalf("mounter.Setup failed: %v", err)
}
// Test that the default value of file system type is not overridden
if len(csiMounter.spec.PersistentVolume.Spec.CSI.FSType) != len(tc.fsType) {
t.Errorf("file system type was overridden by type %s", csiMounter.spec.PersistentVolume.Spec.CSI.FSType)
}
// ensure call went all the way
pubs := csiMounter.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
if pubs[csiMounter.volumeID].Path != csiMounter.GetPath() {
t.Error("csi server may not have received NodePublishVolume call")
}
}
}
func TestUnmounterTeardown(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
@@ -210,7 +436,7 @@ func TestUnmounterTeardown(t *testing.T) {
}
func TestSaveVolumeData(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
testCases := []struct {
name string
@@ -256,3 +482,15 @@ func TestSaveVolumeData(t *testing.T) {
}
}
}
func getCSIDriver(name string, podInfoMountVersion *string, attachable *bool) *csiapi.CSIDriver {
return &csiapi.CSIDriver{
ObjectMeta: meta.ObjectMeta{
Name: name,
},
Spec: csiapi.CSIDriverSpec{
PodInfoOnMountVersion: podInfoMountVersion,
AttachRequired: attachable,
},
}
}
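// A minimal usage sketch (driver names and the "v1" version string are
// illustrative, not taken from this commit) showing how these fixtures can
// also express attach behavior via AttachRequired alongside pod info:
func exampleCSIDriverFixtures() []*csiapi.CSIDriver {
podInfoVersion := "v1"
attachNotRequired := false
return []*csiapi.CSIDriver{
getCSIDriver("with-pod-info", &podInfoVersion, nil), // pod info passed on mount
getCSIDriver("no-attach", nil, &attachNotRequired), // attach step is skipped
}
}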

View File

@@ -25,15 +25,23 @@ import (
"sync"
"time"
"context"
"github.com/golang/glog"
api "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
csiapiinformer "k8s.io/csi-api/pkg/client/informers/externalversions"
csiinformer "k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1"
csilister "k8s.io/csi-api/pkg/client/listers/csi/v1alpha1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi/labelmanager"
"k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager"
)
const (
@@ -48,11 +56,16 @@ const (
volNameSep = "^"
volDataFileName = "vol_data.json"
fsTypeBlockName = "block"
// TODO: increase to something useful
csiResyncPeriod = time.Minute
)
type csiPlugin struct {
host volume.VolumeHost
blockEnabled bool
host volume.VolumeHost
blockEnabled bool
csiDriverLister csilister.CSIDriverLister
csiDriverInformer csiinformer.CSIDriverInformer
}
// ProbeVolumePlugins returns implemented plugins
@@ -77,43 +90,92 @@ type csiDriversStore struct {
sync.RWMutex
}
// RegistrationHandler is the handler which is fed to the pluginwatcher API.
type RegistrationHandler struct {
}
// TODO (verult) consider using a struct instead of global variables
// csiDrivers map keep track of all registered CSI drivers on the node and their
// corresponding sockets
var csiDrivers csiDriversStore
var lm labelmanager.Interface
var nim nodeinfomanager.Interface
// PluginHandler is the plugin registration handler interface passed to the
// pluginwatcher module in kubelet
var PluginHandler = &RegistrationHandler{}
// ValidatePlugin is called by kubelet's plugin watcher upon detection
// of a new registration socket opened by CSI Driver registrar side car.
func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
glog.Infof(log("Trying to register a new plugin with name: %s endpoint: %s versions: %s",
pluginName, endpoint, strings.Join(versions, ",")))
return nil
}
// RegisterPlugin is called when a plugin can be registered
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string) error {
glog.Infof(log("Register new plugin with name: %s at endpoint: %s", pluginName, endpoint))
func() {
// Store the endpoint of the newly registered CSI driver into the map, keyed by the
// driver name, so that all other CSI components can look up the actual socket of a
// CSI driver by its name. It's not necessary to lock the entire RegisterPlugin()
// function because only the CSI client depends on this driver map, and the CSI
// client does not depend on node information updated in the rest of the function.
csiDrivers.Lock()
defer csiDrivers.Unlock()
csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint}
}()
// Get node info from the driver.
csi := newCsiDriverClient(pluginName)
// TODO (verult) retry with exponential backoff, possibly added in csi client library.
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx)
if err != nil {
unregisterDriver(pluginName)
return fmt.Errorf("error during CSI NodeGetInfo() call: %v", err)
}
err = nim.AddNodeInfo(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology)
if err != nil {
unregisterDriver(pluginName)
return fmt.Errorf("error updating CSI node info in the cluster: %v", err)
}
return nil
}
// DeRegisterPlugin is called when a plugin has removed its socket, signaling
// it is no longer available.
// TODO: Handle DeRegistration
func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
}
func (p *csiPlugin) Init(host volume.VolumeHost) error {
glog.Info(log("plugin initializing..."))
p.host = host
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
csiClient := host.GetCSIClient()
if csiClient == nil {
glog.Warning("The client for CSI Custom Resources is not available, skipping informer initialization")
} else {
// Start informer for CSIDrivers.
factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod)
p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers()
p.csiDriverLister = p.csiDriverInformer.Lister()
go factory.Start(wait.NeverStop)
}
}
// Initializing csiDrivers map and label management channels
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
lm = labelmanager.NewLabelManager(host.GetNodeName(), host.GetKubeClient())
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host)
return nil
}
@@ -249,6 +311,7 @@ func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.S
glog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [%#v]", volData))
fsMode := api.PersistentVolumeFilesystem
pv := &api.PersistentVolume{
ObjectMeta: meta.ObjectMeta{
Name: volData[volDataKey.specVolID],
@@ -260,6 +323,7 @@ func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.S
VolumeHandle: volData[volDataKey.volHandle],
},
},
VolumeMode: &fsMode,
},
}
@@ -268,7 +332,7 @@ func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.S
func (p *csiPlugin) SupportsMountOption() bool {
// TODO (vladimirvivien) use CSI VolumeCapability.MountVolume.mount_flags
// to probe for the result for this method:w
// to probe for the result for this method
return false
}
@@ -279,6 +343,8 @@ func (p *csiPlugin) SupportsBulkVolumeVerification() bool {
// volume.AttachableVolumePlugin methods
var _ volume.AttachableVolumePlugin = &csiPlugin{}
var _ volume.DeviceMountableVolumePlugin = &csiPlugin{}
func (p *csiPlugin) NewAttacher() (volume.Attacher, error) {
k8s := p.host.GetKubeClient()
if k8s == nil {
@@ -293,6 +359,10 @@ func (p *csiPlugin) NewAttacher() (volume.Attacher, error) {
}, nil
}
func (p *csiPlugin) NewDeviceMounter() (volume.DeviceMounter, error) {
return p.NewAttacher()
}
func (p *csiPlugin) NewDetacher() (volume.Detacher, error) {
k8s := p.host.GetKubeClient()
if k8s == nil {
@@ -307,9 +377,13 @@ func (p *csiPlugin) NewDetacher() (volume.Detacher, error) {
}, nil
}
func (p *csiPlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) {
return p.NewDetacher()
}
func (p *csiPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
m := p.host.GetMounter(p.GetPluginName())
return mount.GetMountRefs(m, deviceMountPath)
return m.GetMountRefs(deviceMountPath)
}
// BlockVolumePlugin methods
@@ -346,6 +420,7 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
driverName: pvSource.Driver,
readOnly: readOnly,
spec: spec,
specName: spec.Name(),
podUID: podRef.UID,
}
@@ -441,3 +516,60 @@ func (p *csiPlugin) ConstructBlockVolumeSpec(podUID types.UID, specVolName, mapP
return volume.NewSpecFromPersistentVolume(pv, false), nil
}
func (p *csiPlugin) skipAttach(driver string) (bool, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
return false, nil
}
if p.csiDriverLister == nil {
return false, errors.New("CSIDriver lister does not exist")
}
csiDriver, err := p.csiDriverLister.Get(driver)
if err != nil {
if apierrs.IsNotFound(err) {
// Don't skip attach if CSIDriver does not exist
return false, nil
}
return false, err
}
if csiDriver.Spec.AttachRequired != nil && *csiDriver.Spec.AttachRequired == false {
return true, nil
}
return false, nil
}
func (p *csiPlugin) getPublishVolumeInfo(client clientset.Interface, handle, driver, nodeName string) (map[string]string, error) {
skip, err := p.skipAttach(driver)
if err != nil {
return nil, err
}
if skip {
return nil, nil
}
attachID := getAttachmentName(handle, driver, nodeName)
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
attachment, err := client.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
return nil, err // This err already has enough context ("VolumeAttachment xyz not found")
}
if attachment == nil {
err = errors.New("no existing VolumeAttachment found")
return nil, err
}
return attachment.Status.AttachmentMetadata, nil
}
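// getAttachmentName is defined in the attacher and not shown in this diff; a
// sketch consistent with its call sites (signature inferred; the exact upstream
// scheme may differ) derives a deterministic, collision-resistant object name
// by hashing the (volume handle, driver, node) triple:
func attachmentNameSketch(volName, csiDriverName, nodeName string) string {
result := sha256.Sum256([]byte(volName + csiDriverName + nodeName)) // assumes crypto/sha256 and fmt are imported
return fmt.Sprintf("csi-%x", result)
}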
func unregisterDriver(driverName string) {
func() {
csiDrivers.Lock()
defer csiDrivers.Unlock()
delete(csiDrivers.driversMap, driverName)
}()
if err := nim.RemoveNodeInfo(driverName); err != nil {
glog.Errorf("Error unregistering CSI driver: %v", err)
}
}

View File

@@ -27,15 +27,18 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
fakeclient "k8s.io/client-go/kubernetes/fake"
utiltesting "k8s.io/client-go/util/testing"
fakecsi "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)
// create a plugin mgr to load plugins and setup a fake client
func newTestPlugin(t *testing.T) (*csiPlugin, string) {
func newTestPlugin(t *testing.T, client *fakeclient.Clientset, csiClient *fakecsi.Clientset) (*csiPlugin, string) {
err := utilfeature.DefaultFeatureGate.Set("CSIBlockVolume=true")
if err != nil {
t.Fatalf("Failed to enable feature gate for CSIBlockVolume: %v", err)
@@ -46,11 +49,18 @@ func newTestPlugin(t *testing.T) (*csiPlugin, string) {
t.Fatalf("can't create temp dir: %v", err)
}
fakeClient := fakeclient.NewSimpleClientset()
host := volumetest.NewFakeVolumeHost(
if client == nil {
client = fakeclient.NewSimpleClientset()
}
if csiClient == nil {
csiClient = fakecsi.NewSimpleClientset()
}
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
client,
csiClient,
nil,
"fakeNode",
)
plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
@@ -65,6 +75,13 @@ func newTestPlugin(t *testing.T) (*csiPlugin, string) {
t.Fatalf("cannot assert plugin to be type csiPlugin")
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
// Wait until the informer in CSI volume plugin has all CSIDrivers.
wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
return csiPlug.csiDriverInformer.Informer().HasSynced(), nil
})
}
return csiPlug, tmpDir
}
@@ -92,7 +109,7 @@ func makeTestPV(name string, sizeGig int, driverName, volID string) *api.Persist
}
func TestPluginGetPluginName(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
if plug.GetPluginName() != "kubernetes.io/csi" {
t.Errorf("unexpected plugin name %v", plug.GetPluginName())
@@ -100,7 +117,7 @@ func TestPluginGetPluginName(t *testing.T) {
}
func TestPluginGetVolumeName(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
testCases := []struct {
name string
@@ -129,7 +146,7 @@ func TestPluginGetVolumeName(t *testing.T) {
}
func TestPluginCanSupport(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
@@ -141,7 +158,7 @@ func TestPluginCanSupport(t *testing.T) {
}
func TestPluginConstructVolumeSpec(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
testCases := []struct {
@@ -186,6 +203,14 @@ func TestPluginConstructVolumeSpec(t *testing.T) {
t.Errorf("expected volID %s, got volID %s", tc.data[volDataKey.volHandle], volHandle)
}
if spec.PersistentVolume.Spec.VolumeMode == nil {
t.Fatalf("Volume mode has not been set.")
}
if *spec.PersistentVolume.Spec.VolumeMode != api.PersistentVolumeFilesystem {
t.Errorf("Unexpected volume mode %q", *spec.PersistentVolume.Spec.VolumeMode)
}
if spec.Name() != tc.specVolID {
t.Errorf("Unexpected spec name %s", spec.Name())
}
@@ -193,7 +218,7 @@ func TestPluginConstructVolumeSpec(t *testing.T) {
}
func TestPluginNewMounter(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
@@ -241,7 +266,7 @@ func TestPluginNewMounter(t *testing.T) {
}
func TestPluginNewUnmounter(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
@@ -286,7 +311,7 @@ func TestPluginNewUnmounter(t *testing.T) {
}
func TestPluginNewAttacher(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
@@ -304,7 +329,7 @@ func TestPluginNewAttacher(t *testing.T) {
}
func TestPluginNewDetacher(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
detacher, err := plug.NewDetacher()
@@ -322,7 +347,7 @@ func TestPluginNewDetacher(t *testing.T) {
}
func TestPluginNewBlockMapper(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-block-pv", 10, testDriver, testVol)
@@ -367,7 +392,7 @@ func TestPluginNewBlockMapper(t *testing.T) {
}
func TestPluginNewUnmapper(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
@@ -424,7 +449,7 @@ func TestPluginNewUnmapper(t *testing.T) {
}
func TestPluginConstructBlockVolumeSpec(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
testCases := []struct {
@@ -463,6 +488,14 @@ func TestPluginConstructBlockVolumeSpec(t *testing.T) {
continue
}
if spec.PersistentVolume.Spec.VolumeMode == nil {
t.Fatalf("Volume mode has not been set.")
}
if *spec.PersistentVolume.Spec.VolumeMode != api.PersistentVolumeBlock {
t.Errorf("Unexpected volume mode %q", *spec.PersistentVolume.Spec.VolumeMode)
}
volHandle := spec.PersistentVolume.Spec.CSI.VolumeHandle
if volHandle != tc.data[volDataKey.volHandle] {
t.Errorf("expected volID %s, got volID %s", tc.data[volDataKey.volHandle], volHandle)

View File

@@ -28,6 +28,12 @@ import (
"k8s.io/client-go/kubernetes"
kstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
"time"
)
const (
testInformerSyncPeriod = 100 * time.Millisecond
testInformerSyncTimeout = 30 * time.Second
)
func getCredentialsFromSecret(k8s kubernetes.Interface, secretRef *api.SecretReference) (map[string]string, error) {
@@ -121,3 +127,16 @@ func getVolumeDeviceDataDir(specVolID string, host volume.VolumeHost) string {
sanitizedSpecVolID := kstrings.EscapeQualifiedNameForDisk(specVolID)
return path.Join(host.GetVolumeDevicePluginDir(csiPluginName), sanitizedSpecVolID, "data")
}
// hasReadWriteOnce returns true if modes contains v1.ReadWriteOnce
func hasReadWriteOnce(modes []api.PersistentVolumeAccessMode) bool {
if modes == nil {
return false
}
for _, mode := range modes {
if mode == api.ReadWriteOnce {
return true
}
}
return false
}
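// The SetUp() cases exercised by TestMounterSetUpWithFSGroup reduce to one
// predicate: an fsGroup is applied only when an explicit fsType is set, the
// volume is writable, and access is single-node (ReadWriteOnce). A sketch of
// that rule (helper name is illustrative, not part of this commit):
func fsGroupApplicable(fsType string, readOnly bool, modes []api.PersistentVolumeAccessMode) bool {
return fsType != "" && !readOnly && hasReadWriteOnce(modes)
}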

View File

@@ -56,19 +56,25 @@ func (f *IdentityClient) Probe(ctx context.Context, in *csipb.ProbeRequest, opts
return nil, nil
}
type CSIVolume struct {
Attributes map[string]string
Path string
}
// NodeClient returns CSI node client
type NodeClient struct {
nodePublishedVolumes map[string]string
nodeStagedVolumes map[string]string
nodePublishedVolumes map[string]CSIVolume
nodeStagedVolumes map[string]CSIVolume
stageUnstageSet bool
nodeGetInfoResp *csipb.NodeGetInfoResponse
nextErr error
}
// NewNodeClient returns fake node client
func NewNodeClient(stageUnstageSet bool) *NodeClient {
return &NodeClient{
nodePublishedVolumes: make(map[string]string),
nodeStagedVolumes: make(map[string]string),
nodePublishedVolumes: make(map[string]CSIVolume),
nodeStagedVolumes: make(map[string]CSIVolume),
stageUnstageSet: stageUnstageSet,
}
}
@@ -78,18 +84,25 @@ func (f *NodeClient) SetNextError(err error) {
f.nextErr = err
}
func (f *NodeClient) SetNodeGetInfoResp(resp *csipb.NodeGetInfoResponse) {
f.nodeGetInfoResp = resp
}
// GetNodePublishedVolumes returns node published volumes
func (f *NodeClient) GetNodePublishedVolumes() map[string]string {
func (f *NodeClient) GetNodePublishedVolumes() map[string]CSIVolume {
return f.nodePublishedVolumes
}
// GetNodeStagedVolumes returns node staged volumes
func (f *NodeClient) GetNodeStagedVolumes() map[string]string {
func (f *NodeClient) GetNodeStagedVolumes() map[string]CSIVolume {
return f.nodeStagedVolumes
}
func (f *NodeClient) AddNodeStagedVolume(volID, deviceMountPath string) {
f.nodeStagedVolumes[volID] = deviceMountPath
func (f *NodeClient) AddNodeStagedVolume(volID, deviceMountPath string, attributes map[string]string) {
f.nodeStagedVolumes[volID] = CSIVolume{
Path: deviceMountPath,
Attributes: attributes,
}
}
// NodePublishVolume implements CSI NodePublishVolume
@@ -110,7 +123,10 @@ func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePubli
if !strings.Contains(fsTypes, fsType) {
return nil, errors.New("invalid fstype")
}
f.nodePublishedVolumes[req.GetVolumeId()] = req.GetTargetPath()
f.nodePublishedVolumes[req.GetVolumeId()] = CSIVolume{
Path: req.GetTargetPath(),
Attributes: req.GetVolumeAttributes(),
}
return &csipb.NodePublishVolumeResponse{}, nil
}
@@ -153,7 +169,10 @@ func (f *NodeClient) NodeStageVolume(ctx context.Context, req *csipb.NodeStageVo
return nil, errors.New("invalid fstype")
}
f.nodeStagedVolumes[req.GetVolumeId()] = req.GetStagingTargetPath()
f.nodeStagedVolumes[req.GetVolumeId()] = CSIVolume{
Path: req.GetStagingTargetPath(),
Attributes: req.GetVolumeAttributes(),
}
return &csipb.NodeStageVolumeResponse{}, nil
}
@@ -179,6 +198,14 @@ func (f *NodeClient) NodeGetId(ctx context.Context, in *csipb.NodeGetIdRequest,
return nil, nil
}
// NodeGetInfo implements csi method
func (f *NodeClient) NodeGetInfo(ctx context.Context, in *csipb.NodeGetInfoRequest, opts ...grpc.CallOption) (*csipb.NodeGetInfoResponse, error) {
if f.nextErr != nil {
return nil, f.nextErr
}
return f.nodeGetInfoResp, nil
}
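// A minimal sketch of driving the fake above from a registration test
// (response values are illustrative):
func exampleFakeNodeGetInfo() *NodeClient {
client := NewNodeClient(false /* stageUnstageSet */)
client.SetNodeGetInfoResp(&csipb.NodeGetInfoResponse{
NodeId: "com.example.csi/csi-node1",
MaxVolumesPerNode: 16,
})
return client
}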
// NodeGetCapabilities implements csi method
func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.NodeGetCapabilitiesResponse, error) {
resp := &csipb.NodeGetCapabilitiesResponse{

View File

@@ -1,30 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["labelmanager.go"],
importpath = "k8s.io/kubernetes/pkg/volume/csi/labelmanager",
visibility = ["//visibility:public"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@@ -1,251 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package labelmanager includes internal functions used to add/delete labels to
// kubernetes nodes for corresponding CSI drivers
package labelmanager // import "k8s.io/kubernetes/pkg/volume/csi/labelmanager"
import (
"encoding/json"
"fmt"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
)
const (
// Name of node annotation that contains JSON map of driver names to node
// names
annotationKey = "csi.volume.kubernetes.io/nodeid"
csiPluginName = "kubernetes.io/csi"
)
// labelManagerStruct holds the node name and the Kubernetes client used to
// manage the labels of a node
type labelManagerStruct struct {
nodeName types.NodeName
k8s kubernetes.Interface
}
// Interface implements an interface for managing labels of a node
type Interface interface {
AddLabels(driverName string) error
}
// NewLabelManager initializes labelManagerStruct and returns available interfaces
func NewLabelManager(nodeName types.NodeName, kubeClient kubernetes.Interface) Interface {
return labelManagerStruct{
nodeName: nodeName,
k8s: kubeClient,
}
}
// AddLabels updates the node's annotation for the given CSI driver as part of
// the driver's registration process.
func (lm labelManagerStruct) AddLabels(driverName string) error {
err := verifyAndAddNodeId(string(lm.nodeName), lm.k8s.CoreV1().Nodes(), driverName, string(lm.nodeName))
if err != nil {
return fmt.Errorf("failed to update node %s's annotation with error: %+v", lm.nodeName, err)
}
return nil
}
// Clones the given map and returns a new map with the given key and value added.
// Returns the given map, if annotationKey is empty.
func cloneAndAddAnnotation(
annotations map[string]string,
annotationKey,
annotationValue string) map[string]string {
if annotationKey == "" {
// Don't need to add an annotation.
return annotations
}
// Clone.
newAnnotations := map[string]string{}
for key, value := range annotations {
newAnnotations[key] = value
}
newAnnotations[annotationKey] = annotationValue
return newAnnotations
}
func verifyAndAddNodeId(
k8sNodeName string,
k8sNodesClient corev1.NodeInterface,
csiDriverName string,
csiDriverNodeId string) error {
// Add or update annotation on Node object
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Retrieve the latest version of Node before attempting update, so that
// existing changes are not overwritten. RetryOnConflict uses
// exponential backoff to avoid exhausting the apiserver.
result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{})
if getErr != nil {
glog.Errorf("Failed to get latest version of Node: %v", getErr)
return getErr // do not wrap error
}
var previousAnnotationValue string
if result.ObjectMeta.Annotations != nil {
previousAnnotationValue =
result.ObjectMeta.Annotations[annotationKey]
glog.V(3).Infof(
"previousAnnotationValue=%q", previousAnnotationValue)
}
existingDriverMap := map[string]string{}
if previousAnnotationValue != "" {
// Parse previousAnnotationValue as JSON
if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
return fmt.Errorf(
"failed to parse node's %q annotation value (%q) err=%v",
annotationKey,
previousAnnotationValue,
err)
}
}
if val, ok := existingDriverMap[csiDriverName]; ok {
if val == csiDriverNodeId {
// Value already exists in node annotation, nothing more to do
glog.V(1).Infof(
"The key value {%q: %q} alredy eixst in node %q annotation, no need to update: %v",
csiDriverName,
csiDriverNodeId,
annotationKey,
previousAnnotationValue)
return nil
}
}
// Add/update annotation value
existingDriverMap[csiDriverName] = csiDriverNodeId
jsonObj, err := json.Marshal(existingDriverMap)
if err != nil {
return fmt.Errorf(
"failed while trying to add key value {%q: %q} to node %q annotation. Existing value: %v",
csiDriverName,
csiDriverNodeId,
annotationKey,
previousAnnotationValue)
}
result.ObjectMeta.Annotations = cloneAndAddAnnotation(
result.ObjectMeta.Annotations,
annotationKey,
string(jsonObj))
_, updateErr := k8sNodesClient.Update(result)
if updateErr == nil {
fmt.Printf(
"Updated node %q successfully for CSI driver %q and CSI node name %q",
k8sNodeName,
csiDriverName,
csiDriverNodeId)
}
return updateErr // do not wrap error
})
if retryErr != nil {
return fmt.Errorf("node update failed: %v", retryErr)
}
return nil
}
// Fetches Kubernetes node API object corresponding to k8sNodeName.
// If the csiDriverName is present in the node annotation, it is removed.
func verifyAndDeleteNodeId(
k8sNodeName string,
k8sNodesClient corev1.NodeInterface,
csiDriverName string) error {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Retrieve the latest version of Node before attempting update, so that
// existing changes are not overwritten. RetryOnConflict uses
// exponential backoff to avoid exhausting the apiserver.
result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{})
if getErr != nil {
glog.Errorf("failed to get latest version of Node: %v", getErr)
return getErr // do not wrap error
}
var previousAnnotationValue string
if result.ObjectMeta.Annotations != nil {
previousAnnotationValue =
result.ObjectMeta.Annotations[annotationKey]
glog.V(3).Infof(
"previousAnnotationValue=%q", previousAnnotationValue)
}
existingDriverMap := map[string]string{}
if previousAnnotationValue == "" {
// Annotation is empty, nothing to clean up
glog.V(1).Infof(
"The key %q does not exist in node %q annotation, no need to cleanup.",
csiDriverName,
annotationKey)
return nil
}
// Parse previousAnnotationValue as JSON
if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
return fmt.Errorf(
"failed to parse node's %q annotation value (%q) err=%v",
annotationKey,
previousAnnotationValue,
err)
}
if _, ok := existingDriverMap[csiDriverName]; !ok {
// Key is already absent from node annotation, nothing more to do
glog.V(1).Infof(
"The key %q does not exist in node %q annotation, no need to cleanup: %v",
csiDriverName,
annotationKey,
previousAnnotationValue)
return nil
}
// Add/update annotation value
delete(existingDriverMap, csiDriverName)
jsonObj, err := json.Marshal(existingDriverMap)
if err != nil {
return fmt.Errorf(
"failed while trying to remove key %q from node %q annotation. Existing data: %v",
csiDriverName,
annotationKey,
previousAnnotationValue)
}
result.ObjectMeta.Annotations = cloneAndAddAnnotation(
result.ObjectMeta.Annotations,
annotationKey,
string(jsonObj))
_, updateErr := k8sNodesClient.Update(result)
if updateErr == nil {
fmt.Printf(
"Updated node %q annotation to remove CSI driver %q.",
k8sNodeName,
csiDriverName)
}
return updateErr // do not wrap error
})
if retryErr != nil {
return fmt.Errorf("node update failed: %v", retryErr)
}
return nil
}

View File

@@ -0,0 +1,64 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["nodeinfomanager.go"],
importpath = "k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager",
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["nodeinfomanager_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/apis/core/helper:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned/fake:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",
],
)

View File

@@ -0,0 +1,543 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package nodeinfomanager includes internal functions used to add/delete node
// information in the Node and CSINodeInfo objects for corresponding CSI drivers
package nodeinfomanager // import "k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager"
import (
"encoding/json"
"fmt"
csipb "github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/util/retry"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
)
const (
// Name of node annotation that contains JSON map of driver names to node IDs
annotationKeyNodeID = "csi.volume.kubernetes.io/nodeid"
)
var nodeKind = v1.SchemeGroupVersion.WithKind("Node")
// nodeInfoManager contains necessary common dependencies to update node info on both
// the Node and CSINodeInfo objects.
type nodeInfoManager struct {
nodeName types.NodeName
volumeHost volume.VolumeHost
}
// If no update is needed, the function must return the same Node object as the input.
type nodeUpdateFunc func(*v1.Node) (newNode *v1.Node, updated bool, err error)
// Interface implements an interface for managing CSI driver node information
// in the cluster
type Interface interface {
// AddNodeInfo records in the cluster the given node information from the CSI driver with the given name.
// Concurrent calls to AddNodeInfo() are allowed, but they should not be intertwined with calls
// to other methods in this interface.
AddNodeInfo(driverName string, driverNodeID string, maxVolumeLimit int64, topology *csipb.Topology) error
// RemoveNodeInfo removes from the cluster the node information of the CSI driver with the given name.
// Concurrent calls to RemoveNodeInfo() are allowed, but they should not be intertwined with calls
// to other methods in this interface.
RemoveNodeInfo(driverName string) error
}
// NewNodeInfoManager initializes nodeInfoManager
func NewNodeInfoManager(
nodeName types.NodeName,
volumeHost volume.VolumeHost) Interface {
return &nodeInfoManager{
nodeName: nodeName,
volumeHost: volumeHost,
}
}
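// A minimal usage sketch (driver name, node ID, and volume limit are
// illustrative); this mirrors how the plugin's RegisterPlugin handler wires
// the manager after a NodeGetInfo() call:
func exampleAddNodeInfo(host volume.VolumeHost, topology *csipb.Topology) error {
nim := NewNodeInfoManager(host.GetNodeName(), host)
return nim.AddNodeInfo("com.example.csi/driver1", "com.example.csi/csi-node1", 16, topology)
}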
// AddNodeInfo updates the node ID annotation in the Node object and CSIDrivers field in the
// CSINodeInfo object. If the CSINodeInfo object doesn't yet exist, it will be created.
// If multiple calls to AddNodeInfo() are made in parallel, some calls might receive Node or
// CSINodeInfo update conflicts, which causes the function to retry the corresponding update.
func (nim *nodeInfoManager) AddNodeInfo(driverName string, driverNodeID string, maxAttachLimit int64, topology *csipb.Topology) error {
if driverNodeID == "" {
return fmt.Errorf("error adding CSI driver node info: driverNodeID must not be empty")
}
nodeUpdateFuncs := []nodeUpdateFunc{
updateNodeIDInNode(driverName, driverNodeID),
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
nodeUpdateFuncs = append(nodeUpdateFuncs, updateTopologyLabels(topology))
}
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
nodeUpdateFuncs = append(nodeUpdateFuncs, updateMaxAttachLimit(driverName, maxAttachLimit))
}
err := nim.updateNode(nodeUpdateFuncs...)
if err != nil {
return fmt.Errorf("error updating Node object with CSI driver node info: %v", err)
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
err = nim.updateCSINodeInfo(driverName, driverNodeID, topology)
if err != nil {
return fmt.Errorf("error updating CSINodeInfo object with CSI driver node info: %v", err)
}
}
return nil
}
// RemoveNodeInfo removes the node ID annotation from the Node object and CSIDrivers field from the
// CSINodeInfo object. If the CSINodeInfo object contains no CSIDrivers, it will be deleted.
// If multiple calls to RemoveNodeInfo() are made in parallel, some calls might receive Node or
// CSINodeInfo update conflicts, which causes the function to retry the corresponding update.
func (nim *nodeInfoManager) RemoveNodeInfo(driverName string) error {
err := nim.removeCSINodeInfo(driverName)
if err != nil {
return fmt.Errorf("error removing CSI driver node info from CSINodeInfo object %v", err)
}
err = nim.updateNode(
removeMaxAttachLimit(driverName),
removeNodeIDFromNode(driverName),
)
if err != nil {
return fmt.Errorf("error removing CSI driver node info from Node object %v", err)
}
return nil
}
// updateNode repeatedly attempts to update the corresponding node object
// which is modified by applying the given update functions sequentially.
// Because updateFuncs are applied sequentially, later updateFuncs should take into account
// the effects of previous updateFuncs to avoid potential conflicts. For example, if multiple
// functions update the same field, updates in the last function are persisted.
func (nim *nodeInfoManager) updateNode(updateFuncs ...nodeUpdateFunc) error {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Retrieve the latest version of Node before attempting update, so that
// existing changes are not overwritten. RetryOnConflict uses
// exponential backoff to avoid exhausting the apiserver.
kubeClient := nim.volumeHost.GetKubeClient()
if kubeClient == nil {
return fmt.Errorf("error getting kube client")
}
nodeClient := kubeClient.CoreV1().Nodes()
node, err := nodeClient.Get(string(nim.nodeName), metav1.GetOptions{})
if err != nil {
return err // do not wrap error
}
needUpdate := false
for _, update := range updateFuncs {
newNode, updated, err := update(node)
if err != nil {
return err
}
node = newNode
needUpdate = needUpdate || updated
}
if needUpdate {
_, updateErr := nodeClient.Update(node)
return updateErr // do not wrap error
}
return nil
})
if retryErr != nil {
return fmt.Errorf("node update failed: %v", retryErr)
}
return nil
}
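// A sketch of a custom update function in the nodeUpdateFunc shape consumed by
// updateNode above (annotation key and helper name are illustrative):
func exampleAnnotationUpdateFunc(key, value string) nodeUpdateFunc {
return func(node *v1.Node) (*v1.Node, bool, error) {
if node.ObjectMeta.Annotations[key] == value {
return node, false, nil // unchanged; report that no update is needed
}
if node.ObjectMeta.Annotations == nil {
node.ObjectMeta.Annotations = make(map[string]string)
}
node.ObjectMeta.Annotations[key] = value
return node, true, nil
}
}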
// Guarantees the map is non-nil if no error is returned.
func buildNodeIDMapFromAnnotation(node *v1.Node) (map[string]string, error) {
var previousAnnotationValue string
if node.ObjectMeta.Annotations != nil {
previousAnnotationValue =
node.ObjectMeta.Annotations[annotationKeyNodeID]
}
var existingDriverMap map[string]string
if previousAnnotationValue != "" {
// Parse previousAnnotationValue as JSON
if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
return nil, fmt.Errorf(
"failed to parse node's %q annotation value (%q) err=%v",
annotationKeyNodeID,
previousAnnotationValue,
err)
}
}
if existingDriverMap == nil {
return make(map[string]string), nil
}
return existingDriverMap, nil
}
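// For example, with two registered drivers (names illustrative) the annotation
// parsed above carries a JSON map of driver name to node ID:
//
// csi.volume.kubernetes.io/nodeid: {"com.example.csi/driver1":"csi-node1","net.example.storage/other-driver":"test-node"}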
// updateNodeIDInNode returns a function that updates a Node object with the given
// Node ID information.
func updateNodeIDInNode(
csiDriverName string,
csiDriverNodeID string) nodeUpdateFunc {
return func(node *v1.Node) (*v1.Node, bool, error) {
existingDriverMap, err := buildNodeIDMapFromAnnotation(node)
if err != nil {
return nil, false, err
}
if val, ok := existingDriverMap[csiDriverName]; ok {
if val == csiDriverNodeID {
// Value already exists in node annotation, nothing more to do
return node, false, nil
}
}
// Add/update annotation value
existingDriverMap[csiDriverName] = csiDriverNodeID
jsonObj, err := json.Marshal(existingDriverMap)
if err != nil {
return nil, false, fmt.Errorf(
"error while marshalling node ID map updated with driverName=%q, nodeID=%q: %v",
csiDriverName,
csiDriverNodeID,
err)
}
if node.ObjectMeta.Annotations == nil {
node.ObjectMeta.Annotations = make(map[string]string)
}
node.ObjectMeta.Annotations[annotationKeyNodeID] = string(jsonObj)
return node, true, nil
}
}
// removeNodeIDFromNode returns a function that removes node ID information matching the given
// driver name from a Node object.
func removeNodeIDFromNode(csiDriverName string) nodeUpdateFunc {
return func(node *v1.Node) (*v1.Node, bool, error) {
var previousAnnotationValue string
if node.ObjectMeta.Annotations != nil {
previousAnnotationValue =
node.ObjectMeta.Annotations[annotationKeyNodeID]
}
if previousAnnotationValue == "" {
return node, false, nil
}
// Parse previousAnnotationValue as JSON
existingDriverMap := map[string]string{}
if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
return nil, false, fmt.Errorf(
"failed to parse node's %q annotation value (%q) err=%v",
annotationKeyNodeID,
previousAnnotationValue,
err)
}
if _, ok := existingDriverMap[csiDriverName]; !ok {
// Value is already missing in node annotation, nothing more to do
return node, false, nil
}
// Delete annotation value
delete(existingDriverMap, csiDriverName)
if len(existingDriverMap) == 0 {
delete(node.ObjectMeta.Annotations, annotationKeyNodeID)
} else {
jsonObj, err := json.Marshal(existingDriverMap)
if err != nil {
return nil, false, fmt.Errorf(
"failed while trying to remove key %q from node %q annotation. Existing data: %v",
csiDriverName,
annotationKeyNodeID,
previousAnnotationValue)
}
node.ObjectMeta.Annotations[annotationKeyNodeID] = string(jsonObj)
}
return node, true, nil
}
}
// updateTopologyLabels returns a function that updates labels of a Node object with the given
// topology information.
func updateTopologyLabels(topology *csipb.Topology) nodeUpdateFunc {
return func(node *v1.Node) (*v1.Node, bool, error) {
if topology == nil || len(topology.Segments) == 0 {
return node, false, nil
}
for k, v := range topology.Segments {
if curVal, exists := node.Labels[k]; exists && curVal != v {
return nil, false, fmt.Errorf("detected topology value collision: driver reported %q:%q but existing label is %q:%q", k, v, k, curVal)
}
}
if node.Labels == nil {
node.Labels = make(map[string]string)
}
for k, v := range topology.Segments {
node.Labels[k] = v
}
return node, true, nil
}
}
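// For example, a driver reporting Topology{Segments: {"com.example.csi/zone": "zoneA"}}
// (values illustrative) yields the node label com.example.csi/zone=zoneA; a second
// driver may add disjoint keys, but reporting a different value for an existing
// key fails with the collision error above.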
func (nim *nodeInfoManager) updateCSINodeInfo(
driverName string,
driverNodeID string,
topology *csipb.Topology) error {
csiKubeClient := nim.volumeHost.GetCSIClient()
if csiKubeClient == nil {
return fmt.Errorf("error getting CSI client")
}
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
nodeInfo, err := csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{})
if nodeInfo == nil || errors.IsNotFound(err) {
return nim.createNodeInfoObject(driverName, driverNodeID, topology)
}
if err != nil {
return err // do not wrap error
}
return nim.updateNodeInfoObject(nodeInfo, driverName, driverNodeID, topology)
})
if retryErr != nil {
return fmt.Errorf("CSINodeInfo update failed: %v", retryErr)
}
return nil
}
func (nim *nodeInfoManager) createNodeInfoObject(
driverName string,
driverNodeID string,
topology *csipb.Topology) error {
kubeClient := nim.volumeHost.GetKubeClient()
if kubeClient == nil {
return fmt.Errorf("error getting kube client")
}
csiKubeClient := nim.volumeHost.GetCSIClient()
if csiKubeClient == nil {
return fmt.Errorf("error getting CSI client")
}
topologyKeys := []string{} // must be an empty slice instead of nil to satisfy CRD OpenAPI Schema validation
if topology != nil {
for k := range topology.Segments {
topologyKeys = append(topologyKeys, k)
}
}
node, err := kubeClient.CoreV1().Nodes().Get(string(nim.nodeName), metav1.GetOptions{})
if err != nil {
return err // do not wrap error
}
nodeInfo := &csiv1alpha1.CSINodeInfo{
ObjectMeta: metav1.ObjectMeta{
Name: string(nim.nodeName),
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: nodeKind.Version,
Kind: nodeKind.Kind,
Name: node.Name,
UID: node.UID,
},
},
},
CSIDrivers: []csiv1alpha1.CSIDriverInfo{
{
Driver: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys,
},
},
}
_, err = csiKubeClient.CsiV1alpha1().CSINodeInfos().Create(nodeInfo)
return err // do not wrap error
}
func (nim *nodeInfoManager) updateNodeInfoObject(
nodeInfo *csiv1alpha1.CSINodeInfo,
driverName string,
driverNodeID string,
topology *csipb.Topology) error {
csiKubeClient := nim.volumeHost.GetCSIClient()
if csiKubeClient == nil {
return fmt.Errorf("error getting CSI client")
}
topologyKeys := make(sets.String)
if topology != nil {
for k := range topology.Segments {
topologyKeys.Insert(k)
}
}
// Clone driver list, omitting the driver that matches the given driverName,
// unless the driver is identical to information provided, in which case no update is necessary.
var newDriverInfos []csiv1alpha1.CSIDriverInfo
for _, driverInfo := range nodeInfo.CSIDrivers {
if driverInfo.Driver == driverName {
prevTopologyKeys := sets.NewString(driverInfo.TopologyKeys...)
if driverInfo.NodeID == driverNodeID && prevTopologyKeys.Equal(topologyKeys) {
// No update needed
return nil
}
} else {
// Omit driverInfo matching given driverName
newDriverInfos = append(newDriverInfos, driverInfo)
}
}
// Append new driver
driverInfo := csiv1alpha1.CSIDriverInfo{
Driver: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys.List(),
}
newDriverInfos = append(newDriverInfos, driverInfo)
nodeInfo.CSIDrivers = newDriverInfos
_, err := csiKubeClient.CsiV1alpha1().CSINodeInfos().Update(nodeInfo)
return err // do not wrap error
}
func (nim *nodeInfoManager) removeCSINodeInfo(csiDriverName string) error {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
csiKubeClient := nim.volumeHost.GetCSIClient()
if csiKubeClient == nil {
return fmt.Errorf("error getting CSI client")
}
nodeInfoClient := csiKubeClient.CsiV1alpha1().CSINodeInfos()
nodeInfo, err := nodeInfoClient.Get(string(nim.nodeName), metav1.GetOptions{})
if nodeInfo == nil || errors.IsNotFound(err) {
// do nothing
return nil
}
if err != nil {
return err // do not wrap error
}
// Remove matching driver from driver list
var newDriverInfos []csiv1alpha1.CSIDriverInfo
for _, driverInfo := range nodeInfo.CSIDrivers {
if driverInfo.Driver != csiDriverName {
newDriverInfos = append(newDriverInfos, driverInfo)
}
}
if len(newDriverInfos) == len(nodeInfo.CSIDrivers) {
// No changes, don't update
return nil
}
if len(newDriverInfos) == 0 {
// No drivers left, delete CSINodeInfo object
return nodeInfoClient.Delete(string(nim.nodeName), &metav1.DeleteOptions{})
}
// TODO (verult) make sure CSINodeInfo has validation logic to prevent duplicate driver names
nodeInfo.CSIDrivers = newDriverInfos
_, updateErr := nodeInfoClient.Update(nodeInfo)
return updateErr // do not wrap error
})
if retryErr != nil {
return fmt.Errorf("CSINodeInfo update failed: %v", retryErr)
}
return nil
}
func updateMaxAttachLimit(driverName string, maxLimit int64) nodeUpdateFunc {
return func(node *v1.Node) (*v1.Node, bool, error) {
if maxLimit <= 0 {
glog.V(4).Infof("skipping adding attach limit for %s", driverName)
return node, false, nil
}
if node.Status.Capacity == nil {
node.Status.Capacity = v1.ResourceList{}
}
if node.Status.Allocatable == nil {
node.Status.Allocatable = v1.ResourceList{}
}
limitKeyName := util.GetCSIAttachLimitKey(driverName)
node.Status.Capacity[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI)
node.Status.Allocatable[v1.ResourceName(limitKeyName)] = *resource.NewQuantity(maxLimit, resource.DecimalSI)
return node, true, nil
}
}
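// For example (driver name illustrative; the exact key format is produced by
// util.GetCSIAttachLimitKey, which may shorten long driver names), a driver
// reporting a limit of 16 surfaces as node capacity/allocatable entries like:
//
// attachable-volumes-csi-example-driver: 16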
func removeMaxAttachLimit(driverName string) nodeUpdateFunc {
return func(node *v1.Node) (*v1.Node, bool, error) {
limitKey := v1.ResourceName(util.GetCSIAttachLimitKey(driverName))
capacityExists := false
if node.Status.Capacity != nil {
_, capacityExists = node.Status.Capacity[limitKey]
}
allocatableExists := false
if node.Status.Allocatable != nil {
_, allocatableExists = node.Status.Allocatable[limitKey]
}
if !capacityExists && !allocatableExists {
return node, false, nil
}
delete(node.Status.Capacity, limitKey)
if len(node.Status.Capacity) == 0 {
node.Status.Capacity = nil
}
delete(node.Status.Allocatable, limitKey)
if len(node.Status.Allocatable) == 0 {
node.Status.Allocatable = nil
}
return node, true, nil
}
}

View File

@@ -0,0 +1,809 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeinfomanager
import (
"encoding/json"
"testing"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
"k8s.io/client-go/kubernetes/fake"
utiltesting "k8s.io/client-go/util/testing"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
csifake "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
"k8s.io/kubernetes/pkg/apis/core/helper"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
)
type testcase struct {
name string
driverName string
existingNode *v1.Node
existingNodeInfo *csiv1alpha1.CSINodeInfo
inputNodeID string
inputTopology *csi.Topology
inputVolumeLimit int64
expectedNodeIDMap map[string]string
expectedTopologyMap map[string]sets.String
expectedLabels map[string]string
expectNoNodeInfo bool
expectedVolumeLimit int64
expectFail bool
}
type nodeIDMap map[string]string
type topologyKeyMap map[string][]string
type labelMap map[string]string
// TestAddNodeInfo tests AddNodeInfo with various existing Node and/or CSINodeInfo objects.
// The node IDs in all test cases below are the same between the Node annotation and CSINodeInfo.
func TestAddNodeInfo(t *testing.T) {
testcases := []testcase{
{
name: "empty node",
driverName: "com.example.csi/driver1",
existingNode: generateNode(nil /* nodeIDs */, nil /* labels */, nil /*capacity*/),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: &csi.Topology{
Segments: map[string]string{
"com.example.csi/zone": "zoneA",
},
},
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": sets.NewString("com.example.csi/zone"),
},
expectedLabels: map[string]string{"com.example.csi/zone": "zoneA"},
},
{
name: "pre-existing node info from the same driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
labelMap{
"com.example.csi/zone": "zoneA",
},
nil /*capacity*/),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
topologyKeyMap{
"com.example.csi/driver1": {"com.example.csi/zone"},
},
),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: &csi.Topology{
Segments: map[string]string{
"com.example.csi/zone": "zoneA",
},
},
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": sets.NewString("com.example.csi/zone"),
},
expectedLabels: map[string]string{
"com.example.csi/zone": "zoneA",
},
},
{
name: "pre-existing node info from the same driver, but without topology info",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
nil /* labels */, nil /*capacity*/),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
nil, /* topologyKeys */
),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: &csi.Topology{
Segments: map[string]string{
"com.example.csi/zone": "zoneA",
},
},
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": sets.NewString("com.example.csi/zone"),
},
expectedLabels: map[string]string{
"com.example.csi/zone": "zoneA",
},
},
{
name: "pre-existing node info from different driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/test-node",
},
labelMap{
"net.example.storage/rack": "rack1",
}, nil /*capacity*/),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/test-node",
},
topologyKeyMap{
"net.example.storage/other-driver": {"net.example.storage/rack"},
},
),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: &csi.Topology{
Segments: map[string]string{
"com.example.csi/zone": "zoneA",
},
},
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
"net.example.storage/other-driver": "net.example.storage/test-node",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": sets.NewString("com.example.csi/zone"),
"net.example.storage/other-driver": sets.NewString("net.example.storage/rack"),
},
expectedLabels: map[string]string{
"com.example.csi/zone": "zoneA",
"net.example.storage/rack": "rack1",
},
},
{
name: "pre-existing node info from the same driver, but different node ID and topology values; labels should conflict",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
labelMap{
"com.example.csi/zone": "zoneA",
}, nil /*capacity*/),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
topologyKeyMap{
"com.example.csi/driver1": {"com.example.csi/zone"},
},
),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: &csi.Topology{
Segments: map[string]string{
"com.example.csi/zone": "other-zone",
},
},
expectFail: true,
},
{
name: "pre-existing node info from the same driver, but different node ID and topology keys; new labels should be added",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
labelMap{
"com.example.csi/zone": "zoneA",
}, nil /*capacity*/),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
topologyKeyMap{
"com.example.csi/driver1": {"com.example.csi/zone"},
},
),
inputNodeID: "com.example.csi/other-node",
inputTopology: &csi.Topology{
Segments: map[string]string{
"com.example.csi/rack": "rack1",
},
},
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/other-node",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": sets.NewString("com.example.csi/rack"),
},
expectedLabels: map[string]string{
"com.example.csi/zone": "zoneA",
"com.example.csi/rack": "rack1",
},
},
{
name: "nil topology, empty node",
driverName: "com.example.csi/driver1",
existingNode: generateNode(nil /* nodeIDs */, nil /* labels */, nil /*capacity*/),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: nil,
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": nil,
},
expectedLabels: nil,
},
{
name: "nil topology, pre-existing node info from the same driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
labelMap{
"com.example.csi/zone": "zoneA",
}, nil /*capacity*/),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
topologyKeyMap{
"com.example.csi/driver1": {"com.example.csi/zone"},
},
),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: nil,
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": nil,
},
expectedLabels: map[string]string{
"com.example.csi/zone": "zoneA", // old labels are not removed
},
},
{
name: "nil topology, pre-existing node info from different driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/test-node",
},
labelMap{
"net.example.storage/rack": "rack1",
}, nil /*capacity*/),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/test-node",
},
topologyKeyMap{
"net.example.storage/other-driver": {"net.example.storage/rack"},
},
),
inputNodeID: "com.example.csi/csi-node1",
inputTopology: nil,
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
"net.example.storage/other-driver": "net.example.storage/test-node",
},
expectedTopologyMap: map[string]sets.String{
"net.example.storage/other-driver": sets.NewString("net.example.storage/rack"),
"com.example.csi/driver1": nil,
},
expectedLabels: map[string]string{
"net.example.storage/rack": "rack1",
},
},
{
name: "empty node ID",
driverName: "com.example.csi/driver1",
existingNode: generateNode(nil /* nodeIDs */, nil /* labels */, nil /*capacity*/),
inputNodeID: "",
expectFail: true,
},
{
name: "new node with valid max limit",
driverName: "com.example.csi/driver1",
existingNode: generateNode(nil /*nodeIDs*/, nil /*labels*/, nil /*capacity*/),
inputVolumeLimit: 10,
inputTopology: nil,
inputNodeID: "com.example.csi/csi-node1",
expectedVolumeLimit: 10,
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": nil,
},
expectedLabels: nil,
},
{
name: "node with existing valid max limit",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nil, /*nodeIDs*/
nil, /*labels*/
map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: *resource.NewScaledQuantity(4, -3),
v1.ResourceName(util.GetCSIAttachLimitKey("com.example.csi/driver1")): *resource.NewQuantity(10, resource.DecimalSI),
}),
inputVolumeLimit: 20,
inputTopology: nil,
inputNodeID: "com.example.csi/csi-node1",
expectedVolumeLimit: 20,
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
expectedTopologyMap: map[string]sets.String{
"com.example.csi/driver1": nil,
},
expectedLabels: nil,
},
}
test(t, true /* addNodeInfo */, true /* csiNodeInfoEnabled */, testcases)
}

// TestAddNodeInfo_CSINodeInfoDisabled tests AddNodeInfo with various existing Node annotations
// and the CSINodeInfo feature gate disabled.
func TestAddNodeInfo_CSINodeInfoDisabled(t *testing.T) {
testcases := []testcase{
{
name: "empty node",
driverName: "com.example.csi/driver1",
existingNode: generateNode(nil /* nodeIDs */, nil /* labels */, nil /*capacity*/),
inputNodeID: "com.example.csi/csi-node1",
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
},
{
name: "pre-existing node info from the same driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
nil /* labels */, nil /*capacity*/),
inputNodeID: "com.example.csi/csi-node1",
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
},
{
name: "pre-existing node info from different driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/test-node",
},
nil /* labels */, nil /*capacity*/),
inputNodeID: "com.example.csi/csi-node1",
expectedNodeIDMap: map[string]string{
"com.example.csi/driver1": "com.example.csi/csi-node1",
"net.example.storage/other-driver": "net.example.storage/test-node",
},
},
}
test(t, true /* addNodeInfo */, false /* csiNodeInfoEnabled */, testcases)
}

// TestRemoveNodeInfo tests RemoveNodeInfo with various existing Node and/or CSINodeInfo objects.
func TestRemoveNodeInfo(t *testing.T) {
testcases := []testcase{
{
name: "empty node and no CSINodeInfo",
driverName: "com.example.csi/driver1",
existingNode: generateNode(nil /* nodeIDs */, nil /* labels */, nil /*capacity*/),
expectedNodeIDMap: nil,
expectedLabels: nil,
expectNoNodeInfo: true,
},
{
name: "pre-existing node info from the same driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
labelMap{
"com.example.csi/zone": "zoneA",
}, nil /*capacity*/),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
topologyKeyMap{
"com.example.csi/driver1": {"com.example.csi/zone"},
},
),
expectedNodeIDMap: nil,
expectedLabels: map[string]string{"com.example.csi/zone": "zoneA"},
expectNoNodeInfo: true,
},
{
name: "pre-existing node info from different driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/csi-node1",
},
labelMap{
"net.example.storage/zone": "zoneA",
}, nil /*capacity*/),
existingNodeInfo: generateNodeInfo(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/csi-node1",
},
topologyKeyMap{
"net.example.storage/other-driver": {"net.example.storage/zone"},
},
),
expectedNodeIDMap: map[string]string{
"net.example.storage/other-driver": "net.example.storage/csi-node1",
},
expectedTopologyMap: map[string]sets.String{
"net.example.storage/other-driver": sets.NewString("net.example.storage/zone"),
},
expectedLabels: map[string]string{"net.example.storage/zone": "zoneA"},
},
{
name: "pre-existing info about the same driver in node, but no CSINodeInfo",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
nil /* labels */, nil /*capacity*/),
expectedNodeIDMap: nil,
expectedLabels: nil,
expectNoNodeInfo: true,
},
{
name: "pre-existing info about a different driver in node, but no CSINodeInfo",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/csi-node1",
},
nil /* labels */, nil /*capacity*/),
expectedNodeIDMap: map[string]string{
"net.example.storage/other-driver": "net.example.storage/csi-node1",
},
expectedLabels: nil,
expectNoNodeInfo: true,
},
{
name: "new node with valid max limit",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nil, /*nodeIDs*/
nil, /*labels*/
map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: *resource.NewScaledQuantity(4, -3),
v1.ResourceName(util.GetCSIAttachLimitKey("com.example.csi/driver1")): *resource.NewQuantity(10, resource.DecimalSI),
},
),
inputTopology: nil,
inputNodeID: "com.example.csi/csi-node1",
expectNoNodeInfo: true,
expectedVolumeLimit: 0,
},
}
test(t, false /* addNodeInfo */, true /* csiNodeInfoEnabled */, testcases)
}

// TestRemoveNodeInfo_CSINodeInfoDisabled tests RemoveNodeInfo with various existing Node objects
// and the CSINodeInfo feature gate disabled.
func TestRemoveNodeInfo_CSINodeInfoDisabled(t *testing.T) {
testcases := []testcase{
{
name: "empty node",
driverName: "com.example.csi/driver1",
existingNode: generateNode(nil /* nodeIDs */, nil /* labels */, nil /*capacity*/),
expectedNodeIDMap: nil,
},
{
name: "pre-existing node info from the same driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
nil /* labels */, nil /*capacity*/),
expectedNodeIDMap: nil,
},
{
name: "pre-existing node info from different driver",
driverName: "com.example.csi/driver1",
existingNode: generateNode(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/csi-node1",
},
nil /* labels */, nil /*capacity*/),
expectedNodeIDMap: map[string]string{
"net.example.storage/other-driver": "net.example.storage/csi-node1",
},
},
}
test(t, false /* addNodeInfo */, false /* csiNodeInfoEnabled */, testcases)
}
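
// TestAddNodeInfoExistingAnnotation tests AddNodeInfo with a pre-existing node ID annotation
// on the Node but no CSINodeInfo object; a CSINodeInfo entry should be created for the driver.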
func TestAddNodeInfoExistingAnnotation(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSINodeInfo, true)()
driverName := "com.example.csi/driver1"
nodeID := "com.example.csi/some-node"
testcases := []struct {
name string
existingNode *v1.Node
}{
{
name: "pre-existing info about the same driver in node, but no CSINodeInfo",
existingNode: generateNode(
nodeIDMap{
"com.example.csi/driver1": "com.example.csi/csi-node1",
},
nil /* labels */, nil /*capacity*/),
},
{
name: "pre-existing info about a different driver in node, but no CSINodeInfo",
existingNode: generateNode(
nodeIDMap{
"net.example.storage/other-driver": "net.example.storage/test-node",
},
nil /* labels */, nil /*capacity*/),
},
}
for _, tc := range testcases {
t.Logf("test case: %q", tc.name)
// Arrange
nodeName := tc.existingNode.Name
client := fake.NewSimpleClientset(tc.existingNode)
csiClient := csifake.NewSimpleClientset()
tmpDir, err := utiltesting.MkTmpdir("nodeinfomanager-test")
if err != nil {
t.Fatalf("can't create temp dir: %v", err)
}
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
client,
csiClient,
nil,
nodeName,
)
nim := NewNodeInfoManager(types.NodeName(nodeName), host)
// Act
err = nim.AddNodeInfo(driverName, nodeID, 0 /* maxVolumeLimit */, nil) // TODO test maxVolumeLimit
if err != nil {
t.Errorf("expected no error from AddNodeInfo call but got: %v", err)
continue
}
// Assert
nodeInfo, err := csiClient.Csi().CSINodeInfos().Get(nodeName, metav1.GetOptions{})
if err != nil {
t.Errorf("error getting CSINodeInfo: %v", err)
continue
}
if len(nodeInfo.CSIDrivers) != 1 {
t.Errorf("expected 1 CSIDriverInfo entry but got: %d", len(nodeInfo.CSIDrivers))
continue
}
driver := nodeInfo.CSIDrivers[0]
if driver.Driver != driverName || driver.NodeID != nodeID {
t.Errorf("expected Driver to be %q and NodeID to be %q, but got: %q:%q", driverName, nodeID, driver.Driver, driver.NodeID)
}
}
}
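
// test runs the given test cases against AddNodeInfo or RemoveNodeInfo (selected by addNodeInfo),
// with the CSINodeInfo feature gate set to csiNodeInfoEnabled, and validates the resulting Node
// and CSINodeInfo objects.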
func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []testcase) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSINodeInfo, csiNodeInfoEnabled)()
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.AttachVolumeLimit, true)()
for _, tc := range testcases {
t.Logf("test case: %q", tc.name)
// Arrange
nodeName := tc.existingNode.Name
client := fake.NewSimpleClientset(tc.existingNode)
var csiClient *csifake.Clientset
if tc.existingNodeInfo == nil {
csiClient = csifake.NewSimpleClientset()
} else {
csiClient = csifake.NewSimpleClientset(tc.existingNodeInfo)
}
tmpDir, err := utiltesting.MkTmpdir("nodeinfomanager-test")
if err != nil {
t.Fatalf("can't create temp dir: %v", err)
}
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
client,
csiClient,
nil,
nodeName,
)
nim := NewNodeInfoManager(types.NodeName(nodeName), host)
// Act
if addNodeInfo {
err = nim.AddNodeInfo(tc.driverName, tc.inputNodeID, tc.inputVolumeLimit, tc.inputTopology)
} else {
err = nim.RemoveNodeInfo(tc.driverName)
}
// Assert
if tc.expectFail {
if err == nil {
t.Errorf("expected an error from the AddNodeInfo/RemoveNodeInfo call but got none")
}
continue
} else if err != nil {
t.Errorf("expected no error from the AddNodeInfo/RemoveNodeInfo call but got: %v", err)
continue
}
// Node validation
node, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err != nil {
t.Errorf("error getting node: %v", err)
continue
}
// Max volume limit validation
attachLimit := getVolumeLimit(node, tc.driverName)
if attachLimit != tc.expectedVolumeLimit {
t.Errorf("expected volume limit to be %d got %d", tc.expectedVolumeLimit, attachLimit)
continue
}
// Node ID annotation
annNodeID, ok := node.Annotations[annotationKeyNodeID]
if ok {
if tc.expectedNodeIDMap == nil {
t.Errorf("expected annotation %q to not exist, but got: %q", annotationKeyNodeID, annNodeID)
} else {
var actualNodeIDs map[string]string
err = json.Unmarshal([]byte(annNodeID), &actualNodeIDs)
if err != nil {
t.Errorf("expected no error when parsing annotation %q, but got error: %v", annotationKeyNodeID, err)
}
if !helper.Semantic.DeepEqual(actualNodeIDs, tc.expectedNodeIDMap) {
t.Errorf("expected annotation %v; got: %v", tc.expectedNodeIDMap, actualNodeIDs)
}
}
} else {
if tc.expectedNodeIDMap != nil {
t.Errorf("expected annotation %q, but got none", annotationKeyNodeID)
}
}
if csiNodeInfoEnabled {
// Topology labels
if !helper.Semantic.DeepEqual(node.Labels, tc.expectedLabels) {
t.Errorf("expected topology labels to be %v; got: %v", tc.expectedLabels, node.Labels)
}
// CSINodeInfo validation
nodeInfo, err := csiClient.Csi().CSINodeInfos().Get(nodeName, metav1.GetOptions{})
if tc.expectNoNodeInfo && errors.IsNotFound(err) {
continue
} else if err != nil {
t.Errorf("error getting CSINodeInfo: %v", err)
continue
}
// Extract node IDs and topology keys
actualNodeIDs := make(map[string]string)
actualTopologyKeys := make(map[string]sets.String)
for _, driver := range nodeInfo.CSIDrivers {
actualNodeIDs[driver.Driver] = driver.NodeID
actualTopologyKeys[driver.Driver] = sets.NewString(driver.TopologyKeys...)
}
// Node IDs
if !helper.Semantic.DeepEqual(actualNodeIDs, tc.expectedNodeIDMap) {
t.Errorf("expected node IDs %v from CSINodeInfo; got: %v", tc.expectedNodeIDMap, actualNodeIDs)
}
// Topology keys
if !helper.Semantic.DeepEqual(actualTopologyKeys, tc.expectedTopologyMap) {
t.Errorf("expected topology keys %v from CSINodeInfo; got: %v", tc.expectedTopologyMap, actualTopologyKeys)
}
}
}
}
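
// getVolumeLimit returns the attach limit recorded in the node's allocatable resources for the
// given CSI driver, or 0 if none is set.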
func getVolumeLimit(node *v1.Node, driverName string) int64 {
volumeLimits := map[v1.ResourceName]int64{}
nodeAllocatables := node.Status.Allocatable
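// Gather the attachable volume limits exposed in the node's allocatable resources.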
for k, v := range nodeAllocatables {
if v1helper.IsAttachableVolumeResourceName(k) {
volumeLimits[k] = v.Value()
}
}
attachKey := v1.ResourceName(util.GetCSIAttachLimitKey(driverName))
attachLimit := volumeLimits[attachKey]
return attachLimit
}
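
// generateNode builds a Node named "node1" with the given node IDs (stored as a JSON annotation),
// labels, and capacity.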
func generateNode(nodeIDs, labels map[string]string, capacity map[v1.ResourceName]resource.Quantity) *v1.Node {
var annotations map[string]string
if len(nodeIDs) > 0 {
b, _ := json.Marshal(nodeIDs)
annotations = map[string]string{annotationKeyNodeID: string(b)}
}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Annotations: annotations,
Labels: labels,
},
}
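// Mirror the provided capacity into allocatable so volume limit lookups can see it.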
if len(capacity) > 0 {
node.Status.Capacity = v1.ResourceList(capacity)
node.Status.Allocatable = v1.ResourceList(capacity)
}
return node
}
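
// generateNodeInfo builds a CSINodeInfo named "node1" with one driver entry per node ID, adding
// topology keys where provided.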
func generateNodeInfo(nodeIDs map[string]string, topologyKeys map[string][]string) *csiv1alpha1.CSINodeInfo {
var drivers []csiv1alpha1.CSIDriverInfo
for k, nodeID := range nodeIDs {
d := csiv1alpha1.CSIDriverInfo{
Driver: k,
NodeID: nodeID,
}
if top, exists := topologyKeys[k]; exists {
d.TopologyKeys = top
}
drivers = append(drivers, d)
}
return &csiv1alpha1.CSINodeInfo{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
CSIDrivers: drivers,
}
}