diff --git a/pkg/common-controller/groupsnapshot_controller_helper.go b/pkg/common-controller/groupsnapshot_controller_helper.go
index 335ffcff..fb650ef3 100644
--- a/pkg/common-controller/groupsnapshot_controller_helper.go
+++ b/pkg/common-controller/groupsnapshot_controller_helper.go
@@ -802,3 +802,113 @@ func (ctrl *csiSnapshotCommonController) getCreateGroupSnapshotInput(groupSnapsh
 
 	return class, volumes, contentName, nil
 }
+
+// syncGroupSnapshotContent deals with one VolumeGroupSnapshotContent off the queue: it returns an error (and records a warning event) when the source does not set exactly one of VolumeGroupSnapshotHandle and PersistentVolumeNames, returns nil without further work when VolumeGroupSnapshotRef has no UID yet, and otherwise queues the referenced group snapshot if its status needs to catch up with the content; an error from the group snapshot store lookup is returned unchanged.
+func (ctrl *csiSnapshotCommonController) syncGroupSnapshotContent(content *crdv1alpha1.VolumeGroupSnapshotContent) error {
+	groupSnapshotName := utils.GroupSnapshotRefKey(&content.Spec.VolumeGroupSnapshotRef)
+	klog.V(4).Infof("synchronizing VolumeGroupSnapshotContent[%s]: content is bound to group snapshot %s", content.Name, groupSnapshotName)
+
+	klog.V(5).Infof("syncGroupSnapshotContent[%s]: check if we should add invalid label on content", content.Name)
+
+	// Keep this check in the controller since the validation webhook may not have been deployed.
+	if (content.Spec.Source.VolumeGroupSnapshotHandle == nil && len(content.Spec.Source.PersistentVolumeNames) == 0) ||
+		(content.Spec.Source.VolumeGroupSnapshotHandle != nil && len(content.Spec.Source.PersistentVolumeNames) > 0) {
+		err := fmt.Errorf("Exactly one of VolumeGroupSnapshotHandle and PersistentVolumeNames should be specified")
+		klog.Errorf("syncGroupSnapshotContent[%s]: validation error, %s", content.Name, err.Error())
+		ctrl.eventRecorder.Event(content, v1.EventTypeWarning, "GroupContentValidationError", err.Error())
+		return err
+	}
+
+	// The VolumeGroupSnapshotContent is reserved for a VolumeGroupSnapshot, but
+	// that VolumeGroupSnapshot has not yet been bound to this VolumeGroupSnapshotContent
+	// (the reference carries no UID); syncGroupSnapshot will handle it.
+	if content.Spec.VolumeGroupSnapshotRef.UID == "" {
+		klog.V(4).Infof("syncGroupSnapshotContent [%s]: VolumeGroupSnapshotContent is pre-bound to VolumeGroupSnapshot %s", content.Name, groupSnapshotName)
+		return nil
+	}
+
+	/*
+		TODO: Add finalizer to prevent deletion
+	*/
+
+	// Check whether the group snapshot exists in the cache store.
+	// If getGroupSnapshotFromStore returns (nil, nil), the group snapshot was not found
+	// and may have already been deleted; in that case neither branch below
+	// does any work and the function returns nil.
+	var groupSnapshot *crdv1alpha1.VolumeGroupSnapshot
+	groupSnapshot, err := ctrl.getGroupSnapshotFromStore(groupSnapshotName)
+	if err != nil {
+		return err
+	}
+
+	if groupSnapshot != nil && groupSnapshot.UID != content.Spec.VolumeGroupSnapshotRef.UID {
+		// The group snapshot that the content was pointing to was deleted, and another
+		// one with the same name was created.
+		klog.V(4).Infof("syncGroupSnapshotContent [%s]: group snapshot %s has different UID, the old one must have been deleted", content.Name, groupSnapshotName)
+		// Treat the content as bound to a missing group snapshot (nothing reads this value yet).
+		groupSnapshot = nil
+	} else {
+		// Check if groupSnapshot.Status differs from content.Status and add the group snapshot
+		// to the queue if the difference is worth triggering a group snapshot status update.
+		if groupSnapshot != nil && ctrl.needsUpdateGroupSnapshotStatus(groupSnapshot, content) {
+			klog.V(4).Infof("synchronizing VolumeGroupSnapshotContent for group snapshot [%s]: update group snapshot status to true if needed.", groupSnapshotName)
+			// Manually trigger a group snapshot status update to happen
+			// right away so that it is in sync with the content status.
+			ctrl.groupSnapshotQueue.Add(groupSnapshotName)
+		}
+	}
+	return nil
+}
+
+// getGroupSnapshotFromStore finds a group snapshot in the cache store by key.
+// If getGroupSnapshotFromStore returns (nil, nil), it means the group snapshot was
+// not found and it may have already been deleted.
+func (ctrl *csiSnapshotCommonController) getGroupSnapshotFromStore(snapshotName string) (*crdv1alpha1.VolumeGroupSnapshot, error) {
+	// Get the VolumeGroupSnapshot by _name_ (snapshotName is used as the store key).
+	var groupSnapshot *crdv1alpha1.VolumeGroupSnapshot
+	obj, found, err := ctrl.groupSnapshotStore.GetByKey(snapshotName)
+	if err != nil {
+		return nil, err
+	}
+	if !found {
+		klog.V(4).Infof("getGroupSnapshotFromStore: group snapshot %s not found", snapshotName)
+		// Not an error: report "no such group snapshot" as (nil, nil).
+		return nil, nil
+	}
+	var ok bool
+	groupSnapshot, ok = obj.(*crdv1alpha1.VolumeGroupSnapshot)
+	if !ok {
+		return nil, fmt.Errorf("cannot convert object from group snapshot cache to group snapshot %q!?: %#v", snapshotName, obj)
+	}
+	klog.V(4).Infof("getGroupSnapshotFromStore: group snapshot %s found", snapshotName)
+
+	return groupSnapshot, nil
+}
+
+// needsUpdateGroupSnapshotStatus compares the group snapshot status with the content
+// status and reports whether the group snapshot status needs to be updated; ReadyToUse
+// is compared by value, so distinct pointers holding equal booleans count as in sync.
+func (ctrl *csiSnapshotCommonController) needsUpdateGroupSnapshotStatus(groupSnapshot *crdv1alpha1.VolumeGroupSnapshot, content *crdv1alpha1.VolumeGroupSnapshotContent) bool {
+	klog.V(5).Infof("needsUpdateGroupSnapshotStatus[%s]", utils.GroupSnapshotKey(groupSnapshot))
+
+	if groupSnapshot.Status == nil && content.Status != nil {
+		return true
+	}
+	if content.Status == nil {
+		return false
+	}
+	if groupSnapshot.Status.BoundVolumeGroupSnapshotContentName == nil {
+		return true
+	}
+	if groupSnapshot.Status.CreationTime == nil && content.Status.CreationTime != nil {
+		return true
+	}
+	if groupSnapshot.Status.ReadyToUse == nil && content.Status.ReadyToUse != nil {
+		return true
+	}
+	if groupSnapshot.Status.ReadyToUse != nil && content.Status.ReadyToUse != nil && *groupSnapshot.Status.ReadyToUse != *content.Status.ReadyToUse {
+		return true
+	}
+
+	return false
+}
diff --git a/pkg/common-controller/snapshot_controller_base.go b/pkg/common-controller/snapshot_controller_base.go
index 551ec9fb..42b6a4c3 100644
--- a/pkg/common-controller/snapshot_controller_base.go
+++ b/pkg/common-controller/snapshot_controller_base.go
@@ -661,7 +661,7 @@ func (ctrl *csiSnapshotCommonController) groupSnapshotContentWorker() {
 		}
 		defer ctrl.groupSnapshotContentQueue.Done(keyObj)
 
-		if err := ctrl.syncContentByKey(keyObj.(string)); err != nil {
+		if err := ctrl.syncGroupSnapshotContentByKey(keyObj.(string)); err != nil {
 			// Rather than wait for a full resync, re-add the key to the
 			// queue to be processed.
 			ctrl.groupSnapshotContentQueue.AddRateLimited(keyObj)
@@ -762,3 +762,88 @@ func (ctrl *csiSnapshotCommonController) checkAndUpdateGroupSnapshotClass(groupS
 	}
 	return newGroupSnapshot, nil
 }
+
+// syncGroupSnapshotContentByKey processes a VolumeGroupSnapshotContent request for the given queue key: a content still present in the lister is passed to updateGroupSnapshotContent (whose error is returned so the key can be retried), a content found only in the controller's own store is passed to deleteGroupSnapshotContent, and every other failure is logged and reported as nil.
+func (ctrl *csiSnapshotCommonController) syncGroupSnapshotContentByKey(key string) error {
+	klog.V(5).Infof("syncGroupSnapshotContentByKey[%s]", key)
+
+	_, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		klog.V(4).Infof("error getting name of groupSnapshotContent %q to get groupSnapshotContent from informer: %v", key, err)
+		return nil
+	}
+	content, err := ctrl.groupSnapshotContentLister.Get(name)
+	// The content still exists in informer cache, the event must have
+	// been add/update/sync
+	if err == nil {
+		// If an error occurs, returning it lets the worker add this item back to the queue
+		return ctrl.updateGroupSnapshotContent(content)
+	}
+	if !errors.IsNotFound(err) {
+		klog.V(2).Infof("error getting group snapshot content %q from informer: %v", key, err)
+		return nil
+	}
+
+	// The group snapshot content is not in informer cache, the event must have been "delete"
+	contentObj, found, err := ctrl.groupSnapshotContentStore.GetByKey(key)
+	if err != nil {
+		klog.V(2).Infof("error getting group snapshot content %q from cache: %v", key, err)
+		return nil
+	}
+	if !found {
+		// The controller has already processed the delete event and
+		// deleted the group snapshot content from its cache
+		klog.V(2).Infof("deletion of group snapshot content %q was already processed", key)
+		return nil
+	}
+	content, ok := contentObj.(*crdv1alpha1.VolumeGroupSnapshotContent)
+	if !ok {
+		klog.Errorf("expected group snapshot content, got %+v", contentObj)
+		return nil
+	}
+	ctrl.deleteGroupSnapshotContent(content)
+	return nil
+}
+
+// updateGroupSnapshotContent runs in worker thread and handles "groupsnapshotcontent added",
+// "groupsnapshotcontent updated" and "periodic sync" events; it returns the sync error, if any.
+func (ctrl *csiSnapshotCommonController) updateGroupSnapshotContent(content *crdv1alpha1.VolumeGroupSnapshotContent) error {
+	// Store the new group snapshot content version in the cache and do not process
+	// it if this is an old version. A store error is only logged, not returned.
+	isNew, err := ctrl.storeGroupSnapshotContentUpdate(content)
+	if err != nil {
+		klog.Errorf("%v", err)
+	}
+	if !isNew {
+		return nil
+	}
+	err = ctrl.syncGroupSnapshotContent(content)
+	if err != nil {
+		if errors.IsConflict(err) {
+			// Version conflict error happens quite often and the controller
+			// recovers from it easily, so it is logged at a lower severity.
+			klog.V(3).Infof("could not sync group snapshot content %q: %+v", content.Name, err)
+		} else {
+			klog.Errorf("could not sync group snapshot content %q: %+v", content.Name, err)
+		}
+		return err
+	}
+	return nil
+}
+
+// deleteGroupSnapshotContent runs in worker thread and handles "groupsnapshotcontent deleted" event: it removes the content from the controller's group snapshot content store and, when the content references a group snapshot, queues that group snapshot for a sync.
+func (ctrl *csiSnapshotCommonController) deleteGroupSnapshotContent(content *crdv1alpha1.VolumeGroupSnapshotContent) {
+	_ = ctrl.groupSnapshotContentStore.Delete(content)
+	klog.V(4).Infof("group snapshot content %q deleted", content.Name)
+
+	groupSnapshotName := utils.GroupSnapshotRefKey(&content.Spec.VolumeGroupSnapshotRef)
+	if groupSnapshotName == "" {
+		klog.V(5).Infof("deleteGroupContent[%q]: group snapshot content not bound", content.Name)
+		return
+	}
+	// Sync the group snapshot when its group snapshot content is deleted. Explicitly
+	// syncing the group snapshot here in response to group snapshot content deletion
+	// prevents the group snapshot from waiting until the next sync period for its release.
+	klog.V(5).Infof("deleteGroupContent[%q]: scheduling sync of group snapshot %s", content.Name, groupSnapshotName)
+	ctrl.groupSnapshotQueue.Add(groupSnapshotName)
+}