feat: Implement distributed snapshotting
@@ -838,10 +838,12 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
		informerFactory.Snapshot().V1().VolumeSnapshotContents(),
		informerFactory.Snapshot().V1().VolumeSnapshotClasses(),
		coreFactory.Core().V1().PersistentVolumeClaims(),
		nil,
		metricsManager,
		60*time.Second,
		workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
		workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
		false,
	)

	ctrl.eventRecorder = record.NewFakeRecorder(1000)
@@ -29,6 +29,7 @@ import (
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/kubernetes/scheme"
	ref "k8s.io/client-go/tools/reference"
	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
	klog "k8s.io/klog/v2"

	crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
@@ -671,6 +672,18 @@ func (ctrl *csiSnapshotCommonController) createSnapshotContent(snapshot *crdv1.V
		},
	}

	if ctrl.enableDistributedSnapshotting {
		nodeName, err := ctrl.getManagedByNode(volume)
		if err != nil {
			return nil, err
		}
		if nodeName != "" {
			snapshotContent.Labels = map[string]string{
				utils.VolumeSnapshotContentManagedByLabel: nodeName,
			}
		}
	}

	// Set AnnDeletionSecretRefName and AnnDeletionSecretRefNamespace
	if snapshotterSecretRef != nil {
		klog.V(5).Infof("createSnapshotContent: set annotation [%s] on content [%s].", utils.AnnDeletionSecretRefName, snapshotContent.Name)
@@ -1655,3 +1668,27 @@ func (ctrl *csiSnapshotCommonController) checkAndSetInvalidSnapshotLabel(snapsho

	return updatedSnapshot, nil
}

func (ctrl *csiSnapshotCommonController) getManagedByNode(pv *v1.PersistentVolume) (string, error) {
	if pv.Spec.NodeAffinity == nil {
		klog.V(5).Infof("NodeAffinity not set for pv %s", pv.Name)
		return "", nil
	}
	nodeSelectorTerms := pv.Spec.NodeAffinity.Required

	nodes, err := ctrl.nodeLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("failed to get the list of nodes: %q", err)
		return "", err
	}

	for _, node := range nodes {
		match, _ := corev1helpers.MatchNodeSelectorTerms(node, nodeSelectorTerms)
		if match {
			return node.Name, nil
		}
	}

	klog.Errorf("failed to find nodes that match the node affinity requirements for pv[%s]", pv.Name)
	return "", nil
}
@@ -57,6 +57,8 @@ type csiSnapshotCommonController struct {
	classListerSynced cache.InformerSynced
	pvcLister corelisters.PersistentVolumeClaimLister
	pvcListerSynced cache.InformerSynced
	nodeLister corelisters.NodeLister
	nodeListerSynced cache.InformerSynced

	snapshotStore cache.Store
	contentStore cache.Store
@@ -64,6 +66,8 @@ type csiSnapshotCommonController struct {
	metricsManager metrics.MetricsManager

	resyncPeriod time.Duration

	enableDistributedSnapshotting bool
}

// NewCSISnapshotController returns a new *csiSnapshotCommonController
@@ -74,10 +78,12 @@ func NewCSISnapshotCommonController(
	volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer,
	volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	nodeInformer coreinformers.NodeInformer,
	metricsManager metrics.MetricsManager,
	resyncPeriod time.Duration,
	snapshotRateLimiter workqueue.RateLimiter,
	contentRateLimiter workqueue.RateLimiter,
	enableDistributedSnapshotting bool,
) *csiSnapshotCommonController {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(klog.Infof)
@@ -125,6 +131,13 @@ func NewCSISnapshotCommonController(
	ctrl.classLister = volumeSnapshotClassInformer.Lister()
	ctrl.classListerSynced = volumeSnapshotClassInformer.Informer().HasSynced

	ctrl.enableDistributedSnapshotting = enableDistributedSnapshotting

	if enableDistributedSnapshotting {
		ctrl.nodeLister = nodeInformer.Lister()
		ctrl.nodeListerSynced = nodeInformer.Informer().HasSynced
	}

	return ctrl
}
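A minimal sketch (not part of this diff) of where the new nodeInformer argument might come from, assuming a client-go kubernetes.Interface named kubeClient; the node informer only needs to be built, started, and synced when enableDistributedSnapshotting is true:

package example

import (
	"time"

	"k8s.io/client-go/informers"
	coreinformers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
)

// newNodeInformer builds the core shared informer factory and returns the
// node informer that NewCSISnapshotCommonController expects when distributed
// snapshotting is enabled. Starting and syncing the factory is left to the
// caller.
func newNodeInformer(kubeClient kubernetes.Interface, resync time.Duration) (informers.SharedInformerFactory, coreinformers.NodeInformer) {
	factory := informers.NewSharedInformerFactory(kubeClient, resync)
	return factory, factory.Core().V1().Nodes()
}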
@@ -135,7 +148,12 @@ func (ctrl *csiSnapshotCommonController) Run(workers int, stopCh <-chan struct{}
	klog.Infof("Starting snapshot controller")
	defer klog.Infof("Shutting snapshot controller")

	if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced, ctrl.pvcListerSynced) {
	informersSynced := []cache.InformerSynced{ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced, ctrl.pvcListerSynced}
	if ctrl.enableDistributedSnapshotting {
		informersSynced = append(informersSynced, ctrl.nodeListerSynced)
	}

	if !cache.WaitForCacheSync(stopCh, informersSynced...) {
		klog.Errorf("Cannot sync caches")
		return
	}
@@ -21,11 +21,28 @@ import (

	crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
	"github.com/kubernetes-csi/external-snapshotter/v4/pkg/utils"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/tools/cache"
)

var deletionPolicy = crdv1.VolumeSnapshotContentDelete

type FakeNodeLister struct {
	NodeList []*v1.Node
}

// List lists all Nodes in the indexer.
// Objects returned here must be treated as read-only.
func (l FakeNodeLister) List(selector labels.Selector) (ret []*v1.Node, err error) {
	return l.NodeList, nil
}

func (l FakeNodeLister) Get(name string) (*v1.Node, error) {
	return nil, nil
}

func storeVersion(t *testing.T, prefix string, c cache.Store, version string, expectedReturn bool) {
	content := newContent("contentName", "snapuid1-1", "snap1-1", "sid1-1", classGold, "", "pv-handle-1-1", deletionPolicy, nil, nil, false, true)
	content.ResourceVersion = version
@@ -92,3 +109,71 @@ func TestControllerCacheParsingError(t *testing.T) {
		t.Errorf("Expected parsing error, got nil instead")
	}
}

func TestGetManagedByNode(t *testing.T) {

	// Test that a matching node is found

	node1 := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: "node1",
			Labels: map[string]string{"key1": "value1"},
		},
	}

	node2 := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: "node2",
			Labels: map[string]string{"key2": "value2"},
		},
	}

	ctrl := &csiSnapshotCommonController{
		nodeLister: FakeNodeLister{NodeList: []*v1.Node{node1, node2}},
	}

	pv := &v1.PersistentVolume{
		Spec: v1.PersistentVolumeSpec{
			NodeAffinity: &v1.VolumeNodeAffinity{
				Required: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchExpressions: []v1.NodeSelectorRequirement{
								{
									Key: "key1",
									Operator: v1.NodeSelectorOpIn,
									Values: []string{"value1"},
								},
							},
						},
					},
				},
			},
		},
	}

	nodeName, err := ctrl.getManagedByNode(pv)
	if err != nil {
		t.Errorf("Unexpected error occurred: %v", err)
	}
	if nodeName != "node1" {
		t.Errorf("Expected node:%s , Found node: %s instead", "node1", nodeName)
	}

	// Test that no matching node is found

	node1 = &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: "node1",
		},
	}

	ctrl = &csiSnapshotCommonController{
		nodeLister: FakeNodeLister{NodeList: []*v1.Node{node1}},
	}

	nodeName, _ = ctrl.getManagedByNode(pv)
	if nodeName != "" {
		t.Errorf("Expected no node, Found node(%s)", nodeName)
	}
}
@@ -107,6 +107,9 @@ const (
	// VolumeSnapshotInvalidLabel is applied to invalid snapshot as a label key. The value does not matter.
	// See https://github.com/kubernetes/enhancements/blob/master/keps/sig-storage/177-volume-snapshot/tighten-validation-webhook-crd.md#automatic-labelling-of-invalid-objects
	VolumeSnapshotInvalidLabel = "snapshot.storage.kubernetes.io/invalid-snapshot-resource"
	// VolumeSnapshotContentManagedByLabel is applied by the snapshot controller to the VolumeSnapshotContent object in case distributed snapshotting is enabled.
	// The value contains the name of the node that handles the snapshot for the volume local to that node.
	VolumeSnapshotContentManagedByLabel = "snapshot.storage.kubernetes.io/managed-by"
)
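A minimal sketch (not part of this diff) of how a node-local component might consume the managed-by label, listing only the VolumeSnapshotContent objects assigned to its node; the snapClient and nodeName values are illustrative assumptions:

package example

import (
	"context"

	clientset "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// listContentsManagedBy returns the names of the VolumeSnapshotContents whose
// managed-by label (the value of VolumeSnapshotContentManagedByLabel above)
// matches the given node name.
func listContentsManagedBy(ctx context.Context, snapClient clientset.Interface, nodeName string) ([]string, error) {
	selector := labels.Set{
		"snapshot.storage.kubernetes.io/managed-by": nodeName,
	}.AsSelector().String()

	list, err := snapClient.SnapshotV1().VolumeSnapshotContents().List(ctx, metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		return nil, err
	}
	names := make([]string, 0, len(list.Items))
	for _, content := range list.Items {
		names = append(names, content.Name)
	}
	return names, nil
}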
var SnapshotterSecretParams = secretParamsMap{