ItemBlock model and phase 1 (single-thread) workflow changes

Signed-off-by: Scott Seago <sseago@redhat.com>
pull/8102/head

parent 8ae667ef5e
commit 9d6f4d2db5

changelogs/unreleased
@@ -0,0 +1 @@
+ItemBlock model and phase 1 (single-thread) workflow changes
@@ -109,19 +109,27 @@ This mainly applies to plugins that operate on pods which reference resources wh

 ### Changes to processing item list from the Item Collector

-#### New structs ItemBlock and ItemBlockItem
+#### New structs BackupItemBlock, ItemBlock, and ItemBlockItem

 ```go
-type ItemBlock struct {
-    log logrus.FieldLogger
+package backup
+
+type BackupItemBlock struct {
+    itemblock.ItemBlock
+    // This is a reference to the shared itemBackupper for the backup
+    itemBackupper *itemBackupper
+}
+
+package itemblock
+
+type ItemBlock struct {
+    Log logrus.FieldLogger
     Items []ItemBlockItem
 }

 type ItemBlockItem struct {
-    gr schema.GroupResource
-    item *unstructured.Unstructured
-    preferredGVR schema.GroupVersionResource
+    Gr schema.GroupResource
+    Item *unstructured.Unstructured
+    PreferredGVR schema.GroupVersionResource
 }
 ```

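As a rough illustration of the model described above (not part of the change set itself), the sketch below assembles an ItemBlock by hand from the exported structs; the pod object, names, and GVR values are invented for the example, and the `pkg/itemblock` package it imports is the one added later in this diff.

```go
package main

import (
	"github.com/sirupsen/logrus"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"

	"github.com/vmware-tanzu/velero/pkg/itemblock"
)

func main() {
	// A hypothetical pod; in the real workflow this comes from the item collector.
	pod := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "Pod",
		"metadata":   map[string]interface{}{"namespace": "ns1", "name": "my-pod"},
	}}

	// An ItemBlock groups related items that should be backed up together.
	block := itemblock.ItemBlock{Log: logrus.New()}
	block.Items = append(block.Items, itemblock.ItemBlockItem{
		Gr:           schema.GroupResource{Resource: "pods"},
		Item:         pod,
		PreferredGVR: schema.GroupVersionResource{Version: "v1", Resource: "pods"},
	})

	block.Log.Infof("ItemBlock has %d item(s)", len(block.Items))
}
```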
@@ -37,7 +37,7 @@ type hookKey struct {
 	// For hooks specified in pod annotation, this field is the pod where hooks are annotated.
 	podName string
 	// HookPhase is only for backup hooks, for restore hooks, this field is empty.
-	hookPhase hookPhase
+	hookPhase HookPhase
 	// HookName is only for hooks specified in the backup/restore spec.
 	// For hooks specified in pod annotation, this field is empty or "<from-annotation>".
 	hookName string
@@ -83,7 +83,7 @@ func NewHookTracker() *HookTracker {
 // Add adds a hook to the hook tracker
 // Add must precede the Record for each individual hook.
 // In other words, a hook must be added to the tracker before its execution result is recorded.
-func (ht *HookTracker) Add(podNamespace, podName, container, source, hookName string, hookPhase hookPhase) {
+func (ht *HookTracker) Add(podNamespace, podName, container, source, hookName string, hookPhase HookPhase) {
 	ht.lock.Lock()
 	defer ht.lock.Unlock()

@@ -108,7 +108,7 @@ func (ht *HookTracker) Add(podNamespace, podName, container, source, hookName st
 // Record records the hook's execution status
 // Add must precede the Record for each individual hook.
 // In other words, a hook must be added to the tracker before its execution result is recorded.
-func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName string, hookPhase hookPhase, hookFailed bool, hookErr error) error {
+func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName string, hookPhase HookPhase, hookFailed bool, hookErr error) error {
 	ht.lock.Lock()
 	defer ht.lock.Unlock()

@@ -179,7 +179,7 @@ func NewMultiHookTracker() *MultiHookTracker {
 }

 // Add adds a backup/restore hook to the tracker
-func (mht *MultiHookTracker) Add(name, podNamespace, podName, container, source, hookName string, hookPhase hookPhase) {
+func (mht *MultiHookTracker) Add(name, podNamespace, podName, container, source, hookName string, hookPhase HookPhase) {
 	mht.lock.Lock()
 	defer mht.lock.Unlock()

@@ -190,7 +190,7 @@ func (mht *MultiHookTracker) Add(name, podNamespace, podName, container, source,
 }

 // Record records a backup/restore hook execution status
-func (mht *MultiHookTracker) Record(name, podNamespace, podName, container, source, hookName string, hookPhase hookPhase, hookFailed bool, hookErr error) error {
+func (mht *MultiHookTracker) Record(name, podNamespace, podName, container, source, hookName string, hookPhase HookPhase, hookFailed bool, hookErr error) error {
 	mht.lock.RLock()
 	defer mht.lock.RUnlock()

@@ -43,11 +43,11 @@ import (
 	"github.com/vmware-tanzu/velero/pkg/util/kube"
 )

-type hookPhase string
+type HookPhase string

 const (
-	PhasePre hookPhase = "pre"
-	PhasePost hookPhase = "post"
+	PhasePre HookPhase = "pre"
+	PhasePost HookPhase = "post"
 )

 const (
@@ -81,7 +81,7 @@ type ItemHookHandler interface {
 		groupResource schema.GroupResource,
 		obj runtime.Unstructured,
 		resourceHooks []ResourceHook,
-		phase hookPhase,
+		phase HookPhase,
 		hookTracker *HookTracker,
 	) error
 }
@@ -200,7 +200,7 @@ func (h *DefaultItemHookHandler) HandleHooks(
 	groupResource schema.GroupResource,
 	obj runtime.Unstructured,
 	resourceHooks []ResourceHook,
-	phase hookPhase,
+	phase HookPhase,
 	hookTracker *HookTracker,
 ) error {
 	// We only support hooks on pods right now
@@ -312,27 +312,27 @@ func (h *NoOpItemHookHandler) HandleHooks(
 	groupResource schema.GroupResource,
 	obj runtime.Unstructured,
 	resourceHooks []ResourceHook,
-	phase hookPhase,
+	phase HookPhase,
 	hookTracker *HookTracker,
 ) error {
 	return nil
 }

-func phasedKey(phase hookPhase, key string) string {
+func phasedKey(phase HookPhase, key string) string {
 	if phase != "" {
 		return fmt.Sprintf("%v.%v", phase, key)
 	}
 	return key
 }

-func getHookAnnotation(annotations map[string]string, key string, phase hookPhase) string {
+func getHookAnnotation(annotations map[string]string, key string, phase HookPhase) string {
 	return annotations[phasedKey(phase, key)]
 }

 // getPodExecHookFromAnnotations returns an ExecHook based on the annotations, as long as the
 // 'command' annotation is present. If it is absent, this returns nil.
 // If there is an error in parsing a supplied timeout, it is logged.
-func getPodExecHookFromAnnotations(annotations map[string]string, phase hookPhase, log logrus.FieldLogger) *velerov1api.ExecHook {
+func getPodExecHookFromAnnotations(annotations map[string]string, phase HookPhase, log logrus.FieldLogger) *velerov1api.ExecHook {
 	commandValue := getHookAnnotation(annotations, podBackupHookCommandAnnotationKey, phase)
 	if commandValue == "" {
 		return nil
@@ -561,7 +561,7 @@ func GroupRestoreExecHooks(
 		if hookFromAnnotation.Container == "" {
 			hookFromAnnotation.Container = pod.Spec.Containers[0].Name
 		}
-		hookTrack.Add(restoreName, metadata.GetNamespace(), metadata.GetName(), hookFromAnnotation.Container, HookSourceAnnotation, "<from-annotation>", hookPhase(""))
+		hookTrack.Add(restoreName, metadata.GetNamespace(), metadata.GetName(), hookFromAnnotation.Container, HookSourceAnnotation, "<from-annotation>", HookPhase(""))
 		byContainer[hookFromAnnotation.Container] = []PodExecRestoreHook{
 			{
 				HookName: "<from-annotation>",
@@ -596,7 +596,7 @@ func GroupRestoreExecHooks(
 			if named.Hook.Container == "" {
 				named.Hook.Container = pod.Spec.Containers[0].Name
 			}
-			hookTrack.Add(restoreName, metadata.GetNamespace(), metadata.GetName(), named.Hook.Container, HookSourceSpec, rrh.Name, hookPhase(""))
+			hookTrack.Add(restoreName, metadata.GetNamespace(), metadata.GetName(), named.Hook.Container, HookSourceSpec, rrh.Name, HookPhase(""))
 			byContainer[named.Hook.Container] = append(byContainer[named.Hook.Container], named)
 		}
 	}
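A brief sketch of what the hookPhase → HookPhase rename buys: with the type exported, code outside the hook package (such as the new ItemBlock backup path later in this diff) can name the phases directly. This is illustrative only, and the internal/hook import path is assumed.

```go
package backup // hypothetical caller in another package of the same module

import "github.com/vmware-tanzu/velero/internal/hook"

// runBothPhases shows a caller iterating the now-exported phases.
func runBothPhases(run func(phase hook.HookPhase) error) error {
	for _, phase := range []hook.HookPhase{hook.PhasePre, hook.PhasePost} {
		if err := run(phase); err != nil {
			return err
		}
	}
	return nil
}
```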
@@ -128,7 +128,7 @@ func TestHandleHooksSkips(t *testing.T) {
 func TestHandleHooks(t *testing.T) {
 	tests := []struct {
 		name string
-		phase hookPhase
+		phase HookPhase
 		groupResource string
 		item runtime.Unstructured
 		hooks []ResourceHook
@@ -500,7 +500,7 @@ func TestHandleHooks(t *testing.T) {
 }

 func TestGetPodExecHookFromAnnotations(t *testing.T) {
-	phases := []hookPhase{"", PhasePre, PhasePost}
+	phases := []HookPhase{"", PhasePre, PhasePost}
 	for _, phase := range phases {
 		tests := []struct {
 			name string
@@ -1999,7 +1999,7 @@ func TestBackupHookTracker(t *testing.T) {
 	}
 	test1 := []struct {
 		name string
-		phase hookPhase
+		phase HookPhase
 		groupResource string
 		pods []podWithHook
 		hookTracker *HookTracker
@@ -169,7 +169,7 @@ func (e *DefaultWaitExecHookHandler) HandleHooks(
 				hookLog.Error(err)
 				errors = append(errors, err)

-				errTracker := multiHookTracker.Record(restoreName, newPod.Namespace, newPod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, hookPhase(""), true, err)
+				errTracker := multiHookTracker.Record(restoreName, newPod.Namespace, newPod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, HookPhase(""), true, err)
 				if errTracker != nil {
 					hookLog.WithError(errTracker).Warn("Error recording the hook in hook tracker")
 				}
@@ -195,7 +195,7 @@ func (e *DefaultWaitExecHookHandler) HandleHooks(
 				hookFailed = true
 			}

-			errTracker := multiHookTracker.Record(restoreName, newPod.Namespace, newPod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, hookPhase(""), hookFailed, hookErr)
+			errTracker := multiHookTracker.Record(restoreName, newPod.Namespace, newPod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, HookPhase(""), hookFailed, hookErr)
 			if errTracker != nil {
 				hookLog.WithError(errTracker).Warn("Error recording the hook in hook tracker")
 			}
@@ -247,7 +247,7 @@ func (e *DefaultWaitExecHookHandler) HandleHooks(
 			},
 		)

-		errTracker := multiHookTracker.Record(restoreName, pod.Namespace, pod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, hookPhase(""), true, err)
+		errTracker := multiHookTracker.Record(restoreName, pod.Namespace, pod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, HookPhase(""), true, err)
 		if errTracker != nil {
 			hookLog.WithError(errTracker).Warn("Error recording the hook in hook tracker")
 		}
@@ -1012,17 +1012,17 @@ func TestRestoreHookTrackerUpdate(t *testing.T) {
 	}

 	hookTracker1 := NewMultiHookTracker()
-	hookTracker1.Add("restore1", "default", "my-pod", "container1", HookSourceAnnotation, "<from-annotation>", hookPhase(""))
+	hookTracker1.Add("restore1", "default", "my-pod", "container1", HookSourceAnnotation, "<from-annotation>", HookPhase(""))

 	hookTracker2 := NewMultiHookTracker()
-	hookTracker2.Add("restore1", "default", "my-pod", "container1", HookSourceSpec, "my-hook-1", hookPhase(""))
+	hookTracker2.Add("restore1", "default", "my-pod", "container1", HookSourceSpec, "my-hook-1", HookPhase(""))

 	hookTracker3 := NewMultiHookTracker()
-	hookTracker3.Add("restore1", "default", "my-pod", "container1", HookSourceSpec, "my-hook-1", hookPhase(""))
-	hookTracker3.Add("restore1", "default", "my-pod", "container2", HookSourceSpec, "my-hook-2", hookPhase(""))
+	hookTracker3.Add("restore1", "default", "my-pod", "container1", HookSourceSpec, "my-hook-1", HookPhase(""))
+	hookTracker3.Add("restore1", "default", "my-pod", "container2", HookSourceSpec, "my-hook-2", HookPhase(""))

 	hookTracker4 := NewMultiHookTracker()
-	hookTracker4.Add("restore1", "default", "my-pod", "container1", HookSourceSpec, "my-hook-1", hookPhase(""))
+	hookTracker4.Add("restore1", "default", "my-pod", "container1", HookSourceSpec, "my-hook-1", HookPhase(""))

 	tests1 := []struct {
 		name string
@@ -31,6 +31,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -46,6 +47,7 @@ import (
 	velerov2alpha1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
 	"github.com/vmware-tanzu/velero/pkg/client"
 	"github.com/vmware-tanzu/velero/pkg/discovery"
+	"github.com/vmware-tanzu/velero/pkg/itemblock"
 	"github.com/vmware-tanzu/velero/pkg/itemoperation"
 	"github.com/vmware-tanzu/velero/pkg/kuberesource"
 	"github.com/vmware-tanzu/velero/pkg/persistence"
@@ -53,6 +55,7 @@ import (
 	"github.com/vmware-tanzu/velero/pkg/plugin/framework"
 	"github.com/vmware-tanzu/velero/pkg/plugin/velero"
 	biav2 "github.com/vmware-tanzu/velero/pkg/plugin/velero/backupitemaction/v2"
+	ibav1 "github.com/vmware-tanzu/velero/pkg/plugin/velero/itemblockaction/v1"
 	vsv1 "github.com/vmware-tanzu/velero/pkg/plugin/velero/volumesnapshotter/v1"
 	"github.com/vmware-tanzu/velero/pkg/podexec"
 	"github.com/vmware-tanzu/velero/pkg/podvolume"
@@ -77,6 +80,7 @@ type Backupper interface {
 		backup *Request,
 		backupFile io.Writer,
 		actions []biav2.BackupItemAction,
+		itemBlockActions []ibav1.ItemBlockAction,
 		volumeSnapshotterGetter VolumeSnapshotterGetter,
 	) error

@@ -85,6 +89,7 @@ type Backupper interface {
 		backupRequest *Request,
 		backupFile io.Writer,
 		backupItemActionResolver framework.BackupItemActionResolverV2,
+		itemBlockActionResolver framework.ItemBlockActionResolver,
 		volumeSnapshotterGetter VolumeSnapshotterGetter,
 	) error

@@ -210,9 +215,10 @@ type VolumeSnapshotterGetter interface {
 // back up individual resources that don't prevent the backup from continuing to be processed) are logged
 // to the backup log.
 func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Request, backupFile io.Writer,
-	actions []biav2.BackupItemAction, volumeSnapshotterGetter VolumeSnapshotterGetter) error {
+	actions []biav2.BackupItemAction, itemBlockActions []ibav1.ItemBlockAction, volumeSnapshotterGetter VolumeSnapshotterGetter) error {
 	backupItemActions := framework.NewBackupItemActionResolverV2(actions)
-	return kb.BackupWithResolvers(log, backupRequest, backupFile, backupItemActions, volumeSnapshotterGetter)
+	itemBlockActionResolver := framework.NewItemBlockActionResolver(itemBlockActions)
+	return kb.BackupWithResolvers(log, backupRequest, backupFile, backupItemActions, itemBlockActionResolver, volumeSnapshotterGetter)
 }

 func (kb *kubernetesBackupper) BackupWithResolvers(
@@ -220,6 +226,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
 	backupRequest *Request,
 	backupFile io.Writer,
 	backupItemActionResolver framework.BackupItemActionResolverV2,
+	itemBlockActionResolver framework.ItemBlockActionResolver,
 	volumeSnapshotterGetter VolumeSnapshotterGetter,
 ) error {
 	gzippedData := gzip.NewWriter(backupFile)
@@ -268,6 +275,12 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
 		return err
 	}

+	backupRequest.ResolvedItemBlockActions, err = itemBlockActionResolver.ResolveActions(kb.discoveryHelper, log)
+	if err != nil {
+		log.WithError(errors.WithStack(err)).Errorf("Error from itemBlockActionResolver.ResolveActions")
+		return err
+	}
+
 	backupRequest.BackedUpItems = map[itemKey]struct{}{}

 	podVolumeTimeout := kb.podVolumeTimeout
@@ -402,37 +415,71 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
 	}()

 	backedUpGroupResources := map[schema.GroupResource]bool{}
+	// Maps items in the item list from GR+NamespacedName to a slice of pointers to kubernetesResources
+	// We need the slice value since if the EnableAPIGroupVersions feature flag is set, there may
+	// be more than one resource to back up for the given item.
+	itemsMap := make(map[velero.ResourceIdentifier][]*kubernetesResource)
+	for i := range items {
+		key := velero.ResourceIdentifier{
+			GroupResource: items[i].groupResource,
+			Namespace: items[i].namespace,
+			Name: items[i].name,
+		}
+		itemsMap[key] = append(itemsMap[key], items[i])
+	}

-	for i, item := range items {
+	var itemBlock *BackupItemBlock
+
+	for i := range items {
 		log.WithFields(map[string]interface{}{
 			"progress": "",
-			"resource": item.groupResource.String(),
-			"namespace": item.namespace,
-			"name": item.name,
+			"resource": items[i].groupResource.String(),
+			"namespace": items[i].namespace,
+			"name": items[i].name,
 		}).Infof("Processing item")

-		// use an anonymous func so we can defer-close/remove the file
-		// as soon as we're done with it
-		func() {
-			var unstructured unstructured.Unstructured
-
-			f, err := os.Open(item.path)
-			if err != nil {
-				log.WithError(errors.WithStack(err)).Error("Error opening file containing item")
-				return
+		// Skip if this item has already been added to an ItemBlock
+		if items[i].inItemBlock {
+			log.Debugf("Not creating new ItemBlock for %s %s/%s because it's already in an ItemBlock", items[i].groupResource.String(), items[i].namespace, items[i].name)
+		} else {
+			if itemBlock == nil {
+				itemBlock = NewBackupItemBlock(log, itemBackupper)
 			}
-			defer f.Close()
-			defer os.Remove(f.Name())
+			var newBlockItem *unstructured.Unstructured

-			if err := json.NewDecoder(f).Decode(&unstructured); err != nil {
-				log.WithError(errors.WithStack(err)).Error("Error decoding JSON from file")
-				return
+			// If the EnableAPIGroupVersions feature flag is set, there could be multiple versions
+			// of this item to be backed up. Include all of them in the same ItemBlock
+			key := velero.ResourceIdentifier{
+				GroupResource: items[i].groupResource,
+				Namespace: items[i].namespace,
+				Name: items[i].name,
 			}
+			allVersionsOfItem := itemsMap[key]
+			for _, itemVersion := range allVersionsOfItem {
+				unstructured := itemBlock.addKubernetesResource(itemVersion, log)
+				if newBlockItem == nil {
+					newBlockItem = unstructured
+				}
+			}
+			// call GetRelatedItems, add found items to block if not in block, recursively until no more items
+			if newBlockItem != nil {
+				kb.executeItemBlockActions(log, newBlockItem, items[i].groupResource, items[i].name, items[i].namespace, itemsMap, itemBlock)
+			}
+		}

-			if backedUp := kb.backupItem(log, item.groupResource, itemBackupper, &unstructured, item.preferredGVR); backedUp {
-				backedUpGroupResources[item.groupResource] = true
+		// We skip calling backupItemBlock here so that we will add the next item to the current ItemBlock if:
+		// 1) This is not the last item to be processed
+		// 2) Both current and next item are ordered resources
+		// 3) Both current and next item are for the same GroupResource
+		addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource
+		if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock {
+			log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items))
+			backedUpGRs := kb.backupItemBlock(*itemBlock)
+			for _, backedUpGR := range backedUpGRs {
+				backedUpGroupResources[backedUpGR] = true
 			}
-		}()
+			itemBlock = nil
+		}

 		// updated total is computed as "how many items we've backed up so far, plus
 		// how many items we know of that are remaining"
@@ -446,9 +493,9 @@ func (kb *kubernetesBackupper) BackupWithResolvers(

 		log.WithFields(map[string]interface{}{
 			"progress": "",
-			"resource": item.groupResource.String(),
-			"namespace": item.namespace,
-			"name": item.name,
+			"resource": items[i].groupResource.String(),
+			"namespace": items[i].namespace,
+			"name": items[i].name,
 		}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", len(backupRequest.BackedUpItems), totalItems)
 	}

@@ -501,8 +548,174 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
 	return nil
 }

-func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource) bool {
-	backedUpItem, _, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR, false, false)
+func (kb *kubernetesBackupper) executeItemBlockActions(
+	log logrus.FieldLogger,
+	obj runtime.Unstructured,
+	groupResource schema.GroupResource,
+	name, namespace string,
+	itemsMap map[velero.ResourceIdentifier][]*kubernetesResource,
+	itemBlock *BackupItemBlock,
+) {
+	metadata, err := meta.Accessor(obj)
+	if err != nil {
+		log.WithError(errors.WithStack(err)).Warn("Failed to get object metadata.")
+		return
+	}
+	for _, action := range itemBlock.itemBackupper.backupRequest.ResolvedItemBlockActions {
+		if !action.ShouldUse(groupResource, namespace, metadata, log) {
+			continue
+		}
+		log.Info("Executing ItemBlock action")
+
+		relatedItems, err := action.GetRelatedItems(obj, itemBlock.itemBackupper.backupRequest.Backup)
+		if err != nil {
+			log.Error(errors.Wrapf(err, "error executing ItemBlock action (groupResource=%s, namespace=%s, name=%s)", groupResource.String(), namespace, name))
+			continue
+		}
+
+		for _, relatedItem := range relatedItems {
+			var newBlockItem *unstructured.Unstructured
+			// Look for item in itemsMap
+			itemsToAdd := itemsMap[relatedItem]
+			// if item is in the item collector list, we'll have at least one element.
+			// If EnableAPIGroupVersions is set, we may have more than one.
+			// If we get an unstructured obj back from addKubernetesResource, then it wasn't
+			// already in a block and we recursively look for related items in the returned item.
+			if len(itemsToAdd) > 0 {
+				for _, itemToAdd := range itemsToAdd {
+					unstructured := itemBlock.addKubernetesResource(itemToAdd, log)
+					if newBlockItem == nil {
+						newBlockItem = unstructured
+					}
+				}
+				if newBlockItem != nil {
+					kb.executeItemBlockActions(log, newBlockItem, relatedItem.GroupResource, relatedItem.Name, relatedItem.Namespace, itemsMap, itemBlock)
+				}
+				continue
+			}
+			// Item wasn't found in item collector list, get from cluster
+			gvr, resource, err := itemBlock.itemBackupper.discoveryHelper.ResourceFor(relatedItem.GroupResource.WithVersion(""))
+			if err != nil {
+				log.Error(errors.Wrapf(err, "Unable to obtain gvr and resource for related item %s %s/%s", relatedItem.GroupResource.String(), relatedItem.Namespace, relatedItem.Name))
+				continue
+			}
+
+			client, err := itemBlock.itemBackupper.dynamicFactory.ClientForGroupVersionResource(gvr.GroupVersion(), resource, relatedItem.Namespace)
+			if err != nil {
+				log.Error(errors.Wrapf(err, "Unable to obtain client for gvr %s %s (%s)", gvr.GroupVersion(), resource.Name, relatedItem.Namespace))
+				continue
+			}
+
+			item, err := client.Get(relatedItem.Name, metav1.GetOptions{})
+			if apierrors.IsNotFound(err) {
+				log.WithFields(logrus.Fields{
+					"groupResource": relatedItem.GroupResource,
+					"namespace": relatedItem.Namespace,
+					"name": relatedItem.Name,
+				}).Warnf("Related item was not found in Kubernetes API, can't add to item block")
+				continue
+			}
+			if err != nil {
+				log.Error(errors.Wrapf(err, "Error while trying to get related item %s %s/%s from cluster", relatedItem.GroupResource.String(), relatedItem.Namespace, relatedItem.Name))
+				continue
+			}
+			itemsMap[relatedItem] = append(itemsMap[relatedItem], &kubernetesResource{
+				groupResource: relatedItem.GroupResource,
+				preferredGVR: gvr,
+				namespace: relatedItem.Namespace,
+				name: relatedItem.Name,
+				inItemBlock: true,
+			})
+			log.Infof("adding %s %s/%s to ItemBlock", relatedItem.GroupResource, relatedItem.Namespace, relatedItem.Name)
+			itemBlock.AddUnstructured(relatedItem.GroupResource, item, gvr)
+			kb.executeItemBlockActions(log, item, relatedItem.GroupResource, relatedItem.Name, relatedItem.Namespace, itemsMap, itemBlock)
+		}
+	}
+}
+
+func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []schema.GroupResource {
+	// find pods in ItemBlock
+	// filter pods based on whether they still need to be backed up
+	// this list will be used to run pre/post hooks
+	var preHookPods []itemblock.ItemBlockItem
+	itemBlock.Log.Debug("Executing pre hooks")
+	for _, item := range itemBlock.Items {
+		if item.Gr == kuberesource.Pods {
+			metadata, key, err := kb.itemMetadataAndKey(item)
+			if err != nil {
+				itemBlock.Log.WithError(errors.WithStack(err)).Error("Error accessing pod metadata")
+				continue
+			}
+			// Don't run hooks if pod is excluded
+			if !itemBlock.itemBackupper.itemInclusionChecks(itemBlock.Log, false, metadata, item.Item, item.Gr) {
+				continue
+			}
+			// Don't run hooks if pod has already been backed up
+			if _, exists := itemBlock.itemBackupper.backupRequest.BackedUpItems[key]; !exists {
+				preHookPods = append(preHookPods, item)
+			}
+		}
+	}
+	postHookPods, failedPods, errs := kb.handleItemBlockHooks(itemBlock, preHookPods, hook.PhasePre)
+	for i, pod := range failedPods {
+		itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running pre hooks for pod")
+		// if pre hook fails, flag pod as backed-up and move on
+		_, key, err := kb.itemMetadataAndKey(pod)
+		if err != nil {
+			itemBlock.Log.WithError(errors.WithStack(err)).Error("Error accessing pod metadata")
+			continue
+		}
+		itemBlock.itemBackupper.backupRequest.BackedUpItems[key] = struct{}{}
+	}
+
+	itemBlock.Log.Debug("Backing up items in BackupItemBlock")
+	var grList []schema.GroupResource
+	for _, item := range itemBlock.Items {
+		if backedUp := kb.backupItem(itemBlock.Log, item.Gr, itemBlock.itemBackupper, item.Item, item.PreferredGVR, &itemBlock); backedUp {
+			grList = append(grList, item.Gr)
+		}
+	}
+
+	itemBlock.Log.Debug("Executing post hooks")
+	_, failedPods, errs = kb.handleItemBlockHooks(itemBlock, postHookPods, hook.PhasePost)
+	for i, pod := range failedPods {
+		itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod")
+	}
+
+	return grList
+}
+
+func (kb *kubernetesBackupper) itemMetadataAndKey(item itemblock.ItemBlockItem) (metav1.Object, itemKey, error) {
+	metadata, err := meta.Accessor(item.Item)
+	if err != nil {
+		return nil, itemKey{}, err
+	}
+	key := itemKey{
+		resource: resourceKey(item.Item),
+		namespace: metadata.GetNamespace(),
+		name: metadata.GetName(),
+	}
+	return metadata, key, nil
+}
+
+func (kb *kubernetesBackupper) handleItemBlockHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem, phase hook.HookPhase) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
+	var successPods []itemblock.ItemBlockItem
+	var failedPods []itemblock.ItemBlockItem
+	var errs []error
+	for _, pod := range hookPods {
+		err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, phase, itemBlock.itemBackupper.hookTracker)
+		if err == nil {
+			successPods = append(successPods, pod)
+		} else {
+			failedPods = append(failedPods, pod)
+			errs = append(errs, err)
+		}
+	}
+	return successPods, failedPods, errs
+}
+
+func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource, itemBlock *BackupItemBlock) bool {
+	backedUpItem, _, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR, false, false, itemBlock)
 	if aggregate, ok := err.(kubeerrs.Aggregate); ok {
 		log.WithField("name", unstructured.GetName()).Infof("%d errors encountered backup up item", len(aggregate.Errors()))
 		// log each error separately so we get error location info in the log, and an
@@ -527,7 +740,7 @@ func (kb *kubernetesBackupper) finalizeItem(
 	unstructured *unstructured.Unstructured,
 	preferredGVR schema.GroupVersionResource,
 ) (bool, []FileForArchive) {
-	backedUpItem, updateFiles, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR, true, true)
+	backedUpItem, updateFiles, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR, true, true, nil)
 	if aggregate, ok := err.(kubeerrs.Aggregate); ok {
 		log.WithField("name", unstructured.GetName()).Infof("%d errors encountered backup up item", len(aggregate.Errors()))
 		// log each error separately so we get error location info in the log, and an
@@ -581,7 +794,7 @@ func (kb *kubernetesBackupper) backupCRD(log logrus.FieldLogger, gr schema.Group

 	log.Infof("Found associated CRD %s to add to backup", gr.String())

-	kb.backupItem(log, gvr.GroupResource(), itemBackupper, unstructured, gvr)
+	kb.backupItem(log, gvr.GroupResource(), itemBackupper, unstructured, gvr, nil)
 }

 func (kb *kubernetesBackupper) writeBackupVersion(tw *tar.Writer) error {
(File diff suppressed because it is too large)
@@ -46,6 +46,7 @@ import (
 	"github.com/vmware-tanzu/velero/pkg/client"
 	"github.com/vmware-tanzu/velero/pkg/discovery"
 	"github.com/vmware-tanzu/velero/pkg/features"
+	"github.com/vmware-tanzu/velero/pkg/itemblock"
 	"github.com/vmware-tanzu/velero/pkg/itemoperation"
 	"github.com/vmware-tanzu/velero/pkg/kuberesource"
 	"github.com/vmware-tanzu/velero/pkg/plugin/velero"
@@ -88,8 +89,8 @@ type FileForArchive struct {
 // If finalize is true, then it returns the bytes instead of writing them to the tarWriter
 // In addition to the error return, backupItem also returns a bool indicating whether the item
 // was actually backed up.
-func (ib *itemBackupper) backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, preferredGVR schema.GroupVersionResource, mustInclude, finalize bool) (bool, []FileForArchive, error) {
-	selectedForBackup, files, err := ib.backupItemInternal(logger, obj, groupResource, preferredGVR, mustInclude, finalize)
+func (ib *itemBackupper) backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, preferredGVR schema.GroupVersionResource, mustInclude, finalize bool, itemBlock *BackupItemBlock) (bool, []FileForArchive, error) {
+	selectedForBackup, files, err := ib.backupItemInternal(logger, obj, groupResource, preferredGVR, mustInclude, finalize, itemBlock)
 	// return if not selected, an error occurred, there are no files to add, or for finalize
 	if !selectedForBackup || err != nil || len(files) == 0 || finalize {
 		return selectedForBackup, files, err
@@ -106,7 +107,49 @@ func (ib *itemBackupper) backupItem(logger logrus.FieldLogger, obj runtime.Unstr
 	return true, []FileForArchive{}, nil
 }

-func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, preferredGVR schema.GroupVersionResource, mustInclude, finalize bool) (bool, []FileForArchive, error) {
+func (ib *itemBackupper) itemInclusionChecks(log logrus.FieldLogger, mustInclude bool, metadata metav1.Object, obj runtime.Unstructured, groupResource schema.GroupResource) bool {
+	if mustInclude {
+		log.Infof("Skipping the exclusion checks for this resource")
+	} else {
+		if metadata.GetLabels()[velerov1api.ExcludeFromBackupLabel] == "true" {
+			log.Infof("Excluding item because it has label %s=true", velerov1api.ExcludeFromBackupLabel)
+			ib.trackSkippedPV(obj, groupResource, "", fmt.Sprintf("item has label %s=true", velerov1api.ExcludeFromBackupLabel), log)
+			return false
+		}
+		// NOTE: we have to re-check namespace & resource includes/excludes because it's possible that
+		// backupItem can be invoked by a custom action.
+		namespace := metadata.GetNamespace()
+		if namespace != "" && !ib.backupRequest.NamespaceIncludesExcludes.ShouldInclude(namespace) {
+			log.Info("Excluding item because namespace is excluded")
+			return false
+		}
+
+		// NOTE: we specifically allow namespaces to be backed up even if it's excluded.
+		// This check is more permissive for cluster resources to let those passed in by
+		// plugins' additional items to get involved.
+		// Only expel cluster resource when it's specifically listed in the excluded list here.
+		if namespace == "" && groupResource != kuberesource.Namespaces &&
+			ib.backupRequest.ResourceIncludesExcludes.ShouldExclude(groupResource.String()) {
+			log.Info("Excluding item because resource is cluster-scoped and is excluded by cluster filter.")
+			return false
+		}
+
+		// Only check namespace-scoped resource to avoid expelling cluster resources
+		// are not specified in included list.
+		if namespace != "" && !ib.backupRequest.ResourceIncludesExcludes.ShouldInclude(groupResource.String()) {
+			log.Info("Excluding item because resource is excluded")
+			return false
+		}
+	}
+
+	if metadata.GetDeletionTimestamp() != nil {
+		log.Info("Skipping item because it's being deleted.")
+		return false
+	}
+	return true
+}
+
+func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, preferredGVR schema.GroupVersionResource, mustInclude, finalize bool, itemBlock *BackupItemBlock) (bool, []FileForArchive, error) {
 	var itemFiles []FileForArchive
 	metadata, err := meta.Accessor(obj)
 	if err != nil {
@@ -122,41 +165,7 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
 		"namespace": namespace,
 	})

-	if mustInclude {
-		log.Infof("Skipping the exclusion checks for this resource")
-	} else {
-		if metadata.GetLabels()[velerov1api.ExcludeFromBackupLabel] == "true" {
-			log.Infof("Excluding item because it has label %s=true", velerov1api.ExcludeFromBackupLabel)
-			ib.trackSkippedPV(obj, groupResource, "", fmt.Sprintf("item has label %s=true", velerov1api.ExcludeFromBackupLabel), log)
-			return false, itemFiles, nil
-		}
-		// NOTE: we have to re-check namespace & resource includes/excludes because it's possible that
-		// backupItem can be invoked by a custom action.
-		if namespace != "" && !ib.backupRequest.NamespaceIncludesExcludes.ShouldInclude(namespace) {
-			log.Info("Excluding item because namespace is excluded")
-			return false, itemFiles, nil
-		}
-
-		// NOTE: we specifically allow namespaces to be backed up even if it's excluded.
-		// This check is more permissive for cluster resources to let those passed in by
-		// plugins' additional items to get involved.
-		// Only expel cluster resource when it's specifically listed in the excluded list here.
-		if namespace == "" && groupResource != kuberesource.Namespaces &&
-			ib.backupRequest.ResourceIncludesExcludes.ShouldExclude(groupResource.String()) {
-			log.Info("Excluding item because resource is cluster-scoped and is excluded by cluster filter.")
-			return false, itemFiles, nil
-		}
-
-		// Only check namespace-scoped resource to avoid expelling cluster resources
-		// are not specified in included list.
-		if namespace != "" && !ib.backupRequest.ResourceIncludesExcludes.ShouldInclude(groupResource.String()) {
-			log.Info("Excluding item because resource is excluded")
-			return false, itemFiles, nil
-		}
-	}
-
-	if metadata.GetDeletionTimestamp() != nil {
-		log.Info("Skipping item because it's being deleted.")
+	if !ib.itemInclusionChecks(log, mustInclude, metadata, obj, groupResource) {
 		return false, itemFiles, nil
 	}

@@ -180,10 +189,6 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
 		pvbVolumes []string
 	)

-	log.Debug("Executing pre hooks")
-	if err := ib.itemHookHandler.HandleHooks(log, groupResource, obj, ib.backupRequest.ResourceHooks, hook.PhasePre, ib.hookTracker); err != nil {
-		return false, itemFiles, err
-	}
 	if optedOut, podName := ib.podVolumeSnapshotTracker.OptedoutByPod(namespace, name); optedOut {
 		ib.trackSkippedPV(obj, groupResource, podVolumeApproach, fmt.Sprintf("opted out due to annotation in pod %s", podName), log)
 	}
@@ -231,15 +236,9 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
 	// the group version of the object.
 	versionPath := resourceVersion(obj)

-	updatedObj, additionalItemFiles, err := ib.executeActions(log, obj, groupResource, name, namespace, metadata, finalize)
+	updatedObj, additionalItemFiles, err := ib.executeActions(log, obj, groupResource, name, namespace, metadata, finalize, itemBlock)
 	if err != nil {
 		backupErrs = append(backupErrs, err)
-
-		// if there was an error running actions, execute post hooks and return
-		log.Debug("Executing post hooks")
-		if err := ib.itemHookHandler.HandleHooks(log, groupResource, obj, ib.backupRequest.ResourceHooks, hook.PhasePost, ib.hookTracker); err != nil {
-			backupErrs = append(backupErrs, err)
-		}
 		return false, itemFiles, kubeerrs.NewAggregate(backupErrs)
 	}

@@ -294,11 +293,6 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
 		}
 	}

-	log.Debug("Executing post hooks")
-	if err := ib.itemHookHandler.HandleHooks(log, groupResource, obj, ib.backupRequest.ResourceHooks, hook.PhasePost, ib.hookTracker); err != nil {
-		backupErrs = append(backupErrs, err)
-	}
-
 	if len(backupErrs) != 0 {
 		return false, itemFiles, kubeerrs.NewAggregate(backupErrs)
 	}
@@ -353,6 +347,7 @@ func (ib *itemBackupper) executeActions(
 	name, namespace string,
 	metadata metav1.Object,
 	finalize bool,
+	itemBlock *BackupItemBlock,
 ) (runtime.Unstructured, []FileForArchive, error) {
 	var itemFiles []FileForArchive
 	for _, action := range ib.backupRequest.ResolvedActions {
@@ -451,35 +446,54 @@ func (ib *itemBackupper) executeActions(
 		}

 		for _, additionalItem := range additionalItemIdentifiers {
-			gvr, resource, err := ib.discoveryHelper.ResourceFor(additionalItem.GroupResource.WithVersion(""))
-			if err != nil {
-				return nil, itemFiles, err
+			var itemList []itemblock.ItemBlockItem
+
+			// get item content from itemBlock if it's there to avoid the additional APIServer call
+			// We could have multiple versions to back up if EnableAPIGroupVersions is set
+			if itemBlock != nil {
+				itemList = itemBlock.FindItem(additionalItem.GroupResource, additionalItem.Namespace, additionalItem.Name)
 			}
+			// if item is not in itemblock, pull it from the cluster
+			if len(itemList) == 0 {
+				log.Infof("Additional Item %s %s/%s not found in ItemBlock, getting from cluster", additionalItem.GroupResource, additionalItem.Namespace, additionalItem.Name)
+
+				gvr, resource, err := ib.discoveryHelper.ResourceFor(additionalItem.GroupResource.WithVersion(""))
+				if err != nil {
+					return nil, itemFiles, err
+				}
+
+				client, err := ib.dynamicFactory.ClientForGroupVersionResource(gvr.GroupVersion(), resource, additionalItem.Namespace)
+				if err != nil {
+					return nil, itemFiles, err
+				}
+
+				item, err := client.Get(additionalItem.Name, metav1.GetOptions{})
+
+				if apierrors.IsNotFound(err) {
+					log.WithFields(logrus.Fields{
+						"groupResource": additionalItem.GroupResource,
+						"namespace": additionalItem.Namespace,
+						"name": additionalItem.Name,
+					}).Warnf("Additional item was not found in Kubernetes API, can't back it up")
+					continue
+				}
+				if err != nil {
+					return nil, itemFiles, errors.WithStack(err)
+				}
+				itemList = append(itemList, itemblock.ItemBlockItem{
+					Gr: additionalItem.GroupResource,
+					Item: item,
+					PreferredGVR: gvr,
+				})
+			}

-			client, err := ib.dynamicFactory.ClientForGroupVersionResource(gvr.GroupVersion(), resource, additionalItem.Namespace)
-			if err != nil {
-				return nil, itemFiles, err
+			for _, item := range itemList {
+				_, additionalItemFiles, err := ib.backupItem(log, item.Item, additionalItem.GroupResource, item.PreferredGVR, mustInclude, finalize, itemBlock)
+				if err != nil {
+					return nil, itemFiles, err
+				}
+				itemFiles = append(itemFiles, additionalItemFiles...)
 			}
-
-			item, err := client.Get(additionalItem.Name, metav1.GetOptions{})
-
-			if apierrors.IsNotFound(err) {
-				log.WithFields(logrus.Fields{
-					"groupResource": additionalItem.GroupResource,
-					"namespace": additionalItem.Namespace,
-					"name": additionalItem.Name,
-				}).Warnf("Additional item was not found in Kubernetes API, can't back it up")
-				continue
-			}
-			if err != nil {
-				return nil, itemFiles, errors.WithStack(err)
-			}
-
-			_, additionalItemFiles, err := ib.backupItem(log, item, gvr.GroupResource(), gvr, mustInclude, finalize)
-			if err != nil {
-				return nil, itemFiles, err
-			}
-			itemFiles = append(itemFiles, additionalItemFiles...)
 		}
 	}
 	return obj, itemFiles, nil
@@ -178,6 +178,8 @@ type kubernetesResource struct {
 	groupResource schema.GroupResource
 	preferredGVR schema.GroupVersionResource
 	namespace, name, path string
+	orderedResource bool
+	inItemBlock bool // set to true during backup processing when added to an ItemBlock
 }

 // getItemsFromResourceIdentifiers get the kubernetesResources
@@ -294,6 +296,7 @@ func sortResourcesByOrder(
 	// First select items from the order
 	for _, name := range order {
 		if item, ok := itemMap[name]; ok {
+			item.orderedResource = true
 			sortedItems = append(sortedItems, item)
 			log.Debugf("%s added to sorted resource list.", item.name)
 			delete(itemMap, name)
@@ -65,13 +65,15 @@ func TestSortCoreGroup(t *testing.T) {
 func TestSortOrderedResource(t *testing.T) {
 	log := logrus.StandardLogger()
 	podResources := []*kubernetesResource{
+		{namespace: "ns1", name: "pod3"},
 		{namespace: "ns1", name: "pod1"},
 		{namespace: "ns1", name: "pod2"},
 	}
 	order := []string{"ns1/pod2", "ns1/pod1"}
 	expectedResources := []*kubernetesResource{
-		{namespace: "ns1", name: "pod2"},
-		{namespace: "ns1", name: "pod1"},
+		{namespace: "ns1", name: "pod2", orderedResource: true},
+		{namespace: "ns1", name: "pod1", orderedResource: true},
+		{namespace: "ns1", name: "pod3"},
 	}
 	sortedResources := sortResourcesByOrder(log, podResources, order)
 	assert.Equal(t, expectedResources, sortedResources)
@@ -80,11 +82,13 @@ func TestSortOrderedResource(t *testing.T) {
 	pvResources := []*kubernetesResource{
 		{name: "pv1"},
 		{name: "pv2"},
+		{name: "pv3"},
 	}
 	pvOrder := []string{"pv5", "pv2", "pv1"}
 	expectedPvResources := []*kubernetesResource{
-		{name: "pv2"},
-		{name: "pv1"},
+		{name: "pv2", orderedResource: true},
+		{name: "pv1", orderedResource: true},
+		{name: "pv3"},
 	}
 	sortedPvResources := sortResourcesByOrder(log, pvResources, pvOrder)
 	assert.Equal(t, expectedPvResources, sortedPvResources)
@@ -0,0 +1,66 @@
+/*
+Copyright the Velero contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package backup
+
+import (
+	"encoding/json"
+	"os"
+
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+
+	"github.com/vmware-tanzu/velero/pkg/itemblock"
+)
+
+type BackupItemBlock struct {
+	itemblock.ItemBlock
+	// This is a reference to the shared itemBackupper for the backup
+	itemBackupper *itemBackupper
+}
+
+func NewBackupItemBlock(log logrus.FieldLogger, itemBackupper *itemBackupper) *BackupItemBlock {
+	return &BackupItemBlock{
+		ItemBlock: itemblock.ItemBlock{Log: log},
+		itemBackupper: itemBackupper,
+	}
+}
+
+func (b *BackupItemBlock) addKubernetesResource(item *kubernetesResource, log logrus.FieldLogger) *unstructured.Unstructured {
+	// no-op if item is already in a block
+	if item.inItemBlock {
+		return nil
+	}
+	var unstructured unstructured.Unstructured
+	item.inItemBlock = true
+
+	f, err := os.Open(item.path)
+	if err != nil {
+		log.WithError(errors.WithStack(err)).Error("Error opening file containing item")
+		return nil
+	}
+	defer f.Close()
+	defer os.Remove(f.Name())
+
+	if err := json.NewDecoder(f).Decode(&unstructured); err != nil {
+		log.WithError(errors.WithStack(err)).Error("Error decoding JSON from file")
+		return nil
+	}
+	log.Infof("adding %s %s/%s to ItemBlock", item.groupResource, item.namespace, item.name)
+	b.AddUnstructured(item.groupResource, &unstructured, item.preferredGVR)
+	return &unstructured
+}
@@ -46,6 +46,7 @@ type Request struct {
 	ResourceIncludesExcludes collections.IncludesExcludesInterface
 	ResourceHooks []hook.ResourceHook
 	ResolvedActions []framework.BackupItemResolvedActionV2
+	ResolvedItemBlockActions []framework.ItemBlockResolvedAction
 	VolumeSnapshots []*volume.Snapshot
 	PodVolumeBackups []*velerov1api.PodVolumeBackup
 	BackedUpItems map[itemKey]struct{}
@@ -618,6 +618,11 @@ func (b *backupReconciler) runBackup(backup *pkgbackup.Request) error {
 	if err != nil {
 		return err
 	}
+	backupLog.Info("Getting ItemBlock actions")
+	ibActions, err := pluginManager.GetItemBlockActions()
+	if err != nil {
+		return err
+	}
 	backupLog.Info("Setting up backup store to check for backup existence")
 	backupStore, err := b.backupStoreGetter.Get(backup.StorageLocation, pluginManager, backupLog)
 	if err != nil {
@@ -635,9 +640,10 @@ func (b *backupReconciler) runBackup(backup *pkgbackup.Request) error {
 	}

 	backupItemActionsResolver := framework.NewBackupItemActionResolverV2(actions)
+	itemBlockActionResolver := framework.NewItemBlockActionResolver(ibActions)

 	var fatalErrs []error
-	if err := b.backupper.BackupWithResolvers(backupLog, backup, backupFile, backupItemActionsResolver, pluginManager); err != nil {
+	if err := b.backupper.BackupWithResolvers(backupLog, backup, backupFile, backupItemActionsResolver, itemBlockActionResolver, pluginManager); err != nil {
 		fatalErrs = append(fatalErrs, err)
 	}

@@ -57,6 +57,7 @@ import (
 	"github.com/vmware-tanzu/velero/pkg/plugin/framework"
 	pluginmocks "github.com/vmware-tanzu/velero/pkg/plugin/mocks"
 	biav2 "github.com/vmware-tanzu/velero/pkg/plugin/velero/backupitemaction/v2"
+	ibav1 "github.com/vmware-tanzu/velero/pkg/plugin/velero/itemblockaction/v1"
 	velerotest "github.com/vmware-tanzu/velero/pkg/test"
 	"github.com/vmware-tanzu/velero/pkg/util/boolptr"
 	kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
@@ -67,13 +68,15 @@ type fakeBackupper struct {
 	mock.Mock
 }

-func (b *fakeBackupper) Backup(logger logrus.FieldLogger, backup *pkgbackup.Request, backupFile io.Writer, actions []biav2.BackupItemAction, volumeSnapshotterGetter pkgbackup.VolumeSnapshotterGetter) error {
-	args := b.Called(logger, backup, backupFile, actions, volumeSnapshotterGetter)
+func (b *fakeBackupper) Backup(logger logrus.FieldLogger, backup *pkgbackup.Request, backupFile io.Writer, actions []biav2.BackupItemAction, itemBlockActions []ibav1.ItemBlockAction, volumeSnapshotterGetter pkgbackup.VolumeSnapshotterGetter) error {
+	args := b.Called(logger, backup, backupFile, actions, itemBlockActions, volumeSnapshotterGetter)
 	return args.Error(0)
 }

 func (b *fakeBackupper) BackupWithResolvers(logger logrus.FieldLogger, backup *pkgbackup.Request, backupFile io.Writer,
-	backupItemActionResolver framework.BackupItemActionResolverV2, volumeSnapshotterGetter pkgbackup.VolumeSnapshotterGetter) error {
+	backupItemActionResolver framework.BackupItemActionResolverV2,
+	itemBlockActionResolver framework.ItemBlockActionResolver,
+	volumeSnapshotterGetter pkgbackup.VolumeSnapshotterGetter) error {
 	args := b.Called(logger, backup, backupFile, backupItemActionResolver, volumeSnapshotterGetter)
 	return args.Error(0)
 }
@@ -1345,6 +1348,7 @@ func TestProcessBackupCompletions(t *testing.T) {
 			}

 			pluginManager.On("GetBackupItemActionsV2").Return(nil, nil)
+			pluginManager.On("GetItemBlockActions").Return(nil, nil)
 			pluginManager.On("CleanupClients").Return(nil)
 			backupper.On("Backup", mock.Anything, mock.Anything, mock.Anything, []biav2.BackupItemAction(nil), pluginManager).Return(nil)
 			backupper.On("BackupWithResolvers", mock.Anything, mock.Anything, mock.Anything, framework.BackupItemActionResolverV2{}, pluginManager).Return(nil)
@@ -0,0 +1,60 @@
+/*
+Copyright the Velero contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package itemblock
+
+import (
+	"github.com/sirupsen/logrus"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+type ItemBlock struct {
+	Log logrus.FieldLogger
+	Items []ItemBlockItem
+}
+
+type ItemBlockItem struct {
+	Gr schema.GroupResource
+	Item *unstructured.Unstructured
+	PreferredGVR schema.GroupVersionResource
+}
+
+func (ib *ItemBlock) AddUnstructured(gr schema.GroupResource, item *unstructured.Unstructured, preferredGVR schema.GroupVersionResource) {
+	ib.Items = append(ib.Items, ItemBlockItem{
+		Gr: gr,
+		Item: item,
+		PreferredGVR: preferredGVR,
+	})
+}
+
+// Could return multiple items if EnableAPIGroupVersions is set. The item matching the preferredGVR is returned first
+func (ib *ItemBlock) FindItem(gr schema.GroupResource, namespace, name string) []ItemBlockItem {
+	var itemList []ItemBlockItem
+	var returnList []ItemBlockItem
+
+	for _, item := range ib.Items {
+		if item.Gr == gr && item.Item != nil && item.Item.GetName() == name && item.Item.GetNamespace() == namespace {
+			itemGV, err := schema.ParseGroupVersion(item.Item.GetAPIVersion())
+			if err == nil && item.PreferredGVR.GroupVersion() == itemGV {
+				returnList = append(returnList, item)
+			} else {
+				itemList = append(itemList, item)
+			}
+		}
+	}
+	return append(returnList, itemList...)
+}
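A small usage sketch for the new itemblock package above (illustrative only, not part of the change): FindItem returns every stored copy of an object, and the copy whose apiVersion matches the preferred GVR comes back first, which matters when EnableAPIGroupVersions stores multiple versions. The group, kind, and names below are invented.

```go
package main

import (
	"fmt"

	"github.com/sirupsen/logrus"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"

	"github.com/vmware-tanzu/velero/pkg/itemblock"
)

// makeObj builds a minimal unstructured object for the example.
func makeObj(apiVersion, kind, ns, name string) *unstructured.Unstructured {
	return &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": apiVersion,
		"kind":       kind,
		"metadata":   map[string]interface{}{"namespace": ns, "name": name},
	}}
}

func main() {
	gr := schema.GroupResource{Group: "example.com", Resource: "widgets"}
	preferred := schema.GroupVersionResource{Group: "example.com", Version: "v2", Resource: "widgets"}

	ib := &itemblock.ItemBlock{Log: logrus.New()}
	// Two stored versions of the same object, as EnableAPIGroupVersions may produce.
	ib.AddUnstructured(gr, makeObj("example.com/v1", "Widget", "ns1", "w1"), preferred)
	ib.AddUnstructured(gr, makeObj("example.com/v2", "Widget", "ns1", "w1"), preferred)

	// Prints example.com/v2 first (matches the preferred GVR), then example.com/v1.
	for _, item := range ib.FindItem(gr, "ns1", "w1") {
		fmt.Println(item.Item.GetAPIVersion(), item.Item.GetName())
	}
}
```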
@@ -24,9 +24,22 @@ import (

 type MockPodCommandExecutor struct {
 	mock.Mock
+	// hook execution order
+	HookExecutionLog []HookExecutionEntry
 }

+type HookExecutionEntry struct {
+	Namespace, Name, HookName string
+	HookCommand []string
+}
+
 func (e *MockPodCommandExecutor) ExecutePodCommand(log logrus.FieldLogger, item map[string]interface{}, namespace, name, hookName string, hook *v1.ExecHook) error {
+	e.HookExecutionLog = append(e.HookExecutionLog, HookExecutionEntry{
+		Namespace: namespace,
+		Name: name,
+		HookName: hookName,
+		HookCommand: hook.Command,
+	})
 	args := e.Called(log, item, namespace, name, hookName, hook)
 	return args.Error(0)
 }