Add requeue in sidecar

This commit is contained in:
xing-yang
2020-01-02 16:52:03 +00:00
parent aafeee9c56
commit cbd5b8b7a1
4 changed files with 128 additions and 168 deletions

View File

@@ -37,7 +37,6 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/goroutinemap"
)
type csiSnapshotSideCarController struct {
@@ -55,8 +54,6 @@ type csiSnapshotSideCarController struct {
contentStore cache.Store
handler Handler
// Map of scheduled/running operations.
runningOperations goroutinemap.GoRoutineMap
resyncPeriod time.Duration
}
@@ -81,15 +78,14 @@ func NewCSISnapshotSideCarController(
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-snapshotter %s", driverName)})
ctrl := &csiSnapshotSideCarController{
clientset: clientset,
client: client,
driverName: driverName,
eventRecorder: eventRecorder,
handler: NewCSIHandler(snapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength),
runningOperations: goroutinemap.NewGoRoutineMap(true),
resyncPeriod: resyncPeriod,
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
clientset: clientset,
client: client,
driverName: driverName,
eventRecorder: eventRecorder,
handler: NewCSIHandler(snapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength),
resyncPeriod: resyncPeriod,
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
}
volumeSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -149,62 +145,77 @@ func (ctrl *csiSnapshotSideCarController) enqueueContentWork(obj interface{}) {
// contentWorker processes items from contentQueue. It must run only once,
// syncContent is not assured to be reentrant.
func (ctrl *csiSnapshotSideCarController) contentWorker() {
workFunc := func() bool {
keyObj, quit := ctrl.contentQueue.Get()
if quit {
return true
}
defer ctrl.contentQueue.Done(keyObj)
key := keyObj.(string)
klog.V(5).Infof("contentWorker[%s]", key)
for ctrl.processNextItem() {
}
}
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.V(4).Infof("error getting name of snapshotContent %q to get snapshotContent from informer: %v", key, err)
return false
}
content, err := ctrl.contentLister.Get(name)
// The content still exists in informer cache, the event must have
// been add/update/sync
if err == nil {
if ctrl.isDriverMatch(content) {
ctrl.updateContentInCacheStore(content)
}
return false
}
if !errors.IsNotFound(err) {
klog.V(2).Infof("error getting content %q from informer: %v", key, err)
return false
}
// The content is not in informer cache, the event must have been
// "delete"
contentObj, found, err := ctrl.contentStore.GetByKey(key)
if err != nil {
klog.V(2).Infof("error getting content %q from cache: %v", key, err)
return false
}
if !found {
// The controller has already processed the delete event and
// deleted the content from its cache
klog.V(2).Infof("deletion of content %q was already processed", key)
return false
}
content, ok := contentObj.(*crdv1.VolumeSnapshotContent)
if !ok {
klog.Errorf("expected content, got %+v", content)
return false
}
ctrl.deleteContentInCacheStore(content)
func (ctrl *csiSnapshotSideCarController) processNextItem() bool {
keyObj, quit := ctrl.contentQueue.Get()
if quit {
return false
}
defer ctrl.contentQueue.Done(keyObj)
for {
if quit := workFunc(); quit {
klog.Infof("content worker queue shutting down")
return
}
if err := ctrl.syncContentByKey(keyObj.(string)); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
ctrl.contentQueue.AddRateLimited(keyObj)
klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err)
return true
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
ctrl.contentQueue.Forget(keyObj)
return true
}
func (ctrl *csiSnapshotSideCarController) syncContentByKey(key string) error {
klog.V(5).Infof("syncContentByKey[%s]", key)
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.V(4).Infof("error getting name of snapshotContent %q to get snapshotContent from informer: %v", key, err)
return nil
}
content, err := ctrl.contentLister.Get(name)
// The content still exists in informer cache, the event must have
// been add/update/sync
if err == nil {
if ctrl.isDriverMatch(content) {
err = ctrl.updateContentInInformerCache(content)
}
if err != nil {
// If error occurs we add this item back to the queue
return err
}
return nil
}
if !errors.IsNotFound(err) {
klog.V(2).Infof("error getting content %q from informer: %v", key, err)
return nil
}
// The content is not in informer cache, the event must have been
// "delete"
contentObj, found, err := ctrl.contentStore.GetByKey(key)
if err != nil {
klog.V(2).Infof("error getting content %q from cache: %v", key, err)
return nil
}
if !found {
// The controller has already processed the delete event and
// deleted the content from its cache
klog.V(2).Infof("deletion of content %q was already processed", key)
return nil
}
content, ok := contentObj.(*crdv1.VolumeSnapshotContent)
if !ok {
klog.Errorf("expected content, got %+v", content)
return nil
}
ctrl.deleteContentInCacheStore(content)
return nil
}
// verify whether the driver specified in VolumeSnapshotContent matches the controller's driver name
@@ -228,9 +239,9 @@ func (ctrl *csiSnapshotSideCarController) isDriverMatch(content *crdv1.VolumeSna
return true
}
// updateContent runs in worker thread and handles "content added",
// updateContentInInformerCache runs in worker thread and handles "content added",
// "content updated" and "periodic sync" events.
func (ctrl *csiSnapshotSideCarController) updateContentInCacheStore(content *crdv1.VolumeSnapshotContent) {
func (ctrl *csiSnapshotSideCarController) updateContentInInformerCache(content *crdv1.VolumeSnapshotContent) error {
// Store the new content version in the cache and do not process it if this is
// an old version.
new, err := ctrl.storeContentUpdate(content)
@@ -238,7 +249,7 @@ func (ctrl *csiSnapshotSideCarController) updateContentInCacheStore(content *crd
klog.Errorf("%v", err)
}
if !new {
return
return nil
}
err = ctrl.syncContent(content)
if err != nil {
@@ -249,7 +260,9 @@ func (ctrl *csiSnapshotSideCarController) updateContentInCacheStore(content *crd
} else {
klog.Errorf("could not sync content %q: %+v", content.Name, err)
}
return err
}
return nil
}
// deleteContent runs in worker thread and handles "content deleted" event.