Add review comments in cmd and controller
This commit is contained in:
439
pkg/controller/snapshot_controller_base.go
Normal file
439
pkg/controller/snapshot_controller_base.go
Normal file
@@ -0,0 +1,439 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
|
||||
clientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
|
||||
storageinformers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions/volumesnapshot/v1alpha1"
|
||||
storagelisters "github.com/kubernetes-csi/external-snapshotter/pkg/client/listers/volumesnapshot/v1alpha1"
|
||||
"github.com/kubernetes-csi/external-snapshotter/pkg/connection"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/util/goroutinemap"
|
||||
)
|
||||
|
||||
type csiSnapshotController struct {
|
||||
clientset clientset.Interface
|
||||
client kubernetes.Interface
|
||||
snapshotterName string
|
||||
eventRecorder record.EventRecorder
|
||||
snapshotQueue workqueue.RateLimitingInterface
|
||||
contentQueue workqueue.RateLimitingInterface
|
||||
|
||||
snapshotLister storagelisters.VolumeSnapshotLister
|
||||
snapshotListerSynced cache.InformerSynced
|
||||
contentLister storagelisters.VolumeSnapshotContentLister
|
||||
contentListerSynced cache.InformerSynced
|
||||
classLister storagelisters.VolumeSnapshotClassLister
|
||||
classListerSynced cache.InformerSynced
|
||||
|
||||
snapshotStore cache.Store
|
||||
contentStore cache.Store
|
||||
|
||||
handler Handler
|
||||
// Map of scheduled/running operations.
|
||||
runningOperations goroutinemap.GoRoutineMap
|
||||
|
||||
createSnapshotContentRetryCount int
|
||||
createSnapshotContentInterval time.Duration
|
||||
resyncPeriod time.Duration
|
||||
}
|
||||
|
||||
// NewCSISnapshotController returns a new *csiSnapshotController
|
||||
func NewCSISnapshotController(
|
||||
clientset clientset.Interface,
|
||||
client kubernetes.Interface,
|
||||
snapshotterName string,
|
||||
volumeSnapshotInformer storageinformers.VolumeSnapshotInformer,
|
||||
volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer,
|
||||
volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer,
|
||||
createSnapshotContentRetryCount int,
|
||||
createSnapshotContentInterval time.Duration,
|
||||
conn connection.CSIConnection,
|
||||
timeout time.Duration,
|
||||
resyncPeriod time.Duration,
|
||||
snapshotNamePrefix string,
|
||||
snapshotNameUUIDLength int,
|
||||
) *csiSnapshotController {
|
||||
broadcaster := record.NewBroadcaster()
|
||||
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.Core().Events(v1.NamespaceAll)})
|
||||
var eventRecorder record.EventRecorder
|
||||
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-snapshotter %s", snapshotterName)})
|
||||
|
||||
ctrl := &csiSnapshotController{
|
||||
clientset: clientset,
|
||||
client: client,
|
||||
snapshotterName: snapshotterName,
|
||||
eventRecorder: eventRecorder,
|
||||
handler: NewCSIHandler(conn, timeout, snapshotNamePrefix, snapshotNameUUIDLength),
|
||||
runningOperations: goroutinemap.NewGoRoutineMap(true),
|
||||
createSnapshotContentRetryCount: createSnapshotContentRetryCount,
|
||||
createSnapshotContentInterval: createSnapshotContentInterval,
|
||||
resyncPeriod: resyncPeriod,
|
||||
snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
|
||||
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
|
||||
snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-snapshot"),
|
||||
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
|
||||
}
|
||||
|
||||
volumeSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) },
|
||||
UpdateFunc: func(oldObj, newObj interface{}) { ctrl.enqueueSnapshotWork(newObj) },
|
||||
DeleteFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) },
|
||||
},
|
||||
ctrl.resyncPeriod,
|
||||
)
|
||||
ctrl.snapshotLister = volumeSnapshotInformer.Lister()
|
||||
ctrl.snapshotListerSynced = volumeSnapshotInformer.Informer().HasSynced
|
||||
|
||||
volumeSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) { ctrl.enqueueContentWork(obj) },
|
||||
UpdateFunc: func(oldObj, newObj interface{}) { ctrl.enqueueContentWork(newObj) },
|
||||
DeleteFunc: func(obj interface{}) { ctrl.enqueueContentWork(obj) },
|
||||
},
|
||||
ctrl.resyncPeriod,
|
||||
)
|
||||
ctrl.contentLister = volumeSnapshotContentInformer.Lister()
|
||||
ctrl.contentListerSynced = volumeSnapshotContentInformer.Informer().HasSynced
|
||||
|
||||
ctrl.classLister = volumeSnapshotClassInformer.Lister()
|
||||
ctrl.classListerSynced = volumeSnapshotClassInformer.Informer().HasSynced
|
||||
|
||||
return ctrl
|
||||
}
|
||||
|
||||
func (ctrl *csiSnapshotController) Run(workers int, stopCh <-chan struct{}) {
|
||||
defer ctrl.snapshotQueue.ShutDown()
|
||||
defer ctrl.contentQueue.ShutDown()
|
||||
|
||||
glog.Infof("Starting CSI snapshotter")
|
||||
defer glog.Infof("Shutting CSI snapshotter")
|
||||
|
||||
if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced) {
|
||||
glog.Errorf("Cannot sync caches")
|
||||
return
|
||||
}
|
||||
|
||||
ctrl.initializeCaches(ctrl.snapshotLister, ctrl.contentLister)
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(ctrl.snapshotWorker, 0, stopCh)
|
||||
go wait.Until(ctrl.contentWorker, 0, stopCh)
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
// enqueueSnapshotWork adds snapshot to given work queue.
|
||||
func (ctrl *csiSnapshotController) enqueueSnapshotWork(obj interface{}) {
|
||||
// Beware of "xxx deleted" events
|
||||
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
|
||||
obj = unknown.Obj
|
||||
}
|
||||
if vs, ok := obj.(*crdv1.VolumeSnapshot); ok {
|
||||
objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(vs)
|
||||
if err != nil {
|
||||
glog.Errorf("failed to get key from object: %v, %v", err, vs)
|
||||
return
|
||||
}
|
||||
glog.V(5).Infof("enqueued %q for sync", objName)
|
||||
ctrl.snapshotQueue.Add(objName)
|
||||
}
|
||||
}
|
||||
|
||||
// enqueueContentWork adds snapshot data to given work queue.
|
||||
func (ctrl *csiSnapshotController) enqueueContentWork(obj interface{}) {
|
||||
// Beware of "xxx deleted" events
|
||||
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
|
||||
obj = unknown.Obj
|
||||
}
|
||||
if content, ok := obj.(*crdv1.VolumeSnapshotContent); ok {
|
||||
objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(content)
|
||||
if err != nil {
|
||||
glog.Errorf("failed to get key from object: %v, %v", err, content)
|
||||
return
|
||||
}
|
||||
glog.V(5).Infof("enqueued %q for sync", objName)
|
||||
ctrl.contentQueue.Add(objName)
|
||||
}
|
||||
}
|
||||
|
||||
// snapshotWorker processes items from snapshotQueue. It must run only once,
|
||||
// syncSnapshot is not assured to be reentrant.
|
||||
func (ctrl *csiSnapshotController) snapshotWorker() {
|
||||
workFunc := func() bool {
|
||||
keyObj, quit := ctrl.snapshotQueue.Get()
|
||||
if quit {
|
||||
return true
|
||||
}
|
||||
defer ctrl.snapshotQueue.Done(keyObj)
|
||||
key := keyObj.(string)
|
||||
glog.V(5).Infof("snapshotWorker[%s]", key)
|
||||
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
glog.V(5).Infof("snapshotWorker: snapshot namespace [%s] name [%s]", namespace, name)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("error getting namespace & name of snapshot %q to get snapshot from informer: %v", key, err)
|
||||
return false
|
||||
}
|
||||
snapshot, err := ctrl.snapshotLister.VolumeSnapshots(namespace).Get(name)
|
||||
if err == nil {
|
||||
if ctrl.shouldProcessSnapshot(snapshot) {
|
||||
// The volume snapshot still exists in informer cache, the event must have
|
||||
// been add/update/sync
|
||||
glog.V(4).Infof("should process snapshot")
|
||||
ctrl.updateSnapshot(snapshot)
|
||||
}
|
||||
return false
|
||||
}
|
||||
if err != nil && !errors.IsNotFound(err) {
|
||||
glog.V(2).Infof("error getting snapshot %q from informer: %v", key, err)
|
||||
return false
|
||||
}
|
||||
// The snapshot is not in informer cache, the event must have been "delete"
|
||||
vsObj, found, err := ctrl.snapshotStore.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.V(2).Infof("error getting snapshot %q from cache: %v", key, err)
|
||||
return false
|
||||
}
|
||||
if !found {
|
||||
// The controller has already processed the delete event and
|
||||
// deleted the snapshot from its cache
|
||||
glog.V(2).Infof("deletion of vs %q was already processed", key)
|
||||
return false
|
||||
}
|
||||
snapshot, ok := vsObj.(*crdv1.VolumeSnapshot)
|
||||
if !ok {
|
||||
glog.Errorf("expected vs, got %+v", vsObj)
|
||||
return false
|
||||
}
|
||||
if ctrl.shouldProcessSnapshot(snapshot) {
|
||||
ctrl.deleteSnapshot(snapshot)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
for {
|
||||
if quit := workFunc(); quit {
|
||||
glog.Infof("snapshot worker queue shutting down")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// contentWorker processes items from contentQueue. It must run only once,
|
||||
// syncContent is not assured to be reentrant.
|
||||
func (ctrl *csiSnapshotController) contentWorker() {
|
||||
workFunc := func() bool {
|
||||
keyObj, quit := ctrl.contentQueue.Get()
|
||||
if quit {
|
||||
return true
|
||||
}
|
||||
defer ctrl.contentQueue.Done(keyObj)
|
||||
key := keyObj.(string)
|
||||
glog.V(5).Infof("contentWorker[%s]", key)
|
||||
|
||||
_, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("error getting name of snapshotData %q to get snapshotData from informer: %v", key, err)
|
||||
return false
|
||||
}
|
||||
content, err := ctrl.contentLister.Get(name)
|
||||
if err == nil {
|
||||
// The volume still exists in informer cache, the event must have
|
||||
// been add/update/sync
|
||||
ctrl.updateContent(content)
|
||||
return false
|
||||
}
|
||||
if !errors.IsNotFound(err) {
|
||||
glog.V(2).Infof("error getting content %q from informer: %v", key, err)
|
||||
return false
|
||||
}
|
||||
|
||||
// The content is not in informer cache, the event must have been
|
||||
// "delete"
|
||||
contentObj, found, err := ctrl.contentStore.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.V(2).Infof("error getting content %q from cache: %v", key, err)
|
||||
return false
|
||||
}
|
||||
if !found {
|
||||
// The controller has already processed the delete event and
|
||||
// deleted the volume from its cache
|
||||
glog.V(2).Infof("deletion of content %q was already processed", key)
|
||||
return false
|
||||
}
|
||||
content, ok := contentObj.(*crdv1.VolumeSnapshotContent)
|
||||
if !ok {
|
||||
glog.Errorf("expected content, got %+v", content)
|
||||
return false
|
||||
}
|
||||
ctrl.deleteContent(content)
|
||||
return false
|
||||
}
|
||||
|
||||
for {
|
||||
if quit := workFunc(); quit {
|
||||
glog.Infof("content worker queue shutting down")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// shouldProcessSnapshot detect if snapshotter in the VolumeSnapshotClass is the same as the snapshotter
|
||||
// in external controller.
|
||||
func (ctrl *csiSnapshotController) shouldProcessSnapshot(snapshot *crdv1.VolumeSnapshot) bool {
|
||||
class, err := ctrl.GetClassFromVolumeSnapshot(snapshot)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
glog.V(5).Infof("VolumeSnapshotClass Snapshotter [%s] Snapshot Controller snapshotterName [%s]", class.Snapshotter, ctrl.snapshotterName)
|
||||
if class.Snapshotter != ctrl.snapshotterName {
|
||||
glog.V(4).Infof("Skipping VolumeSnapshot %s for snapshotter [%s] in VolumeSnapshotClass because it does not match with the snapshotter for controller [%s]", snapshotKey(snapshot), class.Snapshotter, ctrl.snapshotterName)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// updateSnapshot runs in worker thread and handles "snapshot added",
|
||||
// "snapshot updated" and "periodic sync" events.
|
||||
func (ctrl *csiSnapshotController) updateSnapshot(vs *crdv1.VolumeSnapshot) {
|
||||
// Store the new vs version in the cache and do not process it if this is
|
||||
// an old version.
|
||||
glog.V(5).Infof("updateSnapshot %q", snapshotKey(vs))
|
||||
newVS, err := ctrl.storeSnapshotUpdate(vs)
|
||||
if err != nil {
|
||||
glog.Errorf("%v", err)
|
||||
}
|
||||
if !newVS {
|
||||
return
|
||||
}
|
||||
err = ctrl.syncSnapshot(vs)
|
||||
if err != nil {
|
||||
if errors.IsConflict(err) {
|
||||
// Version conflict error happens quite often and the controller
|
||||
// recovers from it easily.
|
||||
glog.V(3).Infof("could not sync claim %q: %+v", snapshotKey(vs), err)
|
||||
} else {
|
||||
glog.Errorf("could not sync volume %q: %+v", snapshotKey(vs), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// updateContent runs in worker thread and handles "content added",
|
||||
// "content updated" and "periodic sync" events.
|
||||
func (ctrl *csiSnapshotController) updateContent(content *crdv1.VolumeSnapshotContent) {
|
||||
// Store the new vs version in the cache and do not process it if this is
|
||||
// an old version.
|
||||
new, err := ctrl.storeContentUpdate(content)
|
||||
if err != nil {
|
||||
glog.Errorf("%v", err)
|
||||
}
|
||||
if !new {
|
||||
return
|
||||
}
|
||||
err = ctrl.syncContent(content)
|
||||
if err != nil {
|
||||
if errors.IsConflict(err) {
|
||||
// Version conflict error happens quite often and the controller
|
||||
// recovers from it easily.
|
||||
glog.V(3).Infof("could not sync content %q: %+v", content.Name, err)
|
||||
} else {
|
||||
glog.Errorf("could not sync content %q: %+v", content.Name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// deleteSnapshot runs in worker thread and handles "snapshot deleted" event.
|
||||
func (ctrl *csiSnapshotController) deleteSnapshot(vs *crdv1.VolumeSnapshot) {
|
||||
_ = ctrl.snapshotStore.Delete(vs)
|
||||
glog.V(4).Infof("vs %q deleted", snapshotKey(vs))
|
||||
|
||||
snapshotContentName := vs.Spec.SnapshotContentName
|
||||
if snapshotContentName == "" {
|
||||
glog.V(5).Infof("deleteSnapshot[%q]: content not bound", snapshotKey(vs))
|
||||
return
|
||||
}
|
||||
// sync the content when its vs is deleted. Explicitly sync'ing the
|
||||
// content here in response to vs deletion prevents the content from
|
||||
// waiting until the next sync period for its Release.
|
||||
glog.V(5).Infof("deleteSnapshot[%q]: scheduling sync of content %s", snapshotKey(vs), snapshotContentName)
|
||||
ctrl.contentQueue.Add(snapshotContentName)
|
||||
}
|
||||
|
||||
// deleteContent runs in worker thread and handles "snapshot deleted" event.
|
||||
func (ctrl *csiSnapshotController) deleteContent(content *crdv1.VolumeSnapshotContent) {
|
||||
_ = ctrl.contentStore.Delete(content)
|
||||
glog.V(4).Infof("content %q deleted", content.Name)
|
||||
|
||||
snapshotName := snapshotRefKey(content.Spec.VolumeSnapshotRef)
|
||||
if snapshotName == "" {
|
||||
glog.V(5).Infof("deleteContent[%q]: content not bound", content.Name)
|
||||
return
|
||||
}
|
||||
// sync the vs when its vs is deleted. Explicitly sync'ing the
|
||||
// vs here in response to content deletion prevents the vs from
|
||||
// waiting until the next sync period for its Release.
|
||||
glog.V(5).Infof("deleteContent[%q]: scheduling sync of vs %s", content.Name, snapshotName)
|
||||
ctrl.contentQueue.Add(snapshotName)
|
||||
}
|
||||
|
||||
// initializeCaches fills all controller caches with initial data from etcd in
|
||||
// order to have the caches already filled when first addSnapshot/addContent to
|
||||
// perform initial synchronization of the controller.
|
||||
func (ctrl *csiSnapshotController) initializeCaches(snapshotLister storagelisters.VolumeSnapshotLister, contentLister storagelisters.VolumeSnapshotContentLister) {
|
||||
vsList, err := snapshotLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("CSISnapshotController can't initialize caches: %v", err)
|
||||
return
|
||||
}
|
||||
for _, vs := range vsList {
|
||||
vsClone := vs.DeepCopy()
|
||||
if _, err = ctrl.storeSnapshotUpdate(vsClone); err != nil {
|
||||
glog.Errorf("error updating volume snapshot cache: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
contentList, err := contentLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("CSISnapshotController can't initialize caches: %v", err)
|
||||
return
|
||||
}
|
||||
for _, content := range contentList {
|
||||
contentClone := content.DeepCopy()
|
||||
if _, err = ctrl.storeSnapshotUpdate(contentClone); err != nil {
|
||||
glog.Errorf("error updating volume snapshot cache: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(4).Infof("controller initialized")
|
||||
}
|
Reference in New Issue
Block a user