Handle snapshot error, get default storage class, and other small

changes
This commit is contained in:
Jing Xu
2018-08-16 17:42:32 -07:00
parent 3e12fd6a36
commit 870fd8ec8c
3 changed files with 197 additions and 125 deletions

View File

@@ -57,6 +57,7 @@ type csiSnapshotController struct {
snapshotStore cache.Store
contentStore cache.Store
classStore cache.Store
handler Handler
// Map of scheduled/running operations.
@@ -100,6 +101,7 @@ func NewCSISnapshotController(
resyncPeriod: resyncPeriod,
snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
classStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-snapshot"),
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
}
@@ -160,10 +162,10 @@ func (ctrl *csiSnapshotController) enqueueSnapshotWork(obj interface{}) {
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
if vs, ok := obj.(*crdv1.VolumeSnapshot); ok {
objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(vs)
if snapshot, ok := obj.(*crdv1.VolumeSnapshot); ok {
objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(snapshot)
if err != nil {
glog.Errorf("failed to get key from object: %v, %v", err, vs)
glog.Errorf("failed to get key from object: %v, %v", err, snapshot)
return
}
glog.V(5).Infof("enqueued %q for sync", objName)
@@ -171,7 +173,7 @@ func (ctrl *csiSnapshotController) enqueueSnapshotWork(obj interface{}) {
}
}
// enqueueContentWork adds snapshot data to given work queue.
// enqueueContentWork adds snapshot content to given work queue.
func (ctrl *csiSnapshotController) enqueueContentWork(obj interface{}) {
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
@@ -208,9 +210,9 @@ func (ctrl *csiSnapshotController) snapshotWorker() {
}
snapshot, err := ctrl.snapshotLister.VolumeSnapshots(namespace).Get(name)
if err == nil {
// The volume snapshot still exists in informer cache, the event must have
// been add/update/sync
if ctrl.shouldProcessSnapshot(snapshot) {
// The volume snapshot still exists in informer cache, the event must have
// been add/update/sync
glog.V(4).Infof("should process snapshot")
ctrl.updateSnapshot(snapshot)
}
@@ -229,7 +231,7 @@ func (ctrl *csiSnapshotController) snapshotWorker() {
if !found {
// The controller has already processed the delete event and
// deleted the snapshot from its cache
glog.V(2).Infof("deletion of vs %q was already processed", key)
glog.V(2).Infof("deletion of snapshot %q was already processed", key)
return false
}
snapshot, ok := vsObj.(*crdv1.VolumeSnapshot)
@@ -325,6 +327,8 @@ func (ctrl *csiSnapshotController) contentWorker() {
func (ctrl *csiSnapshotController) shouldProcessSnapshot(snapshot *crdv1.VolumeSnapshot) bool {
class, err := ctrl.GetClassFromVolumeSnapshot(snapshot)
if err != nil {
glog.V(2).Infof("fail to get snapshot class for snapshot %s: %v", snapshotKey(snapshot), err)
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot with error %v", err))
return false
}
glog.V(5).Infof("VolumeSnapshotClass Snapshotter [%s] Snapshot Controller snapshotterName [%s]", class.Snapshotter, ctrl.snapshotterName)
@@ -337,25 +341,25 @@ func (ctrl *csiSnapshotController) shouldProcessSnapshot(snapshot *crdv1.VolumeS
// updateSnapshot runs in worker thread and handles "snapshot added",
// "snapshot updated" and "periodic sync" events.
func (ctrl *csiSnapshotController) updateSnapshot(vs *crdv1.VolumeSnapshot) {
// Store the new vs version in the cache and do not process it if this is
func (ctrl *csiSnapshotController) updateSnapshot(snapshot *crdv1.VolumeSnapshot) {
// Store the new snapshot version in the cache and do not process it if this is
// an old version.
glog.V(5).Infof("updateSnapshot %q", snapshotKey(vs))
newVS, err := ctrl.storeSnapshotUpdate(vs)
glog.V(5).Infof("updateSnapshot %q", snapshotKey(snapshot))
newSnapshot, err := ctrl.storeSnapshotUpdate(snapshot)
if err != nil {
glog.Errorf("%v", err)
}
if !newVS {
if !newSnapshot {
return
}
err = ctrl.syncSnapshot(vs)
err = ctrl.syncSnapshot(snapshot)
if err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
glog.V(3).Infof("could not sync claim %q: %+v", snapshotKey(vs), err)
glog.V(3).Infof("could not sync claim %q: %+v", snapshotKey(snapshot), err)
} else {
glog.Errorf("could not sync volume %q: %+v", snapshotKey(vs), err)
glog.Errorf("could not sync volume %q: %+v", snapshotKey(snapshot), err)
}
}
}
@@ -363,7 +367,7 @@ func (ctrl *csiSnapshotController) updateSnapshot(vs *crdv1.VolumeSnapshot) {
// updateContent runs in worker thread and handles "content added",
// "content updated" and "periodic sync" events.
func (ctrl *csiSnapshotController) updateContent(content *crdv1.VolumeSnapshotContent) {
// Store the new vs version in the cache and do not process it if this is
// Store the new content version in the cache and do not process it if this is
// an old version.
new, err := ctrl.storeContentUpdate(content)
if err != nil {
@@ -385,19 +389,19 @@ func (ctrl *csiSnapshotController) updateContent(content *crdv1.VolumeSnapshotCo
}
// deleteSnapshot runs in worker thread and handles "snapshot deleted" event.
func (ctrl *csiSnapshotController) deleteSnapshot(vs *crdv1.VolumeSnapshot) {
_ = ctrl.snapshotStore.Delete(vs)
glog.V(4).Infof("vs %q deleted", snapshotKey(vs))
func (ctrl *csiSnapshotController) deleteSnapshot(snapshot *crdv1.VolumeSnapshot) {
_ = ctrl.snapshotStore.Delete(snapshot)
glog.V(4).Infof("snapshot %q deleted", snapshotKey(snapshot))
snapshotContentName := vs.Spec.SnapshotContentName
snapshotContentName := snapshot.Spec.SnapshotContentName
if snapshotContentName == "" {
glog.V(5).Infof("deleteSnapshot[%q]: content not bound", snapshotKey(vs))
glog.V(5).Infof("deleteSnapshot[%q]: content not bound", snapshotKey(snapshot))
return
}
// sync the content when its vs is deleted. Explicitly sync'ing the
// content here in response to vs deletion prevents the content from
// sync the content when its snapshot is deleted. Explicitly sync'ing the
// content here in response to snapshot deletion prevents the content from
// waiting until the next sync period for its Release.
glog.V(5).Infof("deleteSnapshot[%q]: scheduling sync of content %s", snapshotKey(vs), snapshotContentName)
glog.V(5).Infof("deleteSnapshot[%q]: scheduling sync of content %s", snapshotKey(snapshot), snapshotContentName)
ctrl.contentQueue.Add(snapshotContentName)
}
@@ -422,14 +426,14 @@ func (ctrl *csiSnapshotController) deleteContent(content *crdv1.VolumeSnapshotCo
// order to have the caches already filled when first addSnapshot/addContent to
// perform initial synchronization of the controller.
func (ctrl *csiSnapshotController) initializeCaches(snapshotLister storagelisters.VolumeSnapshotLister, contentLister storagelisters.VolumeSnapshotContentLister) {
vsList, err := snapshotLister.List(labels.Everything())
snapshotList, err := snapshotLister.List(labels.Everything())
if err != nil {
glog.Errorf("CSISnapshotController can't initialize caches: %v", err)
return
}
for _, vs := range vsList {
vsClone := vs.DeepCopy()
if _, err = ctrl.storeSnapshotUpdate(vsClone); err != nil {
for _, snapshot := range snapshotList {
snapshotClone := snapshot.DeepCopy()
if _, err = ctrl.storeSnapshotUpdate(snapshotClone); err != nil {
glog.Errorf("error updating volume snapshot cache: %v", err)
}
}